diff --git a/gorush/notification.go b/gorush/notification.go index 1a09a12..0eea6c1 100644 --- a/gorush/notification.go +++ b/gorush/notification.go @@ -12,6 +12,7 @@ import ( "github.com/appleboy/gorush/config" "github.com/appleboy/gorush/core" "github.com/appleboy/gorush/logx" + "github.com/appleboy/gorush/queue" jsoniter "github.com/json-iterator/go" "github.com/msalihkarakasli/go-hms-push/push/model" @@ -252,17 +253,19 @@ func CheckPushConf(cfg config.ConfYaml) error { } // SendNotification send notification -func SendNotification(req PushNotification) { +func SendNotification(req queue.QueuedMessage) { + v, _ := req.(*PushNotification) + defer func() { - req.WaitDone() + v.WaitDone() }() - switch req.Platform { + switch v.Platform { case core.PlatFormIos: - PushToIOS(req) + PushToIOS(*v) case core.PlatFormAndroid: - PushToAndroid(req) + PushToAndroid(*v) case core.PlatFormHuawei: - PushToHuawei(req) + PushToHuawei(*v) } } diff --git a/queue/nsq/nsq.go b/queue/nsq/nsq.go index 562ae5a..3cf446d 100644 --- a/queue/nsq/nsq.go +++ b/queue/nsq/nsq.go @@ -102,7 +102,7 @@ func (s *Worker) Run(quit chan struct{}) error { s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { wg.Add(1) defer wg.Done() - var notification gorush.PushNotification + var notification *gorush.PushNotification if err := json.Unmarshal(msg.Body, ¬ification); err != nil { return err } @@ -136,8 +136,8 @@ func (s *Worker) Usage() int { } // Queue send notification to queue -func (s *Worker) Queue(job interface{}) error { - v, ok := job.(gorush.PushNotification) +func (s *Worker) Queue(job queue.QueuedMessage) error { + v, ok := job.(*gorush.PushNotification) if !ok { return errors.New("wrong type of job") } diff --git a/queue/queue.go b/queue/queue.go index e9c0c5f..e3476ac 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -59,7 +59,7 @@ func (q *Queue) Wait() { } // Queue to queue all job -func (q *Queue) Queue(job interface{}) error { +func (q *Queue) Queue(job QueuedMessage) error { return q.worker.Queue(job) } diff --git a/queue/simple/simple.go b/queue/simple/simple.go index 76df70d..2c27499 100644 --- a/queue/simple/simple.go +++ b/queue/simple/simple.go @@ -17,7 +17,7 @@ var errMaxCapacity = errors.New("max capacity reached") // Worker for simple queue using channel type Worker struct { - queueNotification chan gorush.PushNotification + queueNotification chan queue.QueuedMessage } // BeforeRun run script before start worker @@ -56,9 +56,9 @@ func (s *Worker) Usage() int { } // Queue send notification to queue -func (s *Worker) Queue(job interface{}) error { +func (s *Worker) Queue(job queue.QueuedMessage) error { select { - case s.queueNotification <- job.(gorush.PushNotification): + case s.queueNotification <- job: return nil default: return errMaxCapacity @@ -68,14 +68,14 @@ func (s *Worker) Queue(job interface{}) error { // WithQueueNum setup the capcity of queue func WithQueueNum(num int) Option { return func(w *Worker) { - w.queueNotification = make(chan gorush.PushNotification, num) + w.queueNotification = make(chan queue.QueuedMessage, num) } } // NewWorker for struc func NewWorker(opts ...Option) *Worker { w := &Worker{ - queueNotification: make(chan gorush.PushNotification, runtime.NumCPU()<<1), + queueNotification: make(chan queue.QueuedMessage, runtime.NumCPU()<<1), } // Loop through each option diff --git a/queue/simple/simple_test.go b/queue/simple/simple_test.go index b78c0f9..335908e 100644 --- a/queue/simple/simple_test.go +++ b/queue/simple/simple_test.go @@ -14,7 +14,7 @@ func TestQueueUsage(t *testing.T) { assert.Equal(t, runtime.NumCPU()<<1, w.Capacity()) assert.Equal(t, 0, w.Usage()) - w.Queue(gorush.PushNotification{}) + w.Queue(&gorush.PushNotification{}) assert.Equal(t, 1, w.Usage()) } @@ -23,13 +23,13 @@ func TestMaxCapacity(t *testing.T) { assert.Equal(t, 2, w.Capacity()) assert.Equal(t, 0, w.Usage()) - assert.NoError(t, w.Queue(gorush.PushNotification{})) + assert.NoError(t, w.Queue(&gorush.PushNotification{})) assert.Equal(t, 1, w.Usage()) - assert.NoError(t, w.Queue(gorush.PushNotification{})) + assert.NoError(t, w.Queue(&gorush.PushNotification{})) assert.Equal(t, 2, w.Usage()) - assert.Error(t, w.Queue(gorush.PushNotification{})) + assert.Error(t, w.Queue(&gorush.PushNotification{})) assert.Equal(t, 2, w.Usage()) - err := w.Queue(gorush.PushNotification{}) + err := w.Queue(&gorush.PushNotification{}) assert.Equal(t, errMaxCapacity, err) } diff --git a/queue/worker.go b/queue/worker.go index 3c3534d..2dace56 100644 --- a/queue/worker.go +++ b/queue/worker.go @@ -7,7 +7,12 @@ type Worker interface { AfterRun() error Shutdown() error - Queue(job interface{}) error + Queue(job QueuedMessage) error Capacity() int Usage() int } + +// QueuedMessage ... +type QueuedMessage interface { + Bytes() []byte +} diff --git a/router/server.go b/router/server.go index c1cce9e..8caf229 100644 --- a/router/server.go +++ b/router/server.go @@ -283,7 +283,7 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req gorush.Req notification.AddWaitCount() } - if err := q.Queue(*notification); err != nil { + if err := q.Queue(notification); err != nil { markFailedNotification(cfg, notification, "max capacity reached") } diff --git a/rpc/server.go b/rpc/server.go index 61acabc..af76fda 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -102,7 +102,7 @@ func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*prot } } - go gorush.SendNotification(notification) + go gorush.SendNotification(¬ification) return &proto.NotificationReply{ Success: true,