diff --git a/main.go b/main.go index f714df7..15bb7ff 100644 --- a/main.go +++ b/main.go @@ -317,14 +317,14 @@ func main() { } w := simple.NewWorker(int(cfg.Core.QueueNum)) - q := queue.NewQueue(w) + q := queue.NewQueue(w, int(cfg.Core.WorkerNum)) q.Start() finished := make(chan struct{}) ctx := withContextFunc(context.Background(), func() { logx.LogAccess.Info("close the queue system, current queue usage: ", q.Usage()) // stop queue system - q.Stop() + q.Shutdown() // wait job completed q.Wait() close(finished) diff --git a/queue/queue.go b/queue/queue.go index d884d2a..0b54a4d 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -10,7 +10,6 @@ type ( // A Queue is a message queue. Queue struct { workerCount int - queueCount int routineGroup *routineGroup quit chan struct{} worker Worker @@ -18,15 +17,18 @@ type ( ) // NewQueue returns a Queue. -func NewQueue(w Worker) *Queue { +func NewQueue(w Worker, workerNum int) *Queue { q := &Queue{ workerCount: runtime.NumCPU(), - queueCount: runtime.NumCPU() << 1, routineGroup: newRoutineGroup(), quit: make(chan struct{}), worker: w, } + if workerNum > 0 { + q.workerCount = workerNum + } + return q } @@ -45,9 +47,9 @@ func (q *Queue) Start() { q.startWorker() } -// Stop stops q. -func (q *Queue) Stop() { - q.worker.Stop() +// Shutdown stops all queues. +func (q *Queue) Shutdown() { + q.worker.Shutdown() close(q.quit) } @@ -56,9 +58,9 @@ func (q *Queue) Wait() { q.routineGroup.Wait() } -// Enqueue queue all job -func (q *Queue) Enqueue(job interface{}) error { - return q.worker.Enqueue(job) +// Queue to queue all job +func (q *Queue) Queue(job interface{}) error { + return q.worker.Queue(job) } func (q *Queue) startWorker() { diff --git a/queue/simple/simple.go b/queue/simple/simple.go index fda27ec..c86b370 100644 --- a/queue/simple/simple.go +++ b/queue/simple/simple.go @@ -4,23 +4,29 @@ import ( "errors" "github.com/appleboy/gorush/gorush" + "github.com/appleboy/gorush/queue" ) +var _ queue.Worker = (*Worker)(nil) + // Worker for simple queue using channel type Worker struct { queueNotification chan gorush.PushNotification } // Run start the worker -func (s *Worker) Run(_ chan struct{}) { +func (s *Worker) Run(_ chan struct{}) error { for notification := range s.queueNotification { gorush.SendNotification(notification) } + + return nil } -// Stop worker -func (s *Worker) Stop() { +// Shutdown worker +func (s *Worker) Shutdown() error { close(s.queueNotification) + return nil } // Capacity for channel @@ -33,8 +39,8 @@ func (s *Worker) Usage() int { return len(s.queueNotification) } -// Enqueue send notification to queue -func (s *Worker) Enqueue(job interface{}) error { +// Queue send notification to queue +func (s *Worker) Queue(job interface{}) error { select { case s.queueNotification <- job.(gorush.PushNotification): return nil diff --git a/queue/worker.go b/queue/worker.go index 5224b6a..e32d3d9 100644 --- a/queue/worker.go +++ b/queue/worker.go @@ -2,9 +2,9 @@ package queue // Worker interface type Worker interface { - Run(chan struct{}) - Stop() - Enqueue(job interface{}) error + Run(chan struct{}) error + Shutdown() error + Queue(job interface{}) error Capacity() int Usage() int } diff --git a/router/server.go b/router/server.go index 7f2e21e..c28e804 100644 --- a/router/server.go +++ b/router/server.go @@ -278,7 +278,7 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req gorush.Req notification.AddWaitCount() } - if err := q.Enqueue(*notification); err != nil { + if err := q.Queue(*notification); err != nil { markFailedNotification(cfg, notification, "max capacity reached") } diff --git a/router/server_test.go b/router/server_test.go index 64826dc..d4a0e18 100644 --- a/router/server_test.go +++ b/router/server_test.go @@ -47,10 +47,10 @@ func TestMain(m *testing.M) { } w = simple.NewWorker(int(cfg.Core.QueueNum)) - q = queue.NewQueue(w) + q = queue.NewQueue(w, 4) q.Start() defer func() { - q.Stop() + q.Shutdown() q.Wait() }()