diff --git a/queue/nsq/nsq.go b/queue/nsq/nsq.go index 3cf446d..393838b 100644 --- a/queue/nsq/nsq.go +++ b/queue/nsq/nsq.go @@ -2,7 +2,7 @@ package nsq import ( "encoding/json" - "errors" + "runtime" "sync" "time" @@ -19,12 +19,14 @@ type Option func(*Worker) // Worker for NSQ type Worker struct { - q *nsq.Consumer - p *nsq.Producer - once sync.Once - addr string - topic string - channel string + q *nsq.Consumer + p *nsq.Producer + once sync.Once + maxInFlight int + addr string + topic string + channel string + runFunc func(msg *nsq.Message) error } // WithAddr setup the addr of NSQ @@ -48,12 +50,40 @@ func WithChannel(channel string) Option { } } +// WithRunFunc setup the run func of queue +func WithRunFunc(fn func(msg *nsq.Message) error) Option { + return func(w *Worker) { + w.runFunc = fn + } +} + +// WithMaxInFlight Maximum number of messages to allow in flight (concurrency knob) +func WithMaxInFlight(num int) Option { + return func(w *Worker) { + w.maxInFlight = num + } +} + // NewWorker for struc func NewWorker(opts ...Option) *Worker { w := &Worker{ - addr: "127.0.0.1:4150", - topic: "gorush", - channel: "ch", + addr: "127.0.0.1:4150", + topic: "gorush", + channel: "ch", + maxInFlight: runtime.NumCPU(), + runFunc: func(msg *nsq.Message) error { + if len(msg.Body) == 0 { + // Returning nil will automatically send a FIN command to NSQ to mark the message as processed. + // In this case, a message with an empty body is simply ignored/discarded. + return nil + } + var notification *gorush.PushNotification + if err := json.Unmarshal(msg.Body, ¬ification); err != nil { + return err + } + gorush.SendNotification(notification) + return nil + }, } // Loop through each option @@ -63,6 +93,7 @@ func NewWorker(opts ...Option) *Worker { } cfg := nsq.NewConfig() + cfg.MaxInFlight = w.maxInFlight q, err := nsq.NewConsumer(w.topic, w.channel, cfg) if err != nil { panic(err) @@ -102,19 +133,18 @@ func (s *Worker) Run(quit chan struct{}) error { s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { wg.Add(1) defer wg.Done() - var notification *gorush.PushNotification - if err := json.Unmarshal(msg.Body, ¬ification); err != nil { - return err - } - gorush.SendNotification(notification) - return nil + // run custom func + return s.runFunc(msg) })) + // wait close signal select { case <-quit: } + // wait job completed wg.Wait() + return nil } @@ -137,11 +167,7 @@ func (s *Worker) Usage() int { // Queue send notification to queue func (s *Worker) Queue(job queue.QueuedMessage) error { - v, ok := job.(*gorush.PushNotification) - if !ok { - return errors.New("wrong type of job") - } - err := s.p.Publish(s.topic, v.Bytes()) + err := s.p.Publish(s.topic, job.Bytes()) if err != nil { return err } diff --git a/queue/nsq/nsq_test.go b/queue/nsq/nsq_test.go index f4a0984..eb0e348 100644 --- a/queue/nsq/nsq_test.go +++ b/queue/nsq/nsq_test.go @@ -7,8 +7,19 @@ import ( "github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/queue" + "github.com/nsqio/go-nsq" ) +var host = "nsq" + +type mockMessage struct { + msg string +} + +func (m mockMessage) Bytes() []byte { + return []byte(m.msg) +} + func TestMain(m *testing.M) { if err := logx.InitLog( "debug", @@ -24,11 +35,39 @@ func TestMain(m *testing.M) { func TestShutdown(t *testing.T) { w := NewWorker( - WithAddr("nsq:4150"), + WithAddr(host+":4150"), WithTopic("test"), ) q := queue.NewQueue(w, 2) q.Start() time.Sleep(1 * time.Second) q.Shutdown() + q.Wait() +} + +func TestCustomFuncAndWait(t *testing.T) { + m := mockMessage{ + msg: "foo", + } + w := NewWorker( + WithAddr(host+":4150"), + WithTopic("test"), + WithMaxInFlight(2), + WithRunFunc(func(msg *nsq.Message) error { + logx.LogAccess.Infof("get message: %s", msg.Body) + time.Sleep(500 * time.Millisecond) + return nil + }), + ) + q := queue.NewQueue(w, 2) + q.Start() + time.Sleep(100 * time.Millisecond) + q.Queue(m) + q.Queue(m) + q.Queue(m) + q.Queue(m) + time.Sleep(600 * time.Millisecond) + q.Shutdown() + q.Wait() + // you will see the execute time > 1000ms }