diff --git a/queue/simple/simple.go b/queue/simple/simple.go index 61e5d32..e14d2f8 100644 --- a/queue/simple/simple.go +++ b/queue/simple/simple.go @@ -17,8 +17,8 @@ var errMaxCapacity = errors.New("max capacity reached") // Worker for simple queue using channel type Worker struct { - QueueNotification chan queue.QueuedMessage - runFunc func(*Worker) error + queueNotification chan queue.QueuedMessage + runFunc func(queue.QueuedMessage) error } // BeforeRun run script before start worker @@ -33,29 +33,32 @@ func (s *Worker) AfterRun() error { // Run start the worker func (s *Worker) Run(_ chan struct{}) error { - return s.runFunc(s) + for notification := range s.queueNotification { + s.runFunc(notification) + } + return nil } // Shutdown worker func (s *Worker) Shutdown() error { - close(s.QueueNotification) + close(s.queueNotification) return nil } // Capacity for channel func (s *Worker) Capacity() int { - return cap(s.QueueNotification) + return cap(s.queueNotification) } // Usage for count of channel usage func (s *Worker) Usage() int { - return len(s.QueueNotification) + return len(s.queueNotification) } // Queue send notification to queue func (s *Worker) Queue(job queue.QueuedMessage) error { select { - case s.QueueNotification <- job: + case s.queueNotification <- job: return nil default: return errMaxCapacity @@ -65,12 +68,12 @@ func (s *Worker) Queue(job queue.QueuedMessage) error { // WithQueueNum setup the capcity of queue func WithQueueNum(num int) Option { return func(w *Worker) { - w.QueueNotification = make(chan queue.QueuedMessage, num) + w.queueNotification = make(chan queue.QueuedMessage, num) } } // WithRunFunc setup the run func of queue -func WithRunFunc(fn func(w *Worker) error) Option { +func WithRunFunc(fn func(queue.QueuedMessage) error) Option { return func(w *Worker) { w.runFunc = fn } @@ -79,11 +82,9 @@ func WithRunFunc(fn func(w *Worker) error) Option { // NewWorker for struc func NewWorker(opts ...Option) *Worker { w := &Worker{ - QueueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1), - runFunc: func(w *Worker) error { - for notification := range w.QueueNotification { - gorush.SendNotification(notification) - } + queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1), + runFunc: func(msg queue.QueuedMessage) error { + gorush.SendNotification(msg) return nil }, } diff --git a/queue/simple/simple_test.go b/queue/simple/simple_test.go index 335908e..32244e8 100644 --- a/queue/simple/simple_test.go +++ b/queue/simple/simple_test.go @@ -1,14 +1,39 @@ package simple import ( + "log" "runtime" "testing" + "time" "github.com/appleboy/gorush/gorush" + "github.com/appleboy/gorush/logx" + "github.com/appleboy/gorush/queue" "github.com/stretchr/testify/assert" ) +type mockMessage struct { + msg string +} + +func (m mockMessage) Bytes() []byte { + return []byte(m.msg) +} + +func TestMain(m *testing.M) { + if err := logx.InitLog( + "debug", + "stdout", + "debug", + "stdout", + ); err != nil { + log.Fatalf("Can't load log module, error: %v", err) + } + + m.Run() +} + func TestQueueUsage(t *testing.T) { w := NewWorker() assert.Equal(t, runtime.NumCPU()<<1, w.Capacity()) @@ -33,3 +58,27 @@ func TestMaxCapacity(t *testing.T) { err := w.Queue(&gorush.PushNotification{}) assert.Equal(t, errMaxCapacity, err) } + +func TestCustomFuncAndWait(t *testing.T) { + m := mockMessage{ + msg: "foo", + } + w := NewWorker( + WithRunFunc(func(msg queue.QueuedMessage) error { + logx.LogAccess.Infof("get message: %s", msg.Bytes()) + time.Sleep(500 * time.Millisecond) + return nil + }), + ) + q := queue.NewQueue(w, 2) + q.Start() + time.Sleep(100 * time.Millisecond) + q.Queue(m) + q.Queue(m) + q.Queue(m) + q.Queue(m) + time.Sleep(600 * time.Millisecond) + q.Shutdown() + q.Wait() + // you will see the execute time > 1000ms +}