diff --git a/go.mod b/go.mod index 444022d..d5b1e8c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/appleboy/gin-status-api v1.1.0 github.com/appleboy/go-fcm v0.1.5 github.com/appleboy/gofight/v2 v2.1.2 - github.com/appleboy/queue v0.0.2 + github.com/appleboy/queue v0.0.3 github.com/asdine/storm/v3 v3.2.1 github.com/buger/jsonparser v1.1.1 github.com/dgraph-io/badger/v3 v3.2103.1 diff --git a/go.sum b/go.sum index 50555e8..e535aa2 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ github.com/appleboy/go-fcm v0.1.5 h1:fKbcZf/7vwGsvDkcop8a+kCHnK+tt4wXX0X7uEzwI6E github.com/appleboy/go-fcm v0.1.5/go.mod h1:MSxZ4LqGRsnywOjnlXJXMqbjZrG4vf+0oHitfC9HRH0= github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4= github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw= -github.com/appleboy/queue v0.0.2 h1:yjdRfa2G8Q/4NYaBdqR0m6GPjgdlWMc+NvSACR01LoE= -github.com/appleboy/queue v0.0.2/go.mod h1:mmhZvP5Zl0avp+LA4XL29Aad0Df39ZxOTIiMs/OhM2Q= +github.com/appleboy/queue v0.0.3 h1:rntqVTm6ilh80VCVQjwA0vDMCl1cfveq6GS6X98fKwE= +github.com/appleboy/queue v0.0.3/go.mod h1:6Mn0z4hURZW/26huvRXG0SJ4o7pBdo6hOryRiegy/4Q= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= diff --git a/main.go b/main.go index 0e51a93..4963598 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "flag" "fmt" "log" @@ -18,13 +19,14 @@ import ( "github.com/appleboy/gorush/core" "github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/notify" - "github.com/appleboy/gorush/queue/nsq" - "github.com/appleboy/gorush/queue/simple" "github.com/appleboy/gorush/router" "github.com/appleboy/gorush/rpc" "github.com/appleboy/gorush/status" "github.com/appleboy/queue" + "github.com/appleboy/queue/nsq" + "github.com/appleboy/queue/simple" + n "github.com/nsqio/go-nsq" "golang.org/x/sync/errgroup" ) @@ -322,9 +324,27 @@ func main() { case core.LocalQueue: w = simple.NewWorker( simple.WithQueueNum(int(cfg.Core.QueueNum)), + simple.WithRunFunc(func(msg queue.QueuedMessage) error { + notify.SendNotification(msg) + return nil + }), ) case core.NSQ: - w = nsq.NewWorker() + w = nsq.NewWorker( + nsq.WithRunFunc(func(msg *n.Message) error { + if len(msg.Body) == 0 { + // Returning nil will automatically send a FIN command to NSQ to mark the message as processed. + // In this case, a message with an empty body is simply ignored/discarded. + return nil + } + var notification *notify.PushNotification + if err := json.Unmarshal(msg.Body, ¬ification); err != nil { + return err + } + notify.SendNotification(notification) + return nil + }), + ) default: logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine) } diff --git a/queue/nsq/README.md b/queue/nsq/README.md deleted file mode 100644 index 204c66f..0000000 --- a/queue/nsq/README.md +++ /dev/null @@ -1,29 +0,0 @@ -# NSQ - -A realtime distributed messaging platform - -## Setup - -start the NSQ lookupd - -```sh -nsqlookupd -``` - -start the NSQ server - -```sh -nsqd --lookupd-tcp-address=localhost:4160 -``` - -start the NSQ admin dashboard - -```sh -nsqadmin --lookupd-http-address localhost:4161 -``` - -## Testing - -```sh -go test -v ./... -``` diff --git a/queue/nsq/nsq.go b/queue/nsq/nsq.go deleted file mode 100644 index cb96068..0000000 --- a/queue/nsq/nsq.go +++ /dev/null @@ -1,176 +0,0 @@ -package nsq - -import ( - "encoding/json" - "runtime" - "sync" - "time" - - "github.com/appleboy/gorush/notify" - - "github.com/appleboy/queue" - "github.com/nsqio/go-nsq" -) - -var _ queue.Worker = (*Worker)(nil) - -// Option for queue system -type Option func(*Worker) - -// Worker for NSQ -type Worker struct { - q *nsq.Consumer - p *nsq.Producer - startOnce sync.Once - maxInFlight int - addr string - topic string - channel string - runFunc func(msg *nsq.Message) error -} - -// WithAddr setup the addr of NSQ -func WithAddr(addr string) Option { - return func(w *Worker) { - w.addr = addr - } -} - -// WithTopic setup the topic of NSQ -func WithTopic(topic string) Option { - return func(w *Worker) { - w.topic = topic - } -} - -// WithChannel setup the channel of NSQ -func WithChannel(channel string) Option { - return func(w *Worker) { - w.channel = channel - } -} - -// WithRunFunc setup the run func of queue -func WithRunFunc(fn func(msg *nsq.Message) error) Option { - return func(w *Worker) { - w.runFunc = fn - } -} - -// WithMaxInFlight Maximum number of messages to allow in flight (concurrency knob) -func WithMaxInFlight(num int) Option { - return func(w *Worker) { - w.maxInFlight = num - } -} - -// NewWorker for struc -func NewWorker(opts ...Option) *Worker { - w := &Worker{ - addr: "127.0.0.1:4150", - topic: "gorush", - channel: "ch", - maxInFlight: runtime.NumCPU(), - runFunc: func(msg *nsq.Message) error { - if len(msg.Body) == 0 { - // Returning nil will automatically send a FIN command to NSQ to mark the message as processed. - // In this case, a message with an empty body is simply ignored/discarded. - return nil - } - var notification *notify.PushNotification - if err := json.Unmarshal(msg.Body, ¬ification); err != nil { - return err - } - notify.SendNotification(notification) - return nil - }, - } - - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - opt(w) - } - - cfg := nsq.NewConfig() - cfg.MaxInFlight = w.maxInFlight - q, err := nsq.NewConsumer(w.topic, w.channel, cfg) - if err != nil { - panic(err) - } - w.q = q - - p, err := nsq.NewProducer(w.addr, cfg) - if err != nil { - panic(err) - } - w.p = p - - return w -} - -// BeforeRun run script before start worker -func (s *Worker) BeforeRun() error { - return nil -} - -// AfterRun run script after start worker -func (s *Worker) AfterRun() error { - s.startOnce.Do(func() { - time.Sleep(100 * time.Millisecond) - err := s.q.ConnectToNSQD(s.addr) - if err != nil { - panic("Could not connect nsq server: " + err.Error()) - } - }) - - return nil -} - -// Run start the worker -func (s *Worker) Run(quit chan struct{}) error { - wg := &sync.WaitGroup{} - s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { - wg.Add(1) - defer wg.Done() - // run custom func - return s.runFunc(msg) - })) - - // wait close signal - select { - case <-quit: - } - - // wait job completed - wg.Wait() - - return nil -} - -// Shutdown worker -func (s *Worker) Shutdown() error { - s.q.Stop() - s.p.Stop() - return nil -} - -// Capacity for channel -func (s *Worker) Capacity() int { - return 0 -} - -// Usage for count of channel usage -func (s *Worker) Usage() int { - return 0 -} - -// Queue send notification to queue -func (s *Worker) Queue(job queue.QueuedMessage) error { - err := s.p.Publish(s.topic, job.Bytes()) - if err != nil { - return err - } - - return nil -} diff --git a/queue/nsq/nsq_test.go b/queue/nsq/nsq_test.go deleted file mode 100644 index d2ed72d..0000000 --- a/queue/nsq/nsq_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package nsq - -import ( - "testing" - "time" - - "github.com/appleboy/gorush/logx" - - "github.com/appleboy/queue" - "github.com/nsqio/go-nsq" - "github.com/stretchr/testify/assert" -) - -var host = "nsq" - -type mockMessage struct { - msg string -} - -func (m mockMessage) Bytes() []byte { - return []byte(m.msg) -} - -func TestMain(m *testing.M) { - m.Run() -} - -func TestShutdown(t *testing.T) { - w := NewWorker( - WithAddr(host+":4150"), - WithTopic("test"), - ) - q, err := queue.NewQueue( - queue.WithWorker(w), - queue.WithWorkerCount(2), - ) - assert.NoError(t, err) - q.Start() - time.Sleep(1 * time.Second) - q.Shutdown() - // check shutdown once - q.Shutdown() - q.Wait() -} - -func TestCustomFuncAndWait(t *testing.T) { - m := mockMessage{ - msg: "foo", - } - w := NewWorker( - WithAddr(host+":4150"), - WithTopic("test"), - WithMaxInFlight(2), - WithRunFunc(func(msg *nsq.Message) error { - logx.LogAccess.Infof("get message: %s", msg.Body) - time.Sleep(500 * time.Millisecond) - return nil - }), - ) - q, err := queue.NewQueue( - queue.WithWorker(w), - queue.WithWorkerCount(2), - ) - assert.NoError(t, err) - q.Start() - time.Sleep(100 * time.Millisecond) - q.Queue(m) - q.Queue(m) - q.Queue(m) - q.Queue(m) - time.Sleep(600 * time.Millisecond) - q.Shutdown() - q.Wait() - // you will see the execute time > 1000ms -} diff --git a/queue/simple/simple.go b/queue/simple/simple.go deleted file mode 100644 index f58e51d..0000000 --- a/queue/simple/simple.go +++ /dev/null @@ -1,100 +0,0 @@ -package simple - -import ( - "errors" - "runtime" - - "github.com/appleboy/gorush/notify" - - "github.com/appleboy/queue" -) - -var _ queue.Worker = (*Worker)(nil) - -// Option for queue system -type Option func(*Worker) - -var errMaxCapacity = errors.New("max capacity reached") - -// Worker for simple queue using channel -type Worker struct { - queueNotification chan queue.QueuedMessage - runFunc func(queue.QueuedMessage) error -} - -// BeforeRun run script before start worker -func (s *Worker) BeforeRun() error { - return nil -} - -// AfterRun run script after start worker -func (s *Worker) AfterRun() error { - return nil -} - -// Run start the worker -func (s *Worker) Run(_ chan struct{}) error { - for notification := range s.queueNotification { - s.runFunc(notification) - } - return nil -} - -// Shutdown worker -func (s *Worker) Shutdown() error { - close(s.queueNotification) - return nil -} - -// Capacity for channel -func (s *Worker) Capacity() int { - return cap(s.queueNotification) -} - -// Usage for count of channel usage -func (s *Worker) Usage() int { - return len(s.queueNotification) -} - -// Queue send notification to queue -func (s *Worker) Queue(job queue.QueuedMessage) error { - select { - case s.queueNotification <- job: - return nil - default: - return errMaxCapacity - } -} - -// WithQueueNum setup the capcity of queue -func WithQueueNum(num int) Option { - return func(w *Worker) { - w.queueNotification = make(chan queue.QueuedMessage, num) - } -} - -// WithRunFunc setup the run func of queue -func WithRunFunc(fn func(queue.QueuedMessage) error) Option { - return func(w *Worker) { - w.runFunc = fn - } -} - -// NewWorker for struc -func NewWorker(opts ...Option) *Worker { - w := &Worker{ - queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1), - runFunc: func(msg queue.QueuedMessage) error { - notify.SendNotification(msg) - return nil - }, - } - - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - opt(w) - } - - return w -} diff --git a/queue/simple/simple_test.go b/queue/simple/simple_test.go deleted file mode 100644 index 3c7fff1..0000000 --- a/queue/simple/simple_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package simple - -import ( - "runtime" - "testing" - "time" - - "github.com/appleboy/gorush/logx" - "github.com/appleboy/gorush/notify" - - "github.com/appleboy/queue" - "github.com/stretchr/testify/assert" -) - -type mockMessage struct { - msg string -} - -func (m mockMessage) Bytes() []byte { - return []byte(m.msg) -} - -func TestQueueUsage(t *testing.T) { - w := NewWorker() - assert.Equal(t, runtime.NumCPU()<<1, w.Capacity()) - assert.Equal(t, 0, w.Usage()) - - w.Queue(¬ify.PushNotification{}) - assert.Equal(t, 1, w.Usage()) -} - -func TestMaxCapacity(t *testing.T) { - w := NewWorker(WithQueueNum(2)) - assert.Equal(t, 2, w.Capacity()) - assert.Equal(t, 0, w.Usage()) - - assert.NoError(t, w.Queue(¬ify.PushNotification{})) - assert.Equal(t, 1, w.Usage()) - assert.NoError(t, w.Queue(¬ify.PushNotification{})) - assert.Equal(t, 2, w.Usage()) - assert.Error(t, w.Queue(¬ify.PushNotification{})) - assert.Equal(t, 2, w.Usage()) - - err := w.Queue(¬ify.PushNotification{}) - assert.Equal(t, errMaxCapacity, err) -} - -func TestCustomFuncAndWait(t *testing.T) { - m := mockMessage{ - msg: "foo", - } - w := NewWorker( - WithRunFunc(func(msg queue.QueuedMessage) error { - logx.LogAccess.Infof("get message: %s", msg.Bytes()) - time.Sleep(500 * time.Millisecond) - return nil - }), - ) - q, err := queue.NewQueue( - queue.WithWorker(w), - queue.WithWorkerCount(2), - ) - assert.NoError(t, err) - q.Start() - time.Sleep(100 * time.Millisecond) - q.Queue(m) - q.Queue(m) - q.Queue(m) - q.Queue(m) - time.Sleep(600 * time.Millisecond) - q.Shutdown() - q.Wait() - // you will see the execute time > 1000ms -} - -func TestShutDonwPanic(t *testing.T) { - w := NewWorker( - WithRunFunc(func(msg queue.QueuedMessage) error { - logx.LogAccess.Infof("get message: %s", msg.Bytes()) - time.Sleep(100 * time.Millisecond) - return nil - }), - ) - q, err := queue.NewQueue( - queue.WithWorker(w), - queue.WithWorkerCount(2), - ) - assert.NoError(t, err) - q.Start() - q.Shutdown() - // check shutdown once - q.Shutdown() - q.Wait() -} - -func TestWorkersNum(t *testing.T) { - m := mockMessage{ - msg: "test", - } - w := NewWorker( - WithRunFunc(func(msg queue.QueuedMessage) error { - logx.LogAccess.Infof("get message: %s", msg.Bytes()) - time.Sleep(100 * time.Millisecond) - return nil - }), - ) - q, err := queue.NewQueue( - queue.WithWorker(w), - queue.WithWorkerCount(2), - queue.WithLogger(logx.LogAccess), - ) - assert.NoError(t, err) - q.Start() - q.Start() - q.Start() - q.Start() - q.Queue(m) - q.Queue(m) - q.Queue(m) - q.Queue(m) - time.Sleep(50 * time.Millisecond) - assert.Equal(t, 8, q.Workers()) - q.Shutdown() - q.Wait() - assert.Equal(t, 0, q.Workers()) -} diff --git a/router/server_test.go b/router/server_test.go index 552d242..baf76f5 100644 --- a/router/server_test.go +++ b/router/server_test.go @@ -14,11 +14,11 @@ import ( "github.com/appleboy/gorush/config" "github.com/appleboy/gorush/core" "github.com/appleboy/gorush/notify" - "github.com/appleboy/gorush/queue/simple" "github.com/appleboy/gorush/status" "github.com/appleboy/gofight/v2" "github.com/appleboy/queue" + "github.com/appleboy/queue/simple" "github.com/buger/jsonparser" "github.com/gin-gonic/gin" "github.com/stretchr/testify/assert" @@ -36,7 +36,12 @@ func TestMain(m *testing.M) { log.Fatal(err) } - w = simple.NewWorker() + w = simple.NewWorker( + simple.WithRunFunc(func(msg queue.QueuedMessage) error { + notify.SendNotification(msg) + return nil + }), + ) q, _ = queue.NewQueue( queue.WithWorker(w), queue.WithWorkerCount(4),