refactor(queue): update run func in simple channel (#605)

This commit is contained in:
Bo-Yi Wu 2021-07-18 19:34:30 +08:00 committed by GitHub
parent d131d2935a
commit 05ec3209f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 14 deletions

View File

@ -17,8 +17,8 @@ var errMaxCapacity = errors.New("max capacity reached")
// Worker for simple queue using channel
type Worker struct {
QueueNotification chan queue.QueuedMessage
runFunc func(*Worker) error
queueNotification chan queue.QueuedMessage
runFunc func(queue.QueuedMessage) error
}
// BeforeRun run script before start worker
@ -33,29 +33,32 @@ func (s *Worker) AfterRun() error {
// Run start the worker
func (s *Worker) Run(_ chan struct{}) error {
return s.runFunc(s)
for notification := range s.queueNotification {
s.runFunc(notification)
}
return nil
}
// Shutdown worker
func (s *Worker) Shutdown() error {
close(s.QueueNotification)
close(s.queueNotification)
return nil
}
// Capacity for channel
func (s *Worker) Capacity() int {
return cap(s.QueueNotification)
return cap(s.queueNotification)
}
// Usage for count of channel usage
func (s *Worker) Usage() int {
return len(s.QueueNotification)
return len(s.queueNotification)
}
// Queue send notification to queue
func (s *Worker) Queue(job queue.QueuedMessage) error {
select {
case s.QueueNotification <- job:
case s.queueNotification <- job:
return nil
default:
return errMaxCapacity
@ -65,12 +68,12 @@ func (s *Worker) Queue(job queue.QueuedMessage) error {
// WithQueueNum setup the capcity of queue
func WithQueueNum(num int) Option {
return func(w *Worker) {
w.QueueNotification = make(chan queue.QueuedMessage, num)
w.queueNotification = make(chan queue.QueuedMessage, num)
}
}
// WithRunFunc setup the run func of queue
func WithRunFunc(fn func(w *Worker) error) Option {
func WithRunFunc(fn func(queue.QueuedMessage) error) Option {
return func(w *Worker) {
w.runFunc = fn
}
@ -79,11 +82,9 @@ func WithRunFunc(fn func(w *Worker) error) Option {
// NewWorker for struc
func NewWorker(opts ...Option) *Worker {
w := &Worker{
QueueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1),
runFunc: func(w *Worker) error {
for notification := range w.QueueNotification {
gorush.SendNotification(notification)
}
queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1),
runFunc: func(msg queue.QueuedMessage) error {
gorush.SendNotification(msg)
return nil
},
}

View File

@ -1,14 +1,39 @@
package simple
import (
"log"
"runtime"
"testing"
"time"
"github.com/appleboy/gorush/gorush"
"github.com/appleboy/gorush/logx"
"github.com/appleboy/gorush/queue"
"github.com/stretchr/testify/assert"
)
type mockMessage struct {
msg string
}
func (m mockMessage) Bytes() []byte {
return []byte(m.msg)
}
func TestMain(m *testing.M) {
if err := logx.InitLog(
"debug",
"stdout",
"debug",
"stdout",
); err != nil {
log.Fatalf("Can't load log module, error: %v", err)
}
m.Run()
}
func TestQueueUsage(t *testing.T) {
w := NewWorker()
assert.Equal(t, runtime.NumCPU()<<1, w.Capacity())
@ -33,3 +58,27 @@ func TestMaxCapacity(t *testing.T) {
err := w.Queue(&gorush.PushNotification{})
assert.Equal(t, errMaxCapacity, err)
}
func TestCustomFuncAndWait(t *testing.T) {
m := mockMessage{
msg: "foo",
}
w := NewWorker(
WithRunFunc(func(msg queue.QueuedMessage) error {
logx.LogAccess.Infof("get message: %s", msg.Bytes())
time.Sleep(500 * time.Millisecond)
return nil
}),
)
q := queue.NewQueue(w, 2)
q.Start()
time.Sleep(100 * time.Millisecond)
q.Queue(m)
q.Queue(m)
q.Queue(m)
q.Queue(m)
time.Sleep(600 * time.Millisecond)
q.Shutdown()
q.Wait()
// you will see the execute time > 1000ms
}