chore: support custom parameter in queue (#608)

* chore: support custom parameter in queue

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi Wu 2021-07-24 01:29:47 +08:00 committed by GitHub
parent 66923789ff
commit ce6e87639a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 66 additions and 10 deletions

View File

@ -329,7 +329,13 @@ func main() {
logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine) logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine)
} }
q := queue.NewQueue(w, int(cfg.Core.WorkerNum)) q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(int(cfg.Core.WorkerNum)),
)
if err != nil {
logx.LogError.Fatal(err)
}
q.Start() q.Start()
finished := make(chan struct{}) finished := make(chan struct{})

View File

@ -6,7 +6,9 @@ import (
"github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/logx"
"github.com/appleboy/gorush/queue" "github.com/appleboy/gorush/queue"
"github.com/nsqio/go-nsq" "github.com/nsqio/go-nsq"
"github.com/stretchr/testify/assert"
) )
var host = "nsq" var host = "nsq"
@ -28,7 +30,11 @@ func TestShutdown(t *testing.T) {
WithAddr(host+":4150"), WithAddr(host+":4150"),
WithTopic("test"), WithTopic("test"),
) )
q := queue.NewQueue(w, 2) q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start() q.Start()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
q.Shutdown() q.Shutdown()
@ -49,7 +55,11 @@ func TestCustomFuncAndWait(t *testing.T) {
return nil return nil
}), }),
) )
q := queue.NewQueue(w, 2) q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start() q.Start()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
q.Queue(m) q.Queue(m)

View File

@ -1,6 +1,7 @@
package queue package queue
import ( import (
"errors"
"runtime" "runtime"
) )
@ -15,21 +16,53 @@ type (
} }
) )
// Option for queue system
type Option func(*Queue)
// ErrMissingWorker missing define worker
var ErrMissingWorker = errors.New("missing worker module")
// WithWorkerCount set worker count
func WithWorkerCount(num int) Option {
return func(q *Queue) {
q.workerCount = num
}
}
// WithLogger set custom logger
func WithLogger(l Logger) Option {
return func(q *Queue) {
q.logger = l
}
}
// WithWorker set custom worker
func WithWorker(w Worker) Option {
return func(q *Queue) {
q.worker = w
}
}
// NewQueue returns a Queue. // NewQueue returns a Queue.
func NewQueue(w Worker, workerNum int) *Queue { func NewQueue(opts ...Option) (*Queue, error) {
q := &Queue{ q := &Queue{
workerCount: runtime.NumCPU(), workerCount: runtime.NumCPU(),
routineGroup: newRoutineGroup(), routineGroup: newRoutineGroup(),
quit: make(chan struct{}), quit: make(chan struct{}),
worker: w,
logger: new(defaultLogger), logger: new(defaultLogger),
} }
if workerNum > 0 { // Loop through each option
q.workerCount = workerNum for _, opt := range opts {
// Call the option giving the instantiated
opt(q)
} }
return q if q.worker == nil {
return nil, ErrMissingWorker
}
return q, nil
} }
// Capacity for queue max size // Capacity for queue max size

View File

@ -60,7 +60,11 @@ func TestCustomFuncAndWait(t *testing.T) {
return nil return nil
}), }),
) )
q := queue.NewQueue(w, 2) q, err := queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start() q.Start()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
q.Queue(m) q.Queue(m)

View File

@ -37,7 +37,10 @@ func TestMain(m *testing.M) {
} }
w = simple.NewWorker() w = simple.NewWorker()
q = queue.NewQueue(w, 4) q, _ = queue.NewQueue(
queue.WithWorker(w),
queue.WithWorkerCount(4),
)
q.Start() q.Start()
defer func() { defer func() {
q.Shutdown() q.Shutdown()