From 6ebbbe5026e5e9808151bc0da95297bc77133ec7 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sat, 24 Jul 2021 10:17:42 +0800 Subject: [PATCH] chore(queue): Shutdown service once. (#610) Signed-off-by: Bo-Yi Wu --- queue/nsq/nsq_test.go | 2 ++ queue/queue.go | 8 ++++++-- queue/simple/simple_test.go | 20 ++++++++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/queue/nsq/nsq_test.go b/queue/nsq/nsq_test.go index a471d02..0f11c57 100644 --- a/queue/nsq/nsq_test.go +++ b/queue/nsq/nsq_test.go @@ -38,6 +38,8 @@ func TestShutdown(t *testing.T) { q.Start() time.Sleep(1 * time.Second) q.Shutdown() + // check shutdown once + q.Shutdown() q.Wait() } diff --git a/queue/queue.go b/queue/queue.go index 61004cf..909571b 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -3,6 +3,7 @@ package queue import ( "errors" "runtime" + "sync" ) type ( @@ -13,6 +14,7 @@ type ( routineGroup *routineGroup quit chan struct{} worker Worker + stopOnce sync.Once } ) @@ -82,8 +84,10 @@ func (q *Queue) Start() { // Shutdown stops all queues. func (q *Queue) Shutdown() { - q.worker.Shutdown() - close(q.quit) + q.stopOnce.Do(func() { + q.worker.Shutdown() + close(q.quit) + }) } // Wait all process diff --git a/queue/simple/simple_test.go b/queue/simple/simple_test.go index 2d5f3c5..5adfd1a 100644 --- a/queue/simple/simple_test.go +++ b/queue/simple/simple_test.go @@ -76,3 +76,23 @@ func TestCustomFuncAndWait(t *testing.T) { q.Wait() // you will see the execute time > 1000ms } + +func TestShutDonwPanic(t *testing.T) { + w := NewWorker( + WithRunFunc(func(msg queue.QueuedMessage) error { + logx.LogAccess.Infof("get message: %s", msg.Bytes()) + time.Sleep(100 * time.Millisecond) + return nil + }), + ) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(2), + ) + assert.NoError(t, err) + q.Start() + q.Shutdown() + // check shutdown once + q.Shutdown() + q.Wait() +}