chore(queue): to handle panic cases from inside the worker (#594)

This commit is contained in:
Bo-Yi Wu 2021-07-17 08:20:45 +08:00 committed by GitHub
parent 4349a17017
commit 95866bf205
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 8 deletions

View File

@ -63,14 +63,25 @@ func (q *Queue) Queue(job interface{}) error {
return q.worker.Queue(job) return q.worker.Queue(job)
} }
func (q *Queue) startWorker() { func (q *Queue) work(num int) {
for i := 0; i < q.workerCount; i++ {
go func(num int) {
q.routineGroup.Run(func() { q.routineGroup.Run(func() {
// to handle panic cases from inside the worker
// in such case, we start a new goroutine
defer func() {
if err := recover(); err != nil {
logx.LogError.Error(err)
go q.work(num)
}
}()
logx.LogAccess.Info("started the worker num ", num) logx.LogAccess.Info("started the worker num ", num)
q.worker.Run(q.quit) q.worker.Run(q.quit)
logx.LogAccess.Info("closed the worker num ", num) logx.LogAccess.Info("closed the worker num ", num)
}) })
}(i) }
func (q *Queue) startWorker() {
for i := 0; i < q.workerCount; i++ {
go q.work(i)
} }
} }

View File

@ -13,6 +13,8 @@ var _ queue.Worker = (*Worker)(nil)
// Option for queue system // Option for queue system
type Option func(*Worker) type Option func(*Worker)
var errMaxCapacity = errors.New("max capacity reached")
// Worker for simple queue using channel // Worker for simple queue using channel
type Worker struct { type Worker struct {
queueNotification chan gorush.PushNotification queueNotification chan gorush.PushNotification
@ -49,7 +51,7 @@ func (s *Worker) Queue(job interface{}) error {
case s.queueNotification <- job.(gorush.PushNotification): case s.queueNotification <- job.(gorush.PushNotification):
return nil return nil
default: default:
return errors.New("max capacity reached") return errMaxCapacity
} }
} }

View File

@ -0,0 +1,34 @@
package simple
import (
"runtime"
"testing"
"github.com/appleboy/gorush/gorush"
"github.com/stretchr/testify/assert"
)
func TestQueueUsage(t *testing.T) {
w := NewWorker()
assert.Equal(t, runtime.NumCPU()<<1, w.Capacity())
assert.Equal(t, 0, w.Usage())
w.Queue(gorush.PushNotification{})
assert.Equal(t, 1, w.Usage())
}
func TestMaxCapacity(t *testing.T) {
w := NewWorker(WithQueueNum(2))
assert.Equal(t, 2, w.Capacity())
assert.Equal(t, 0, w.Usage())
assert.NoError(t, w.Queue(gorush.PushNotification{}))
assert.Equal(t, 1, w.Usage())
assert.NoError(t, w.Queue(gorush.PushNotification{}))
assert.Equal(t, 2, w.Usage())
assert.Error(t, w.Queue(gorush.PushNotification{}))
assert.Equal(t, 2, w.Usage())
err := w.Queue(gorush.PushNotification{})
assert.Equal(t, errMaxCapacity, err)
}