diff --git a/main.go b/main.go index 6bd06fc..6a6a9c3 100644 --- a/main.go +++ b/main.go @@ -320,7 +320,9 @@ func main() { var w queue.Worker switch core.Queue(cfg.Queue.Engine) { case core.LocalQueue: - w = simple.NewWorker(simple.WithQueueNum(int(cfg.Core.QueueNum))) + w = simple.NewWorker( + simple.WithQueueNum(int(cfg.Core.QueueNum)), + ) case core.NSQ: w = nsq.NewWorker() default: diff --git a/queue/simple/simple.go b/queue/simple/simple.go index 2c27499..61e5d32 100644 --- a/queue/simple/simple.go +++ b/queue/simple/simple.go @@ -17,7 +17,8 @@ var errMaxCapacity = errors.New("max capacity reached") // Worker for simple queue using channel type Worker struct { - queueNotification chan queue.QueuedMessage + QueueNotification chan queue.QueuedMessage + runFunc func(*Worker) error } // BeforeRun run script before start worker @@ -32,33 +33,29 @@ func (s *Worker) AfterRun() error { // Run start the worker func (s *Worker) Run(_ chan struct{}) error { - for notification := range s.queueNotification { - gorush.SendNotification(notification) - } - - return nil + return s.runFunc(s) } // Shutdown worker func (s *Worker) Shutdown() error { - close(s.queueNotification) + close(s.QueueNotification) return nil } // Capacity for channel func (s *Worker) Capacity() int { - return cap(s.queueNotification) + return cap(s.QueueNotification) } // Usage for count of channel usage func (s *Worker) Usage() int { - return len(s.queueNotification) + return len(s.QueueNotification) } // Queue send notification to queue func (s *Worker) Queue(job queue.QueuedMessage) error { select { - case s.queueNotification <- job: + case s.QueueNotification <- job: return nil default: return errMaxCapacity @@ -68,14 +65,27 @@ func (s *Worker) Queue(job queue.QueuedMessage) error { // WithQueueNum setup the capcity of queue func WithQueueNum(num int) Option { return func(w *Worker) { - w.queueNotification = make(chan queue.QueuedMessage, num) + w.QueueNotification = make(chan queue.QueuedMessage, num) + } +} + +// WithRunFunc setup the run func of queue +func WithRunFunc(fn func(w *Worker) error) Option { + return func(w *Worker) { + w.runFunc = fn } } // NewWorker for struc func NewWorker(opts ...Option) *Worker { w := &Worker{ - queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1), + QueueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1), + runFunc: func(w *Worker) error { + for notification := range w.QueueNotification { + gorush.SendNotification(notification) + } + return nil + }, } // Loop through each option