chore: move NSQ and simple package to appleboy/queue (#613)

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi Wu 2021-07-24 22:33:44 +08:00 committed by GitHub
parent cd14311f70
commit 7b16a4db98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 33 additions and 514 deletions

2
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/appleboy/gin-status-api v1.1.0
github.com/appleboy/go-fcm v0.1.5
github.com/appleboy/gofight/v2 v2.1.2
github.com/appleboy/queue v0.0.2
github.com/appleboy/queue v0.0.3
github.com/asdine/storm/v3 v3.2.1
github.com/buger/jsonparser v1.1.1
github.com/dgraph-io/badger/v3 v3.2103.1

4
go.sum
View File

@ -40,8 +40,8 @@ github.com/appleboy/go-fcm v0.1.5 h1:fKbcZf/7vwGsvDkcop8a+kCHnK+tt4wXX0X7uEzwI6E
github.com/appleboy/go-fcm v0.1.5/go.mod h1:MSxZ4LqGRsnywOjnlXJXMqbjZrG4vf+0oHitfC9HRH0=
github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4=
github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw=
github.com/appleboy/queue v0.0.2 h1:yjdRfa2G8Q/4NYaBdqR0m6GPjgdlWMc+NvSACR01LoE=
github.com/appleboy/queue v0.0.2/go.mod h1:mmhZvP5Zl0avp+LA4XL29Aad0Df39ZxOTIiMs/OhM2Q=
github.com/appleboy/queue v0.0.3 h1:rntqVTm6ilh80VCVQjwA0vDMCl1cfveq6GS6X98fKwE=
github.com/appleboy/queue v0.0.3/go.mod h1:6Mn0z4hURZW/26huvRXG0SJ4o7pBdo6hOryRiegy/4Q=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=

26
main.go
View File

@ -2,6 +2,7 @@ package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
@ -18,13 +19,14 @@ import (
"github.com/appleboy/gorush/core"
"github.com/appleboy/gorush/logx"
"github.com/appleboy/gorush/notify"
"github.com/appleboy/gorush/queue/nsq"
"github.com/appleboy/gorush/queue/simple"
"github.com/appleboy/gorush/router"
"github.com/appleboy/gorush/rpc"
"github.com/appleboy/gorush/status"
"github.com/appleboy/queue"
"github.com/appleboy/queue/nsq"
"github.com/appleboy/queue/simple"
n "github.com/nsqio/go-nsq"
"golang.org/x/sync/errgroup"
)
@ -322,9 +324,27 @@ func main() {
case core.LocalQueue:
w = simple.NewWorker(
simple.WithQueueNum(int(cfg.Core.QueueNum)),
simple.WithRunFunc(func(msg queue.QueuedMessage) error {
notify.SendNotification(msg)
return nil
}),
)
case core.NSQ:
w = nsq.NewWorker()
w = nsq.NewWorker(
nsq.WithRunFunc(func(msg *n.Message) error {
if len(msg.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
// In this case, a message with an empty body is simply ignored/discarded.
return nil
}
var notification *notify.PushNotification
if err := json.Unmarshal(msg.Body, &notification); err != nil {
return err
}
notify.SendNotification(notification)
return nil
}),
)
default:
logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine)
}

View File

@ -1,29 +0,0 @@
# NSQ
A realtime distributed messaging platform
## Setup
start the NSQ lookupd
```sh
nsqlookupd
```
start the NSQ server
```sh
nsqd --lookupd-tcp-address=localhost:4160
```
start the NSQ admin dashboard
```sh
nsqadmin --lookupd-http-address localhost:4161
```
## Testing
```sh
go test -v ./...
```

View File

