diff --git a/queue/queue.go b/queue/queue.go index 909571b..7c73639 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -4,17 +4,19 @@ import ( "errors" "runtime" "sync" + "sync/atomic" ) type ( // A Queue is a message queue. Queue struct { - logger Logger - workerCount int - routineGroup *routineGroup - quit chan struct{} - worker Worker - stopOnce sync.Once + logger Logger + workerCount int + routineGroup *routineGroup + quit chan struct{} + worker Worker + stopOnce sync.Once + runningWorkers int32 } ) @@ -90,6 +92,11 @@ func (q *Queue) Shutdown() { }) } +// Workers returns the numbers of workers has been created. +func (q *Queue) Workers() int { + return int(atomic.LoadInt32(&q.runningWorkers)) +} + // Wait all process func (q *Queue) Wait() { q.routineGroup.Wait() @@ -100,7 +107,8 @@ func (q *Queue) Queue(job QueuedMessage) error { return q.worker.Queue(job) } -func (q *Queue) work(num int) { +func (q *Queue) work() { + num := atomic.AddInt32(&q.runningWorkers, 1) if err := q.worker.BeforeRun(); err != nil { q.logger.Fatal(err) } @@ -108,15 +116,15 @@ func (q *Queue) work(num int) { // to handle panic cases from inside the worker // in such case, we start a new goroutine defer func() { + atomic.AddInt32(&q.runningWorkers, -1) if err := recover(); err != nil { q.logger.Error(err) - go q.work(num) + go q.work() } }() - - q.logger.Info("started the worker num ", num) + q.logger.Infof("start the worker num: %d", num) q.worker.Run(q.quit) - q.logger.Info("closed the worker num ", num) + q.logger.Infof("stop the worker num: %d", num) }) if err := q.worker.AfterRun(); err != nil { q.logger.Fatal(err) @@ -125,6 +133,6 @@ func (q *Queue) work(num int) { func (q *Queue) startWorker() { for i := 0; i < q.workerCount; i++ { - go q.work(i) + go q.work() } } diff --git a/queue/simple/simple_test.go b/queue/simple/simple_test.go index 5adfd1a..a03b66c 100644 --- a/queue/simple/simple_test.go +++ b/queue/simple/simple_test.go @@ -20,10 +20,6 @@ func (m mockMessage) Bytes() []byte { return []byte(m.msg) } -func TestMain(m *testing.M) { - m.Run() -} - func TestQueueUsage(t *testing.T) { w := NewWorker() assert.Equal(t, runtime.NumCPU()<<1, w.Capacity()) @@ -96,3 +92,27 @@ func TestShutDonwPanic(t *testing.T) { q.Shutdown() q.Wait() } + +func TestWorkersNum(t *testing.T) { + w := NewWorker( + WithRunFunc(func(msg queue.QueuedMessage) error { + logx.LogAccess.Infof("get message: %s", msg.Bytes()) + time.Sleep(100 * time.Millisecond) + return nil + }), + ) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(2), + ) + assert.NoError(t, err) + q.Start() + q.Start() + q.Start() + q.Start() + time.Sleep(50 * time.Millisecond) + assert.Equal(t, 8, q.Workers()) + q.Shutdown() + q.Wait() + assert.Equal(t, 0, q.Workers()) +}