From 14f19fcb20485d5eb0cb518a235e073196ec9461 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sat, 24 Jul 2021 17:18:35 +0800 Subject: [PATCH] chore: move queue to appleboy/queue (#612) * chore: move queue to appleboy/queue Signed-off-by: Bo-Yi Wu --- go.mod | 1 + go.sum | 2 + main.go | 2 +- notify/notification.go | 2 +- queue/logger.go | 43 ----------- queue/nsq/nsq.go | 2 +- queue/nsq/nsq_test.go | 2 +- queue/queue.go | 138 ------------------------------------ queue/simple/simple.go | 3 +- queue/simple/simple_test.go | 2 +- queue/thread.go | 24 ------- queue/worker.go | 18 ----- router/server.go | 2 +- router/server_lambda.go | 2 +- router/server_normal.go | 2 +- router/server_test.go | 2 +- 16 files changed, 14 insertions(+), 233 deletions(-) delete mode 100644 queue/logger.go delete mode 100644 queue/queue.go delete mode 100644 queue/thread.go delete mode 100644 queue/worker.go diff --git a/go.mod b/go.mod index dd72e49..c120c6f 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/appleboy/gin-status-api v1.1.0 github.com/appleboy/go-fcm v0.1.5 github.com/appleboy/gofight/v2 v2.1.2 + github.com/appleboy/queue v0.0.2 // indirect github.com/asdine/storm/v3 v3.2.1 github.com/buger/jsonparser v1.1.1 github.com/dgraph-io/badger/v3 v3.2103.1 diff --git a/go.sum b/go.sum index 4b7e037..50555e8 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/appleboy/go-fcm v0.1.5 h1:fKbcZf/7vwGsvDkcop8a+kCHnK+tt4wXX0X7uEzwI6E github.com/appleboy/go-fcm v0.1.5/go.mod h1:MSxZ4LqGRsnywOjnlXJXMqbjZrG4vf+0oHitfC9HRH0= github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4= github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw= +github.com/appleboy/queue v0.0.2 h1:yjdRfa2G8Q/4NYaBdqR0m6GPjgdlWMc+NvSACR01LoE= +github.com/appleboy/queue v0.0.2/go.mod h1:mmhZvP5Zl0avp+LA4XL29Aad0Df39ZxOTIiMs/OhM2Q= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= diff --git a/main.go b/main.go index b538706..0e51a93 100644 --- a/main.go +++ b/main.go @@ -18,13 +18,13 @@ import ( "github.com/appleboy/gorush/core" "github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/notify" - "github.com/appleboy/gorush/queue" "github.com/appleboy/gorush/queue/nsq" "github.com/appleboy/gorush/queue/simple" "github.com/appleboy/gorush/router" "github.com/appleboy/gorush/rpc" "github.com/appleboy/gorush/status" + "github.com/appleboy/queue" "golang.org/x/sync/errgroup" ) diff --git a/notify/notification.go b/notify/notification.go index dece32f..0540756 100644 --- a/notify/notification.go +++ b/notify/notification.go @@ -11,9 +11,9 @@ import ( "github.com/appleboy/gorush/config" "github.com/appleboy/gorush/core" "github.com/appleboy/gorush/logx" - "github.com/appleboy/gorush/queue" "github.com/appleboy/go-fcm" + "github.com/appleboy/queue" jsoniter "github.com/json-iterator/go" "github.com/msalihkarakasli/go-hms-push/push/model" ) diff --git a/queue/logger.go b/queue/logger.go deleted file mode 100644 index 5d19295..0000000 --- a/queue/logger.go +++ /dev/null @@ -1,43 +0,0 @@ -package queue - -import ( - "fmt" - - "github.com/rs/zerolog/log" -) - -// Logger interface is used throughout gorush -type Logger interface { - Infof(format string, args ...interface{}) - Errorf(format string, args ...interface{}) - Fatalf(format string, args ...interface{}) - Info(args ...interface{}) - Error(args ...interface{}) - Fatal(args ...interface{}) -} - -type defaultLogger struct{} - -func (l defaultLogger) Infof(format string, args ...interface{}) { - log.Info().Msgf(format, args...) -} - -func (l defaultLogger) Errorf(format string, args ...interface{}) { - log.Error().Msgf(format, args...) -} - -func (l defaultLogger) Fatalf(format string, args ...interface{}) { - log.Fatal().Msgf(format, args...) -} - -func (l defaultLogger) Info(args ...interface{}) { - log.Info().Msg(fmt.Sprint(args...)) -} - -func (l defaultLogger) Error(args ...interface{}) { - log.Error().Msg(fmt.Sprint(args...)) -} - -func (l defaultLogger) Fatal(args ...interface{}) { - log.Fatal().Msg(fmt.Sprint(args...)) -} diff --git a/queue/nsq/nsq.go b/queue/nsq/nsq.go index d5320c4..cb96068 100644 --- a/queue/nsq/nsq.go +++ b/queue/nsq/nsq.go @@ -7,8 +7,8 @@ import ( "time" "github.com/appleboy/gorush/notify" - "github.com/appleboy/gorush/queue" + "github.com/appleboy/queue" "github.com/nsqio/go-nsq" ) diff --git a/queue/nsq/nsq_test.go b/queue/nsq/nsq_test.go index 0f11c57..d2ed72d 100644 --- a/queue/nsq/nsq_test.go +++ b/queue/nsq/nsq_test.go @@ -5,8 +5,8 @@ import ( "time" "github.com/appleboy/gorush/logx" - "github.com/appleboy/gorush/queue" + "github.com/appleboy/queue" "github.com/nsqio/go-nsq" "github.com/stretchr/testify/assert" ) diff --git a/queue/queue.go b/queue/queue.go deleted file mode 100644 index 7c73639..0000000 --- a/queue/queue.go +++ /dev/null @@ -1,138 +0,0 @@ -package queue - -import ( - "errors" - "runtime" - "sync" - "sync/atomic" -) - -type ( - // A Queue is a message queue. - Queue struct { - logger Logger - workerCount int - routineGroup *routineGroup - quit chan struct{} - worker Worker - stopOnce sync.Once - runningWorkers int32 - } -) - -// Option for queue system -type Option func(*Queue) - -// ErrMissingWorker missing define worker -var ErrMissingWorker = errors.New("missing worker module") - -// WithWorkerCount set worker count -func WithWorkerCount(num int) Option { - return func(q *Queue) { - q.workerCount = num - } -} - -// WithLogger set custom logger -func WithLogger(l Logger) Option { - return func(q *Queue) { - q.logger = l - } -} - -// WithWorker set custom worker -func WithWorker(w Worker) Option { - return func(q *Queue) { - q.worker = w - } -} - -// NewQueue returns a Queue. -func NewQueue(opts ...Option) (*Queue, error) { - q := &Queue{ - workerCount: runtime.NumCPU(), - routineGroup: newRoutineGroup(), - quit: make(chan struct{}), - logger: new(defaultLogger), - } - - // Loop through each option - for _, opt := range opts { - // Call the option giving the instantiated - opt(q) - } - - if q.worker == nil { - return nil, ErrMissingWorker - } - - return q, nil -} - -// Capacity for queue max size -func (q *Queue) Capacity() int { - return q.worker.Capacity() -} - -// Usage for count of queue usage -func (q *Queue) Usage() int { - return q.worker.Usage() -} - -// Start to enable all worker -func (q *Queue) Start() { - q.startWorker() -} - -// Shutdown stops all queues. -func (q *Queue) Shutdown() { - q.stopOnce.Do(func() { - q.worker.Shutdown() - close(q.quit) - }) -} - -// Workers returns the numbers of workers has been created. -func (q *Queue) Workers() int { - return int(atomic.LoadInt32(&q.runningWorkers)) -} - -// Wait all process -func (q *Queue) Wait() { - q.routineGroup.Wait() -} - -// Queue to queue all job -func (q *Queue) Queue(job QueuedMessage) error { - return q.worker.Queue(job) -} - -func (q *Queue) work() { - num := atomic.AddInt32(&q.runningWorkers, 1) - if err := q.worker.BeforeRun(); err != nil { - q.logger.Fatal(err) - } - q.routineGroup.Run(func() { - // to handle panic cases from inside the worker - // in such case, we start a new goroutine - defer func() { - atomic.AddInt32(&q.runningWorkers, -1) - if err := recover(); err != nil { - q.logger.Error(err) - go q.work() - } - }() - q.logger.Infof("start the worker num: %d", num) - q.worker.Run(q.quit) - q.logger.Infof("stop the worker num: %d", num) - }) - if err := q.worker.AfterRun(); err != nil { - q.logger.Fatal(err) - } -} - -func (q *Queue) startWorker() { - for i := 0; i < q.workerCount; i++ { - go q.work() - } -} diff --git a/queue/simple/simple.go b/queue/simple/simple.go index 027e42b..f58e51d 100644 --- a/queue/simple/simple.go +++ b/queue/simple/simple.go @@ -5,7 +5,8 @@ import ( "runtime" "github.com/appleboy/gorush/notify" - "github.com/appleboy/gorush/queue" + + "github.com/appleboy/queue" ) var _ queue.Worker = (*Worker)(nil) diff --git a/queue/simple/simple_test.go b/queue/simple/simple_test.go index 9d5b192..3c7fff1 100644 --- a/queue/simple/simple_test.go +++ b/queue/simple/simple_test.go @@ -7,8 +7,8 @@ import ( "github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/notify" - "github.com/appleboy/gorush/queue" + "github.com/appleboy/queue" "github.com/stretchr/testify/assert" ) diff --git a/queue/thread.go b/queue/thread.go deleted file mode 100644 index 473c351..0000000 --- a/queue/thread.go +++ /dev/null @@ -1,24 +0,0 @@ -package queue - -import "sync" - -type routineGroup struct { - waitGroup sync.WaitGroup -} - -func newRoutineGroup() *routineGroup { - return new(routineGroup) -} - -func (g *routineGroup) Run(fn func()) { - g.waitGroup.Add(1) - - go func() { - defer g.waitGroup.Done() - fn() - }() -} - -func (g *routineGroup) Wait() { - g.waitGroup.Wait() -} diff --git a/queue/worker.go b/queue/worker.go deleted file mode 100644 index 2dace56..0000000 --- a/queue/worker.go +++ /dev/null @@ -1,18 +0,0 @@ -package queue - -// Worker interface -type Worker interface { - BeforeRun() error - Run(chan struct{}) error - AfterRun() error - - Shutdown() error - Queue(job QueuedMessage) error - Capacity() int - Usage() int -} - -// QueuedMessage ... -type QueuedMessage interface { - Bytes() []byte -} diff --git a/router/server.go b/router/server.go index a918ed8..000fc0c 100644 --- a/router/server.go +++ b/router/server.go @@ -14,10 +14,10 @@ import ( "github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/metric" "github.com/appleboy/gorush/notify" - "github.com/appleboy/gorush/queue" "github.com/appleboy/gorush/status" api "github.com/appleboy/gin-status-api" + "github.com/appleboy/queue" "github.com/gin-contrib/logger" "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" diff --git a/router/server_lambda.go b/router/server_lambda.go index 299e44e..59d0ada 100644 --- a/router/server_lambda.go +++ b/router/server_lambda.go @@ -8,9 +8,9 @@ import ( "github.com/appleboy/gorush/config" "github.com/appleboy/gorush/logx" - "github.com/appleboy/gorush/queue" "github.com/apex/gateway" + "github.com/appleboy/queue" ) // RunHTTPServer provide run http or https protocol. diff --git a/router/server_normal.go b/router/server_normal.go index 7884246..687a3ee 100644 --- a/router/server_normal.go +++ b/router/server_normal.go @@ -12,8 +12,8 @@ import ( "github.com/appleboy/gorush/config" "github.com/appleboy/gorush/logx" - "github.com/appleboy/gorush/queue" + "github.com/appleboy/queue" "golang.org/x/sync/errgroup" ) diff --git a/router/server_test.go b/router/server_test.go index 7487a2f..552d242 100644 --- a/router/server_test.go +++ b/router/server_test.go @@ -14,11 +14,11 @@ import ( "github.com/appleboy/gorush/config" "github.com/appleboy/gorush/core" "github.com/appleboy/gorush/notify" - "github.com/appleboy/gorush/queue" "github.com/appleboy/gorush/queue/simple" "github.com/appleboy/gorush/status" "github.com/appleboy/gofight/v2" + "github.com/appleboy/queue" "github.com/buger/jsonparser" "github.com/gin-gonic/gin" "github.com/stretchr/testify/assert"