diff --git a/.drone.yml b/.drone.yml index 57b9f89..f11451c 100644 --- a/.drone.yml +++ b/.drone.yml @@ -80,6 +80,11 @@ services: - name: redis image: redis +- name: nsq + image: nsqio/nsq + commands: + - /nsqd + volumes: - name: gopath temp: {} diff --git a/.gitignore b/.gitignore index 683a0e6..1fd9c9a 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,4 @@ custom release coverage.txt node_modules +config.yml diff --git a/pipeline.libsonnet b/pipeline.libsonnet index 2fc5997..03278ab 100644 --- a/pipeline.libsonnet +++ b/pipeline.libsonnet @@ -118,6 +118,13 @@ name: 'redis', image: 'redis', }, + { + name: 'nsq', + image: 'nsqio/nsq', + commands: [ + "/nsqd", + ], + }, ], }, diff --git a/queue/nsq/nsq.go b/queue/nsq/nsq.go index 5c44ddf..562ae5a 100644 --- a/queue/nsq/nsq.go +++ b/queue/nsq/nsq.go @@ -98,7 +98,10 @@ func (s *Worker) AfterRun() error { // Run start the worker func (s *Worker) Run(quit chan struct{}) error { + wg := &sync.WaitGroup{} s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { + wg.Add(1) + defer wg.Done() var notification gorush.PushNotification if err := json.Unmarshal(msg.Body, ¬ification); err != nil { return err @@ -110,6 +113,8 @@ func (s *Worker) Run(quit chan struct{}) error { select { case <-quit: } + + wg.Wait() return nil } diff --git a/queue/nsq/nsq_test.go b/queue/nsq/nsq_test.go new file mode 100644 index 0000000..f4a0984 --- /dev/null +++ b/queue/nsq/nsq_test.go @@ -0,0 +1,34 @@ +package nsq + +import ( + "log" + "testing" + "time" + + "github.com/appleboy/gorush/logx" + "github.com/appleboy/gorush/queue" +) + +func TestMain(m *testing.M) { + if err := logx.InitLog( + "debug", + "stdout", + "debug", + "stdout", + ); err != nil { + log.Fatalf("Can't load log module, error: %v", err) + } + + m.Run() +} + +func TestShutdown(t *testing.T) { + w := NewWorker( + WithAddr("nsq:4150"), + WithTopic("test"), + ) + q := queue.NewQueue(w, 2) + q.Start() + time.Sleep(1 * time.Second) + q.Shutdown() +} diff --git a/queue/queue.go b/queue/queue.go index 4a7c432..e9c0c5f 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -64,7 +64,9 @@ func (q *Queue) Queue(job interface{}) error { } func (q *Queue) work(num int) { - q.worker.BeforeRun() + if err := q.worker.BeforeRun(); err != nil { + logx.LogError.Fatal(err) + } q.routineGroup.Run(func() { // to handle panic cases from inside the worker // in such case, we start a new goroutine @@ -79,7 +81,9 @@ func (q *Queue) work(num int) { q.worker.Run(q.quit) logx.LogAccess.Info("closed the worker num ", num) }) - q.worker.AfterRun() + if err := q.worker.AfterRun(); err != nil { + logx.LogError.Fatal(err) + } } func (q *Queue) startWorker() {