diff --git a/main.go b/main.go index 6a6a9c3..8dae9b5 100644 --- a/main.go +++ b/main.go @@ -329,7 +329,13 @@ func main() { logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine) } - q := queue.NewQueue(w, int(cfg.Core.WorkerNum)) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(int(cfg.Core.WorkerNum)), + ) + if err != nil { + logx.LogError.Fatal(err) + } q.Start() finished := make(chan struct{}) diff --git a/queue/nsq/nsq_test.go b/queue/nsq/nsq_test.go index 64a537c..a471d02 100644 --- a/queue/nsq/nsq_test.go +++ b/queue/nsq/nsq_test.go @@ -6,7 +6,9 @@ import ( "github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/queue" + "github.com/nsqio/go-nsq" + "github.com/stretchr/testify/assert" ) var host = "nsq" @@ -28,7 +30,11 @@ func TestShutdown(t *testing.T) { WithAddr(host+":4150"), WithTopic("test"), ) - q := queue.NewQueue(w, 2) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(2), + ) + assert.NoError(t, err) q.Start() time.Sleep(1 * time.Second) q.Shutdown() @@ -49,7 +55,11 @@ func TestCustomFuncAndWait(t *testing.T) { return nil }), ) - q := queue.NewQueue(w, 2) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(2), + ) + assert.NoError(t, err) q.Start() time.Sleep(100 * time.Millisecond) q.Queue(m) diff --git a/queue/queue.go b/queue/queue.go index f048667..61004cf 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -1,6 +1,7 @@ package queue import ( + "errors" "runtime" ) @@ -15,21 +16,53 @@ type ( } ) +// Option for queue system +type Option func(*Queue) + +// ErrMissingWorker missing define worker +var ErrMissingWorker = errors.New("missing worker module") + +// WithWorkerCount set worker count +func WithWorkerCount(num int) Option { + return func(q *Queue) { + q.workerCount = num + } +} + +// WithLogger set custom logger +func WithLogger(l Logger) Option { + return func(q *Queue) { + q.logger = l + } +} + +// WithWorker set custom worker +func WithWorker(w Worker) Option { + return func(q *Queue) { + q.worker = w + } +} + // NewQueue returns a Queue. -func NewQueue(w Worker, workerNum int) *Queue { +func NewQueue(opts ...Option) (*Queue, error) { q := &Queue{ workerCount: runtime.NumCPU(), routineGroup: newRoutineGroup(), quit: make(chan struct{}), - worker: w, logger: new(defaultLogger), } - if workerNum > 0 { - q.workerCount = workerNum + // Loop through each option + for _, opt := range opts { + // Call the option giving the instantiated + opt(q) } - return q + if q.worker == nil { + return nil, ErrMissingWorker + } + + return q, nil } // Capacity for queue max size diff --git a/queue/simple/simple_test.go b/queue/simple/simple_test.go index 1365ab3..df3f2de 100644 --- a/queue/simple/simple_test.go +++ b/queue/simple/simple_test.go @@ -60,7 +60,11 @@ func TestCustomFuncAndWait(t *testing.T) { return nil }), ) - q := queue.NewQueue(w, 2) + q, err := queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(2), + ) + assert.NoError(t, err) q.Start() time.Sleep(100 * time.Millisecond) q.Queue(m) diff --git a/router/server_test.go b/router/server_test.go index ad223dd..0aa0803 100644 --- a/router/server_test.go +++ b/router/server_test.go @@ -37,7 +37,10 @@ func TestMain(m *testing.M) { } w = simple.NewWorker() - q = queue.NewQueue(w, 4) + q, _ = queue.NewQueue( + queue.WithWorker(w), + queue.WithWorkerCount(4), + ) q.Start() defer func() { q.Shutdown()