fix(NSQ): close service after waiting all job completed. (#601)

This commit is contained in:
Bo-Yi Wu 2021-07-18 00:19:17 +08:00 committed by GitHub
parent 87be86be08
commit d6b4a0ae39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 58 additions and 2 deletions

View File

@ -80,6 +80,11 @@ services:
- name: redis - name: redis
image: redis image: redis
- name: nsq
image: nsqio/nsq
commands:
- /nsqd
volumes: volumes:
- name: gopath - name: gopath
temp: {} temp: {}

1
.gitignore vendored
View File

@ -36,3 +36,4 @@ custom
release release
coverage.txt coverage.txt
node_modules node_modules
config.yml

View File

@ -118,6 +118,13 @@
name: 'redis', name: 'redis',
image: 'redis', image: 'redis',
}, },
{
name: 'nsq',
image: 'nsqio/nsq',
commands: [
"/nsqd",
],
},
], ],
}, },

View File

@ -98,7 +98,10 @@ func (s *Worker) AfterRun() error {
// Run start the worker // Run start the worker
func (s *Worker) Run(quit chan struct{}) error { func (s *Worker) Run(quit chan struct{}) error {
wg := &sync.WaitGroup{}
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
wg.Add(1)
defer wg.Done()
var notification gorush.PushNotification var notification gorush.PushNotification
if err := json.Unmarshal(msg.Body, &notification); err != nil { if err := json.Unmarshal(msg.Body, &notification); err != nil {
return err return err
@ -110,6 +113,8 @@ func (s *Worker) Run(quit chan struct{}) error {
select { select {
case <-quit: case <-quit:
} }
wg.Wait()
return nil return nil
} }

34
queue/nsq/nsq_test.go Normal file
View File

@ -0,0 +1,34 @@
package nsq
import (
"log"
"testing"
"time"
"github.com/appleboy/gorush/logx"
"github.com/appleboy/gorush/queue"
)
func TestMain(m *testing.M) {
if err := logx.InitLog(
"debug",
"stdout",
"debug",
"stdout",
); err != nil {
log.Fatalf("Can't load log module, error: %v", err)
}
m.Run()
}
func TestShutdown(t *testing.T) {
w := NewWorker(
WithAddr("nsq:4150"),
WithTopic("test"),
)
q := queue.NewQueue(w, 2)
q.Start()
time.Sleep(1 * time.Second)
q.Shutdown()
}

View File

@ -64,7 +64,9 @@ func (q *Queue) Queue(job interface{}) error {
} }
func (q *Queue) work(num int) { func (q *Queue) work(num int) {
q.worker.BeforeRun() if err := q.worker.BeforeRun(); err != nil {
logx.LogError.Fatal(err)
}
q.routineGroup.Run(func() { q.routineGroup.Run(func() {
// to handle panic cases from inside the worker // to handle panic cases from inside the worker
// in such case, we start a new goroutine // in such case, we start a new goroutine
@ -79,7 +81,9 @@ func (q *Queue) work(num int) {
q.worker.Run(q.quit) q.worker.Run(q.quit)
logx.LogAccess.Info("closed the worker num ", num) logx.LogAccess.Info("closed the worker num ", num)
}) })
q.worker.AfterRun() if err := q.worker.AfterRun(); err != nil {
logx.LogError.Fatal(err)
}
} }
func (q *Queue) startWorker() { func (q *Queue) startWorker() {