chore: support single queue and multiple worker (#589)

This commit is contained in:
Bo-Yi Wu
2021-07-16 12:10:34 +08:00
committed by GitHub
parent 0658f18430
commit ab8b1991ab
34 changed files with 1020 additions and 881 deletions

89
queue/queue.go Normal file
View File

@@ -0,0 +1,89 @@
package queue
import (
"runtime"
"github.com/appleboy/gorush/config"
"github.com/appleboy/gorush/logx"
"github.com/appleboy/gorush/queue/simple"
)
type (
// A Queue is a message queue.
Queue struct {
workerCount int
queueCount int
routineGroup *routineGroup
quit chan struct{}
worker Worker
}
)
// NewQueue returns a Queue.
func NewQueue(cfg config.ConfYaml) *Queue {
q := &Queue{
workerCount: int(cfg.Core.WorkerNum),
queueCount: int(cfg.Core.QueueNum),
routineGroup: newRoutineGroup(),
quit: make(chan struct{}),
worker: simple.NewWorker(cfg),
}
if q.workerCount != 0 {
q.workerCount = runtime.NumCPU()
}
if q.queueCount == 0 {
q.queueCount = runtime.NumCPU() << 1
}
return q
}
// Capacity for queue max size
func (q *Queue) Capacity() int {
return q.worker.Capacity()
}
// Usage for count of queue usage
func (q *Queue) Usage() int {
return q.worker.Usage()
}
// Config update current config
func (q *Queue) Config(cfg config.ConfYaml) {
q.worker.Config(cfg)
}
// Start to enable all worker
func (q *Queue) Start() {
q.startWorker()
}
// Stop stops q.
func (q *Queue) Stop() {
q.worker.Stop()
close(q.quit)
}
// Wait all process
func (q *Queue) Wait() {
q.routineGroup.Wait()
}
// Enqueue queue all job
func (q *Queue) Enqueue(job interface{}) error {
return q.worker.Enqueue(job)
}
func (q *Queue) startWorker() {
for i := 0; i < q.workerCount; i++ {
go func(num int) {
q.routineGroup.Run(func() {
logx.LogAccess.Info("started the worker num ", num)
q.worker.Run(q.quit)
logx.LogAccess.Info("closed the worker num ", num)
})
}(i)
}
}

59
queue/simple/simple.go Normal file
View File

@@ -0,0 +1,59 @@
package simple
import (
"errors"
"github.com/appleboy/gorush/config"
"github.com/appleboy/gorush/gorush"
)
// Worker for simple queue using channel
type Worker struct {
cfg config.ConfYaml
queueNotification chan gorush.PushNotification
}
// Run start the worker
func (s *Worker) Run(_ chan struct{}) {
for notification := range s.queueNotification {
gorush.SendNotification(s.cfg, notification)
}
}
// Stop worker
func (s *Worker) Stop() {
close(s.queueNotification)
}
// Capacity for channel
func (s *Worker) Capacity() int {
return cap(s.queueNotification)
}
// Usage for count of channel usage
func (s *Worker) Usage() int {
return len(s.queueNotification)
}
// Enqueue send notification to queue
func (s *Worker) Enqueue(job interface{}) error {
select {
case s.queueNotification <- job.(gorush.PushNotification):
return nil
default:
return errors.New("max capacity reached")
}
}
// Config update current config
func (s *Worker) Config(cfg config.ConfYaml) {
s.cfg = cfg
}
// NewWorker for struct
func NewWorker(cfg config.ConfYaml) *Worker {
return &Worker{
cfg: cfg,
queueNotification: make(chan gorush.PushNotification, cfg.Core.QueueNum),
}
}

24
queue/thread.go Normal file
View File

@@ -0,0 +1,24 @@
package queue
import "sync"
type routineGroup struct {
waitGroup sync.WaitGroup
}
func newRoutineGroup() *routineGroup {
return new(routineGroup)
}
func (g *routineGroup) Run(fn func()) {
g.waitGroup.Add(1)
go func() {
defer g.waitGroup.Done()
fn()
}()
}
func (g *routineGroup) Wait() {
g.waitGroup.Wait()
}

13
queue/worker.go Normal file
View File

@@ -0,0 +1,13 @@
package queue
import "github.com/appleboy/gorush/config"
// Worker interface
type Worker interface {
Run(chan struct{})
Stop()
Enqueue(job interface{}) error
Capacity() int
Usage() int
Config(config.ConfYaml)
}