chore(queue): Shutdown service once. (#610)

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi Wu 2021-07-24 10:17:42 +08:00 committed by GitHub
parent d9947ea44d
commit 6ebbbe5026
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 2 deletions

View File

@ -38,6 +38,8 @@ func TestShutdown(t *testing.T) {
q.Start() q.Start()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
q.Shutdown() q.Shutdown()
// check shutdown once
q.Shutdown()
q.Wait() q.Wait()
} }

View File

@ -3,6 +3,7 @@ package queue
import ( import (
"errors" "errors"
"runtime" "runtime"
"sync"
) )
type ( type (
@ -13,6 +14,7 @@ type (
routineGroup *routineGroup routineGroup *routineGroup
quit chan struct{} quit chan struct{}
worker Worker worker Worker
stopOnce sync.Once
} }
) )
@ -82,8 +84,10 @@ func (q *Queue) Start() {
// Shutdown stops all queues. // Shutdown stops all queues.
func (q *Queue) Shutdown() { func (q *Queue) Shutdown() {
q.stopOnce.Do(func() {
q.worker.Shutdown() q.worker.Shutdown()
close(q.quit) close(q.quit)
})
} }
// Wait all process // Wait all process

View File

@ -76,3 +76,23 @@ func TestCustomFuncAndWait(t *testing.T) {
q.Wait() q.Wait()
// you will see the execute time > 1000ms // you will see the execute time > 1000ms
} }
func TestShutDonwPanic(t *testing.T) {
w := NewWorker(
WithRunFunc(func(msg queue.QueuedMessage) error {
logx.LogAccess.Infof("get message: %s", msg.Bytes())
time.Sleep(100 * time.Millisecond)
return nil
}),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start()
q.Shutdown()
// check shutdown once
q.Shutdown()
q.Wait()
}