diff --git a/queue/queue.go b/queue/queue.go index 0b54a4d..38600cd 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -63,14 +63,25 @@ func (q *Queue) Queue(job interface{}) error { return q.worker.Queue(job) } +func (q *Queue) work(num int) { + q.routineGroup.Run(func() { + // to handle panic cases from inside the worker + // in such case, we start a new goroutine + defer func() { + if err := recover(); err != nil { + logx.LogError.Error(err) + go q.work(num) + } + }() + + logx.LogAccess.Info("started the worker num ", num) + q.worker.Run(q.quit) + logx.LogAccess.Info("closed the worker num ", num) + }) +} + func (q *Queue) startWorker() { for i := 0; i < q.workerCount; i++ { - go func(num int) { - q.routineGroup.Run(func() { - logx.LogAccess.Info("started the worker num ", num) - q.worker.Run(q.quit) - logx.LogAccess.Info("closed the worker num ", num) - }) - }(i) + go q.work(i) } } diff --git a/queue/simple/simple.go b/queue/simple/simple.go index 6603ea5..ea8d611 100644 --- a/queue/simple/simple.go +++ b/queue/simple/simple.go @@ -13,6 +13,8 @@ var _ queue.Worker = (*Worker)(nil) // Option for queue system type Option func(*Worker) +var errMaxCapacity = errors.New("max capacity reached") + // Worker for simple queue using channel type Worker struct { queueNotification chan gorush.PushNotification @@ -49,7 +51,7 @@ func (s *Worker) Queue(job interface{}) error { case s.queueNotification <- job.(gorush.PushNotification): return nil default: - return errors.New("max capacity reached") + return errMaxCapacity } } diff --git a/queue/simple/simple_test.go b/queue/simple/simple_test.go new file mode 100644 index 0000000..c739ae8 --- /dev/null +++ b/queue/simple/simple_test.go @@ -0,0 +1,34 @@ +package simple + +import ( + "runtime" + "testing" + + "github.com/appleboy/gorush/gorush" + "github.com/stretchr/testify/assert" +) + +func TestQueueUsage(t *testing.T) { + w := NewWorker() + assert.Equal(t, runtime.NumCPU()<<1, w.Capacity()) + assert.Equal(t, 0, w.Usage()) + + w.Queue(gorush.PushNotification{}) + assert.Equal(t, 1, w.Usage()) +} + +func TestMaxCapacity(t *testing.T) { + w := NewWorker(WithQueueNum(2)) + assert.Equal(t, 2, w.Capacity()) + assert.Equal(t, 0, w.Usage()) + + assert.NoError(t, w.Queue(gorush.PushNotification{})) + assert.Equal(t, 1, w.Usage()) + assert.NoError(t, w.Queue(gorush.PushNotification{})) + assert.Equal(t, 2, w.Usage()) + assert.Error(t, w.Queue(gorush.PushNotification{})) + assert.Equal(t, 2, w.Usage()) + + err := w.Queue(gorush.PushNotification{}) + assert.Equal(t, errMaxCapacity, err) +}