chore(queue): add queue message interface (#602)

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi Wu 2021-07-18 01:58:46 +08:00 committed by GitHub
parent d6b4a0ae39
commit 380162a38a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 31 additions and 23 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/appleboy/gorush/config" "github.com/appleboy/gorush/config"
"github.com/appleboy/gorush/core" "github.com/appleboy/gorush/core"
"github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/logx"
"github.com/appleboy/gorush/queue"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/msalihkarakasli/go-hms-push/push/model" "github.com/msalihkarakasli/go-hms-push/push/model"
@ -252,17 +253,19 @@ func CheckPushConf(cfg config.ConfYaml) error {
} }
// SendNotification send notification // SendNotification send notification
func SendNotification(req PushNotification) { func SendNotification(req queue.QueuedMessage) {
v, _ := req.(*PushNotification)
defer func() { defer func() {
req.WaitDone() v.WaitDone()
}() }()
switch req.Platform { switch v.Platform {
case core.PlatFormIos: case core.PlatFormIos:
PushToIOS(req) PushToIOS(*v)
case core.PlatFormAndroid: case core.PlatFormAndroid:
PushToAndroid(req) PushToAndroid(*v)
case core.PlatFormHuawei: case core.PlatFormHuawei:
PushToHuawei(req) PushToHuawei(*v)
} }
} }

View File

@ -102,7 +102,7 @@ func (s *Worker) Run(quit chan struct{}) error {
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
wg.Add(1) wg.Add(1)
defer wg.Done() defer wg.Done()
var notification gorush.PushNotification var notification *gorush.PushNotification
if err := json.Unmarshal(msg.Body, &notification); err != nil { if err := json.Unmarshal(msg.Body, &notification); err != nil {
return err return err
} }
@ -136,8 +136,8 @@ func (s *Worker) Usage() int {
} }
// Queue send notification to queue // Queue send notification to queue
func (s *Worker) Queue(job interface{}) error { func (s *Worker) Queue(job queue.QueuedMessage) error {
v, ok := job.(gorush.PushNotification) v, ok := job.(*gorush.PushNotification)
if !ok { if !ok {
return errors.New("wrong type of job") return errors.New("wrong type of job")
} }

View File

@ -59,7 +59,7 @@ func (q *Queue) Wait() {
} }
// Queue to queue all job // Queue to queue all job
func (q *Queue) Queue(job interface{}) error { func (q *Queue) Queue(job QueuedMessage) error {
return q.worker.Queue(job) return q.worker.Queue(job)
} }

View File

@ -17,7 +17,7 @@ var errMaxCapacity = errors.New("max capacity reached")
// Worker for simple queue using channel // Worker for simple queue using channel
type Worker struct { type Worker struct {
queueNotification chan gorush.PushNotification queueNotification chan queue.QueuedMessage
} }
// BeforeRun run script before start worker // BeforeRun run script before start worker
@ -56,9 +56,9 @@ func (s *Worker) Usage() int {
} }
// Queue send notification to queue // Queue send notification to queue
func (s *Worker) Queue(job interface{}) error { func (s *Worker) Queue(job queue.QueuedMessage) error {
select { select {
case s.queueNotification <- job.(gorush.PushNotification): case s.queueNotification <- job:
return nil return nil
default: default:
return errMaxCapacity return errMaxCapacity
@ -68,14 +68,14 @@ func (s *Worker) Queue(job interface{}) error {
// WithQueueNum setup the capcity of queue // WithQueueNum setup the capcity of queue
func WithQueueNum(num int) Option { func WithQueueNum(num int) Option {
return func(w *Worker) { return func(w *Worker) {
w.queueNotification = make(chan gorush.PushNotification, num) w.queueNotification = make(chan queue.QueuedMessage, num)
} }
} }
// NewWorker for struc // NewWorker for struc
func NewWorker(opts ...Option) *Worker { func NewWorker(opts ...Option) *Worker {
w := &Worker{ w := &Worker{
queueNotification: make(chan gorush.PushNotification, runtime.NumCPU()<<1), queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1),
} }
// Loop through each option // Loop through each option

View File

@ -14,7 +14,7 @@ func TestQueueUsage(t *testing.T) {
assert.Equal(t, runtime.NumCPU()<<1, w.Capacity()) assert.Equal(t, runtime.NumCPU()<<1, w.Capacity())
assert.Equal(t, 0, w.Usage()) assert.Equal(t, 0, w.Usage())
w.Queue(gorush.PushNotification{}) w.Queue(&gorush.PushNotification{})
assert.Equal(t, 1, w.Usage()) assert.Equal(t, 1, w.Usage())
} }
@ -23,13 +23,13 @@ func TestMaxCapacity(t *testing.T) {
assert.Equal(t, 2, w.Capacity()) assert.Equal(t, 2, w.Capacity())
assert.Equal(t, 0, w.Usage()) assert.Equal(t, 0, w.Usage())
assert.NoError(t, w.Queue(gorush.PushNotification{})) assert.NoError(t, w.Queue(&gorush.PushNotification{}))
assert.Equal(t, 1, w.Usage()) assert.Equal(t, 1, w.Usage())
assert.NoError(t, w.Queue(gorush.PushNotification{})) assert.NoError(t, w.Queue(&gorush.PushNotification{}))
assert.Equal(t, 2, w.Usage()) assert.Equal(t, 2, w.Usage())
assert.Error(t, w.Queue(gorush.PushNotification{})) assert.Error(t, w.Queue(&gorush.PushNotification{}))
assert.Equal(t, 2, w.Usage()) assert.Equal(t, 2, w.Usage())
err := w.Queue(gorush.PushNotification{}) err := w.Queue(&gorush.PushNotification{})
assert.Equal(t, errMaxCapacity, err) assert.Equal(t, errMaxCapacity, err)
} }

View File

@ -7,7 +7,12 @@ type Worker interface {
AfterRun() error AfterRun() error
Shutdown() error Shutdown() error
Queue(job interface{}) error Queue(job QueuedMessage) error
Capacity() int Capacity() int
Usage() int Usage() int
} }
// QueuedMessage ...
type QueuedMessage interface {
Bytes() []byte
}

View File

@ -283,7 +283,7 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req gorush.Req
notification.AddWaitCount() notification.AddWaitCount()
} }
if err := q.Queue(*notification); err != nil { if err := q.Queue(notification); err != nil {
markFailedNotification(cfg, notification, "max capacity reached") markFailedNotification(cfg, notification, "max capacity reached")
} }

View File

@ -102,7 +102,7 @@ func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*prot
} }
} }
go gorush.SendNotification(notification) go gorush.SendNotification(&notification)
return &proto.NotificationReply{ return &proto.NotificationReply{
Success: true, Success: true,