chore: move queue to appleboy/queue (#612)
* chore: move queue to appleboy/queue Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
		
							parent
							
								
									fcfa195bc6
								
							
						
					
					
						commit
						14f19fcb20
					
				
							
								
								
									
										1
									
								
								go.mod
								
								
								
								
							
							
						
						
									
										1
									
								
								go.mod
								
								
								
								
							| 
						 | 
				
			
			@ -7,6 +7,7 @@ require (
 | 
			
		|||
	github.com/appleboy/gin-status-api v1.1.0
 | 
			
		||||
	github.com/appleboy/go-fcm v0.1.5
 | 
			
		||||
	github.com/appleboy/gofight/v2 v2.1.2
 | 
			
		||||
	github.com/appleboy/queue v0.0.2 // indirect
 | 
			
		||||
	github.com/asdine/storm/v3 v3.2.1
 | 
			
		||||
	github.com/buger/jsonparser v1.1.1
 | 
			
		||||
	github.com/dgraph-io/badger/v3 v3.2103.1
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										2
									
								
								go.sum
								
								
								
								
							
							
						
						
									
										2
									
								
								go.sum
								
								
								
								
							| 
						 | 
				
			
			@ -40,6 +40,8 @@ github.com/appleboy/go-fcm v0.1.5 h1:fKbcZf/7vwGsvDkcop8a+kCHnK+tt4wXX0X7uEzwI6E
 | 
			
		|||
github.com/appleboy/go-fcm v0.1.5/go.mod h1:MSxZ4LqGRsnywOjnlXJXMqbjZrG4vf+0oHitfC9HRH0=
 | 
			
		||||
github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4=
 | 
			
		||||
github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw=
 | 
			
		||||
github.com/appleboy/queue v0.0.2 h1:yjdRfa2G8Q/4NYaBdqR0m6GPjgdlWMc+NvSACR01LoE=
 | 
			
		||||
github.com/appleboy/queue v0.0.2/go.mod h1:mmhZvP5Zl0avp+LA4XL29Aad0Df39ZxOTIiMs/OhM2Q=
 | 
			
		||||
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 | 
			
		||||
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
 | 
			
		||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										2
									
								
								main.go
								
								
								
								
							
							
						
						
									
										2
									
								
								main.go
								
								
								
								
							| 
						 | 
				
			
			@ -18,13 +18,13 @@ import (
 | 
			
		|||
	"github.com/appleboy/gorush/core"
 | 
			
		||||
	"github.com/appleboy/gorush/logx"
 | 
			
		||||
	"github.com/appleboy/gorush/notify"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
	"github.com/appleboy/gorush/queue/nsq"
 | 
			
		||||
	"github.com/appleboy/gorush/queue/simple"
 | 
			
		||||
	"github.com/appleboy/gorush/router"
 | 
			
		||||
	"github.com/appleboy/gorush/rpc"
 | 
			
		||||
	"github.com/appleboy/gorush/status"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
	"golang.org/x/sync/errgroup"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,9 +11,9 @@ import (
 | 
			
		|||
	"github.com/appleboy/gorush/config"
 | 
			
		||||
	"github.com/appleboy/gorush/core"
 | 
			
		||||
	"github.com/appleboy/gorush/logx"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/go-fcm"
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
	jsoniter "github.com/json-iterator/go"
 | 
			
		||||
	"github.com/msalihkarakasli/go-hms-push/push/model"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,43 +0,0 @@
 | 
			
		|||
package queue
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	"github.com/rs/zerolog/log"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Logger interface is used throughout gorush
 | 
			
		||||
type Logger interface {
 | 
			
		||||
	Infof(format string, args ...interface{})
 | 
			
		||||
	Errorf(format string, args ...interface{})
 | 
			
		||||
	Fatalf(format string, args ...interface{})
 | 
			
		||||
	Info(args ...interface{})
 | 
			
		||||
	Error(args ...interface{})
 | 
			
		||||
	Fatal(args ...interface{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type defaultLogger struct{}
 | 
			
		||||
 | 
			
		||||
func (l defaultLogger) Infof(format string, args ...interface{}) {
 | 
			
		||||
	log.Info().Msgf(format, args...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l defaultLogger) Errorf(format string, args ...interface{}) {
 | 
			
		||||
	log.Error().Msgf(format, args...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l defaultLogger) Fatalf(format string, args ...interface{}) {
 | 
			
		||||
	log.Fatal().Msgf(format, args...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l defaultLogger) Info(args ...interface{}) {
 | 
			
		||||
	log.Info().Msg(fmt.Sprint(args...))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l defaultLogger) Error(args ...interface{}) {
 | 
			
		||||
	log.Error().Msg(fmt.Sprint(args...))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (l defaultLogger) Fatal(args ...interface{}) {
 | 
			
		||||
	log.Fatal().Msg(fmt.Sprint(args...))
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -7,8 +7,8 @@ import (
 | 
			
		|||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/gorush/notify"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
	"github.com/nsqio/go-nsq"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,8 +5,8 @@ import (
 | 
			
		|||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/gorush/logx"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
	"github.com/nsqio/go-nsq"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										138
									
								
								queue/queue.go
								
								
								
								
							
							
						
						
									
										138
									
								
								queue/queue.go
								
								
								
								
							| 
						 | 
				
			
			@ -1,138 +0,0 @@
 | 
			
		|||
package queue
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	// A Queue is a message queue.
 | 
			
		||||
	Queue struct {
 | 
			
		||||
		logger         Logger
 | 
			
		||||
		workerCount    int
 | 
			
		||||
		routineGroup   *routineGroup
 | 
			
		||||
		quit           chan struct{}
 | 
			
		||||
		worker         Worker
 | 
			
		||||
		stopOnce       sync.Once
 | 
			
		||||
		runningWorkers int32
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Option for queue system
 | 
			
		||||
type Option func(*Queue)
 | 
			
		||||
 | 
			
		||||
// ErrMissingWorker missing define worker
 | 
			
		||||
var ErrMissingWorker = errors.New("missing worker module")
 | 
			
		||||
 | 
			
		||||
// WithWorkerCount set worker count
 | 
			
		||||
func WithWorkerCount(num int) Option {
 | 
			
		||||
	return func(q *Queue) {
 | 
			
		||||
		q.workerCount = num
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithLogger set custom logger
 | 
			
		||||
func WithLogger(l Logger) Option {
 | 
			
		||||
	return func(q *Queue) {
 | 
			
		||||
		q.logger = l
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithWorker set custom worker
 | 
			
		||||
func WithWorker(w Worker) Option {
 | 
			
		||||
	return func(q *Queue) {
 | 
			
		||||
		q.worker = w
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewQueue returns a Queue.
 | 
			
		||||
func NewQueue(opts ...Option) (*Queue, error) {
 | 
			
		||||
	q := &Queue{
 | 
			
		||||
		workerCount:  runtime.NumCPU(),
 | 
			
		||||
		routineGroup: newRoutineGroup(),
 | 
			
		||||
		quit:         make(chan struct{}),
 | 
			
		||||
		logger:       new(defaultLogger),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Loop through each option
 | 
			
		||||
	for _, opt := range opts {
 | 
			
		||||
		// Call the option giving the instantiated
 | 
			
		||||
		opt(q)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if q.worker == nil {
 | 
			
		||||
		return nil, ErrMissingWorker
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return q, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Capacity for queue max size
 | 
			
		||||
func (q *Queue) Capacity() int {
 | 
			
		||||
	return q.worker.Capacity()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Usage for count of queue usage
 | 
			
		||||
func (q *Queue) Usage() int {
 | 
			
		||||
	return q.worker.Usage()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Start to enable all worker
 | 
			
		||||
func (q *Queue) Start() {
 | 
			
		||||
	q.startWorker()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Shutdown stops all queues.
 | 
			
		||||
func (q *Queue) Shutdown() {
 | 
			
		||||
	q.stopOnce.Do(func() {
 | 
			
		||||
		q.worker.Shutdown()
 | 
			
		||||
		close(q.quit)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Workers returns the numbers of workers has been created.
 | 
			
		||||
func (q *Queue) Workers() int {
 | 
			
		||||
	return int(atomic.LoadInt32(&q.runningWorkers))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Wait all process
 | 
			
		||||
func (q *Queue) Wait() {
 | 
			
		||||
	q.routineGroup.Wait()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Queue to queue all job
 | 
			
		||||
func (q *Queue) Queue(job QueuedMessage) error {
 | 
			
		||||
	return q.worker.Queue(job)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *Queue) work() {
 | 
			
		||||
	num := atomic.AddInt32(&q.runningWorkers, 1)
 | 
			
		||||
	if err := q.worker.BeforeRun(); err != nil {
 | 
			
		||||
		q.logger.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	q.routineGroup.Run(func() {
 | 
			
		||||
		// to handle panic cases from inside the worker
 | 
			
		||||
		// in such case, we start a new goroutine
 | 
			
		||||
		defer func() {
 | 
			
		||||
			atomic.AddInt32(&q.runningWorkers, -1)
 | 
			
		||||
			if err := recover(); err != nil {
 | 
			
		||||
				q.logger.Error(err)
 | 
			
		||||
				go q.work()
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
		q.logger.Infof("start the worker num: %d", num)
 | 
			
		||||
		q.worker.Run(q.quit)
 | 
			
		||||
		q.logger.Infof("stop the worker num: %d", num)
 | 
			
		||||
	})
 | 
			
		||||
	if err := q.worker.AfterRun(); err != nil {
 | 
			
		||||
		q.logger.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (q *Queue) startWorker() {
 | 
			
		||||
	for i := 0; i < q.workerCount; i++ {
 | 
			
		||||
		go q.work()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -5,7 +5,8 @@ import (
 | 
			
		|||
	"runtime"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/gorush/notify"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var _ queue.Worker = (*Worker)(nil)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -7,8 +7,8 @@ import (
 | 
			
		|||
 | 
			
		||||
	"github.com/appleboy/gorush/logx"
 | 
			
		||||
	"github.com/appleboy/gorush/notify"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,24 +0,0 @@
 | 
			
		|||
package queue
 | 
			
		||||
 | 
			
		||||
import "sync"
 | 
			
		||||
 | 
			
		||||
type routineGroup struct {
 | 
			
		||||
	waitGroup sync.WaitGroup
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newRoutineGroup() *routineGroup {
 | 
			
		||||
	return new(routineGroup)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *routineGroup) Run(fn func()) {
 | 
			
		||||
	g.waitGroup.Add(1)
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer g.waitGroup.Done()
 | 
			
		||||
		fn()
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *routineGroup) Wait() {
 | 
			
		||||
	g.waitGroup.Wait()
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -1,18 +0,0 @@
 | 
			
		|||
package queue
 | 
			
		||||
 | 
			
		||||
// Worker interface
 | 
			
		||||
type Worker interface {
 | 
			
		||||
	BeforeRun() error
 | 
			
		||||
	Run(chan struct{}) error
 | 
			
		||||
	AfterRun() error
 | 
			
		||||
 | 
			
		||||
	Shutdown() error
 | 
			
		||||
	Queue(job QueuedMessage) error
 | 
			
		||||
	Capacity() int
 | 
			
		||||
	Usage() int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// QueuedMessage ...
 | 
			
		||||
type QueuedMessage interface {
 | 
			
		||||
	Bytes() []byte
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -14,10 +14,10 @@ import (
 | 
			
		|||
	"github.com/appleboy/gorush/logx"
 | 
			
		||||
	"github.com/appleboy/gorush/metric"
 | 
			
		||||
	"github.com/appleboy/gorush/notify"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
	"github.com/appleboy/gorush/status"
 | 
			
		||||
 | 
			
		||||
	api "github.com/appleboy/gin-status-api"
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
	"github.com/gin-contrib/logger"
 | 
			
		||||
	"github.com/gin-gonic/gin"
 | 
			
		||||
	"github.com/gin-gonic/gin/binding"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,9 +8,9 @@ import (
 | 
			
		|||
 | 
			
		||||
	"github.com/appleboy/gorush/config"
 | 
			
		||||
	"github.com/appleboy/gorush/logx"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
 | 
			
		||||
	"github.com/apex/gateway"
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// RunHTTPServer provide run http or https protocol.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,8 +12,8 @@ import (
 | 
			
		|||
 | 
			
		||||
	"github.com/appleboy/gorush/config"
 | 
			
		||||
	"github.com/appleboy/gorush/logx"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
	"golang.org/x/sync/errgroup"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -14,11 +14,11 @@ import (
 | 
			
		|||
	"github.com/appleboy/gorush/config"
 | 
			
		||||
	"github.com/appleboy/gorush/core"
 | 
			
		||||
	"github.com/appleboy/gorush/notify"
 | 
			
		||||
	"github.com/appleboy/gorush/queue"
 | 
			
		||||
	"github.com/appleboy/gorush/queue/simple"
 | 
			
		||||
	"github.com/appleboy/gorush/status"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/gofight/v2"
 | 
			
		||||
	"github.com/appleboy/queue"
 | 
			
		||||
	"github.com/buger/jsonparser"
 | 
			
		||||
	"github.com/gin-gonic/gin"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue