chore(queue): support custom run func (#603)

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi Wu 2021-07-18 09:03:57 +08:00 committed by GitHub
parent 380162a38a
commit ed25b0f42a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 13 deletions

View File

@ -320,7 +320,9 @@ func main() {
var w queue.Worker
switch core.Queue(cfg.Queue.Engine) {
case core.LocalQueue:
w = simple.NewWorker(simple.WithQueueNum(int(cfg.Core.QueueNum)))
w = simple.NewWorker(
simple.WithQueueNum(int(cfg.Core.QueueNum)),
)
case core.NSQ:
w = nsq.NewWorker()
default:

View File

@ -17,7 +17,8 @@ var errMaxCapacity = errors.New("max capacity reached")
// Worker for simple queue using channel
type Worker struct {
queueNotification chan queue.QueuedMessage
QueueNotification chan queue.QueuedMessage
runFunc func(*Worker) error
}
// BeforeRun run script before start worker
@ -32,33 +33,29 @@ func (s *Worker) AfterRun() error {
// Run start the worker
func (s *Worker) Run(_ chan struct{}) error {
for notification := range s.queueNotification {
gorush.SendNotification(notification)
}
return nil
return s.runFunc(s)
}
// Shutdown worker
func (s *Worker) Shutdown() error {
close(s.queueNotification)
close(s.QueueNotification)
return nil
}
// Capacity for channel
func (s *Worker) Capacity() int {
return cap(s.queueNotification)
return cap(s.QueueNotification)
}
// Usage for count of channel usage
func (s *Worker) Usage() int {
return len(s.queueNotification)
return len(s.QueueNotification)
}
// Queue send notification to queue
func (s *Worker) Queue(job queue.QueuedMessage) error {
select {
case s.queueNotification <- job:
case s.QueueNotification <- job:
return nil
default:
return errMaxCapacity
@ -68,14 +65,27 @@ func (s *Worker) Queue(job queue.QueuedMessage) error {
// WithQueueNum setup the capcity of queue
func WithQueueNum(num int) Option {
return func(w *Worker) {
w.queueNotification = make(chan queue.QueuedMessage, num)
w.QueueNotification = make(chan queue.QueuedMessage, num)
}
}
// WithRunFunc setup the run func of queue
func WithRunFunc(fn func(w *Worker) error) Option {
return func(w *Worker) {
w.runFunc = fn
}
}
// NewWorker for struc
func NewWorker(opts ...Option) *Worker {
w := &Worker{
queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1),
QueueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1),
runFunc: func(w *Worker) error {
for notification := range w.QueueNotification {
gorush.SendNotification(notification)
}
return nil
},
}
// Loop through each option