@ -1,176 +0,0 @@
package nsq
import (
"encoding/json"
"runtime"
"sync"
"time"
"github.com/appleboy/gorush/notify"
"github.com/appleboy/queue"
"github.com/nsqio/go-nsq"
)
var _ queue.Worker = (*Worker)(nil)
// Option for queue system
type Option func(*Worker)
// Worker for NSQ
type Worker struct {
q *nsq.Consumer
p *nsq.Producer
startOnce sync.Once
maxInFlight int
addr string
topic string
channel string
runFunc func(msg *nsq.Message) error
}
// WithAddr setup the addr of NSQ
func WithAddr(addr string) Option {
return func(w *Worker) {
w.addr = addr
}
}
// WithTopic setup the topic of NSQ
func WithTopic(topic string) Option {
return func(w *Worker) {
w.topic = topic
}
}
// WithChannel setup the channel of NSQ
func WithChannel(channel string) Option {
return func(w *Worker) {
w.channel = channel
}
}
// WithRunFunc setup the run func of queue
func WithRunFunc(fn func(msg *nsq.Message) error) Option {
return func(w *Worker) {
w.runFunc = fn
}
}
// WithMaxInFlight Maximum number of messages to allow in flight (concurrency knob)
func WithMaxInFlight(num int) Option {
return func(w *Worker) {
w.maxInFlight = num
}
}
// NewWorker for struc
func NewWorker(opts ...Option) *Worker {
w := &Worker{
addr: "127.0.0.1:4150",
topic: "gorush",
channel: "ch",
maxInFlight: runtime.NumCPU(),
runFunc: func(msg *nsq.Message) error {
if len(msg.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
// In this case, a message with an empty body is simply ignored/discarded.
return nil
}
var notification *notify.PushNotification
if err := json.Unmarshal(msg.Body, &notification); err != nil {
return err
}
notify.SendNotification(notification)
return nil
},
}
// Loop through each option
for _, opt := range opts {
// Call the option giving the instantiated
opt(w)
}
cfg := nsq.NewConfig()
cfg.MaxInFlight = w.maxInFlight
q, err := nsq.NewConsumer(w.topic, w.channel, cfg)
if err != nil {
panic(err)
}
w.q = q
p, err := nsq.NewProducer(w.addr, cfg)
if err != nil {
panic(err)
}
w.p = p
return w
}
// BeforeRun run script before start worker
func (s *Worker) BeforeRun() error {
return nil
}
// AfterRun run script after start worker
func (s *Worker) AfterRun() error {
s.startOnce.Do(func() {
time.Sleep(100 * time.Millisecond)
err := s.q.ConnectToNSQD(s.addr)
if err != nil {
panic("Could not connect nsq server: " + err.Error())
}
})
return nil
}
// Run start the worker
func (s *Worker) Run(quit chan struct{}) error {
wg := &sync.WaitGroup{}
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
wg.Add(1)
defer wg.Done()
// run custom func
return s.runFunc(msg)
}))
// wait close signal
select {
case <-quit:
}
// wait job completed
wg.Wait()
return nil
}
// Shutdown worker
func (s *Worker) Shutdown() error {
s.q.Stop()
s.p.Stop()
return nil
}
// Capacity for channel
func (s *Worker) Capacity() int {
return 0
}
// Usage for count of channel usage
func (s *Worker) Usage() int {
return 0
}
// Queue send notification to queue
func (s *Worker) Queue(job queue.QueuedMessage) error {
err := s.p.Publish(s.topic, job.Bytes())
if err != nil {
return err
}
return nil
}

View File

@ -1,75 +0,0 @@
package nsq
import (
"testing"
"time"
"github.com/appleboy/gorush/logx"
"github.com/appleboy/queue"
"github.com/nsqio/go-nsq"
"github.com/stretchr/testify/assert"
)
var host = "nsq"
type mockMessage struct {
msg string
}
func (m mockMessage) Bytes() []byte {
return []byte(m.msg)
}
func TestMain(m *testing.M) {
m.Run()
}
func TestShutdown(t *testing.T) {
w := NewWorker(
WithAddr(host+":4150"),
WithTopic("test"),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start()
time.Sleep(1 * time.Second)
q.Shutdown()
// check shutdown once
q.Shutdown()
q.Wait()
}
func TestCustomFuncAndWait(t *testing.T) {
m := mockMessage{
msg: "foo",
}
w := NewWorker(
WithAddr(host+":4150"),
WithTopic("test"),
WithMaxInFlight(2),
WithRunFunc(func(msg *nsq.Message) error {
logx.LogAccess.Infof("get message: %s", msg.Body)
time.Sleep(500 * time.Millisecond)
return nil
}),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start()
time.Sleep(100 * time.Millisecond)
q.Queue(m)
q.Queue(m)
q.Queue(m)
q.Queue(m)
time.Sleep(600 * time.Millisecond)
q.Shutdown()
q.Wait()
// you will see the execute time > 1000ms
}

View File

@ -1,100 +0,0 @@
package simple
import (
"errors"
"runtime"
"github.com/appleboy/gorush/notify"
"github.com/appleboy/queue"
)
var _ queue.Worker = (*Worker)(nil)
// Option for queue system
type Option func(*Worker)
var errMaxCapacity = errors.New("max capacity reached")
// Worker for simple queue using channel
type Worker struct {
queueNotification chan queue.QueuedMessage
runFunc func(queue.QueuedMessage) error
}
// BeforeRun run script before start worker
func (s *Worker) BeforeRun() error {
return nil
}
// AfterRun run script after start worker
func (s *Worker) AfterRun() error {
return nil
}
// Run start the worker
func (s *Worker) Run(_ chan struct{}) error {
for notification := range s.queueNotification {
s.runFunc(notification)
}
return nil
}
// Shutdown worker
func (s *Worker) Shutdown() error {
close(s.queueNotification)
return nil
}
// Capacity for channel
func (s *Worker) Capacity() int {
return cap(s.queueNotification)
}
// Usage for count of channel usage
func (s *Worker) Usage() int {
return len(s.queueNotification)
}
// Queue send notification to queue
func (s *Worker) Queue(job queue.QueuedMessage) error {
select {
case s.queueNotification <- job:
return nil
default:
return errMaxCapacity
}
}
// WithQueueNum setup the capcity of queue
func WithQueueNum(num int) Option {
return func(w *Worker) {
w.queueNotification = make(chan queue.QueuedMessage, num)
}
}
// WithRunFunc setup the run func of queue
func WithRunFunc(fn func(queue.QueuedMessage) error) Option {
return func(w *Worker) {
w.runFunc = fn
}
}
// NewWorker for struc
func NewWorker(opts ...Option) *Worker {
w := &Worker{
queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1),
runFunc: func(msg queue.QueuedMessage) error {
notify.SendNotification(msg)
return nil
},
}
// Loop through each option
for _, opt := range opts {
// Call the option giving the instantiated
opt(w)
}
return w
}

View File

@ -1,126 +0,0 @@
package simple
import (
"runtime"
"testing"
"time"
"github.com/appleboy/gorush/logx"
"github.com/appleboy/gorush/notify"
"github.com/appleboy/queue"
"github.com/stretchr/testify/assert"
)
type mockMessage struct {
msg string
}
func (m mockMessage) Bytes() []byte {
return []byte(m.msg)
}
func TestQueueUsage(t *testing.T) {
w := NewWorker()
assert.Equal(t, runtime.NumCPU()<<1, w.Capacity())
assert.Equal(t, 0, w.Usage())
w.Queue(&notify.PushNotification{})
assert.Equal(t, 1, w.Usage())
}
func TestMaxCapacity(t *testing.T) {
w := NewWorker(WithQueueNum(2))
assert.Equal(t, 2, w.Capacity())
assert.Equal(t, 0, w.Usage())
assert.NoError(t, w.Queue(&notify.PushNotification{}))
assert.Equal(t, 1, w.Usage())
assert.NoError(t, w.Queue(&notify.PushNotification{}))
assert.Equal(t, 2, w.Usage())
assert.Error(t, w.Queue(&notify.PushNotification{}))
assert.Equal(t, 2, w.Usage())
err := w.Queue(&notify.PushNotification{})
assert.Equal(t, errMaxCapacity, err)
}
func TestCustomFuncAndWait(t *testing.T) {
m := mockMessage{
msg: "foo",
}
w := NewWorker(
WithRunFunc(func(msg queue.QueuedMessage) error {
logx.LogAccess.Infof("get message: %s", msg.Bytes())
time.Sleep(500 * time.Millisecond)
return nil
}),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start()
time.Sleep(100 * time.Millisecond)
q.Queue(m)
q.Queue(m)
q.Queue(m)
q.Queue(m)
time.Sleep(600 * time.Millisecond)
q.Shutdown()
q.Wait()
// you will see the execute time > 1000ms
}
func TestShutDonwPanic(t *testing.T) {
w := NewWorker(
WithRunFunc(func(msg queue.QueuedMessage) error {
logx.LogAccess.Infof("get message: %s", msg.Bytes())
time.Sleep(100 * time.Millisecond)
return nil
}),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start()
q.Shutdown()
// check shutdown once
q.Shutdown()
q.Wait()
}
func TestWorkersNum(t *testing.T) {
m := mockMessage{
msg: "test",
}
w := NewWorker(
WithRunFunc(func(msg queue.QueuedMessage) error {
logx.LogAccess.Infof("get message: %s", msg.Bytes())
time.Sleep(100 * time.Millisecond)
return nil
}),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(2),
queue.WithLogger(logx.LogAccess),
)
assert.NoError(t, err)
q.Start()
q.Start()
q.Start()
q.Start()
q.Queue(m)
q.Queue(m)
q.Queue(m)
q.Queue(m)
time.Sleep(50 * time.Millisecond)
assert.Equal(t, 8, q.Workers())
q.Shutdown()
q.Wait()
assert.Equal(t, 0, q.Workers())
}

View File

@ -14,11 +14,11 @@ import (
"github.com/appleboy/gorush/config"
"github.com/appleboy/gorush/core"
"github.com/appleboy/gorush/notify"
"github.com/appleboy/gorush/queue/simple"
"github.com/appleboy/gorush/status"
"github.com/appleboy/gofight/v2"
"github.com/appleboy/queue"
"github.com/appleboy/queue/simple"
"github.com/buger/jsonparser"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
@ -36,7 +36,12 @@ func TestMain(m *testing.M) {
log.Fatal(err)
}
w = simple.NewWorker()
w = simple.NewWorker(
simple.WithRunFunc(func(msg queue.QueuedMessage) error {
notify.SendNotification(msg)
return nil
}),
)
q, _ = queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(4),