diff --git a/go.mod b/go.mod index d5b1e8c..7a82b9c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/appleboy/gin-status-api v1.1.0 github.com/appleboy/go-fcm v0.1.5 github.com/appleboy/gofight/v2 v2.1.2 - github.com/appleboy/queue v0.0.3 + github.com/appleboy/queue v0.0.4-0.20210725000413-1085ecaca645 github.com/asdine/storm/v3 v3.2.1 github.com/buger/jsonparser v1.1.1 github.com/dgraph-io/badger/v3 v3.2103.1 @@ -21,7 +21,6 @@ require ( github.com/mattn/go-isatty v0.0.12 github.com/mitchellh/mapstructure v1.4.1 github.com/msalihkarakasli/go-hms-push v0.0.0-20200616114002-91cd23dfeed4 - github.com/nsqio/go-nsq v1.0.8 github.com/prometheus/client_golang v1.10.0 github.com/rs/zerolog v1.23.0 github.com/sideshow/apns2 v0.20.0 diff --git a/go.sum b/go.sum index e535aa2..44e6d78 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ github.com/appleboy/go-fcm v0.1.5 h1:fKbcZf/7vwGsvDkcop8a+kCHnK+tt4wXX0X7uEzwI6E github.com/appleboy/go-fcm v0.1.5/go.mod h1:MSxZ4LqGRsnywOjnlXJXMqbjZrG4vf+0oHitfC9HRH0= github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4= github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw= -github.com/appleboy/queue v0.0.3 h1:rntqVTm6ilh80VCVQjwA0vDMCl1cfveq6GS6X98fKwE= -github.com/appleboy/queue v0.0.3/go.mod h1:6Mn0z4hURZW/26huvRXG0SJ4o7pBdo6hOryRiegy/4Q= +github.com/appleboy/queue v0.0.4-0.20210725000413-1085ecaca645 h1:RpjX4HMSLsrQsxsCQN2pAbPDZWOtpX3dqLnoK0+UzLY= +github.com/appleboy/queue v0.0.4-0.20210725000413-1085ecaca645/go.mod h1:6Mn0z4hURZW/26huvRXG0SJ4o7pBdo6hOryRiegy/4Q= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= diff --git a/main.go b/main.go index 5fec549..b802fd8 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "flag" "fmt" "log" @@ -26,7 +25,6 @@ import ( "github.com/appleboy/queue" "github.com/appleboy/queue/nsq" "github.com/appleboy/queue/simple" - n "github.com/nsqio/go-nsq" "golang.org/x/sync/errgroup" ) @@ -324,10 +322,7 @@ func main() { case core.LocalQueue: w = simple.NewWorker( simple.WithQueueNum(int(cfg.Core.QueueNum)), - simple.WithRunFunc(func(msg queue.QueuedMessage) error { - notify.SendNotification(msg) - return nil - }), + simple.WithRunFunc(notify.Run), ) case core.NSQ: w = nsq.NewWorker( @@ -335,19 +330,7 @@ func main() { nsq.WithTopic(cfg.Queue.NSQ.Topic), nsq.WithChannel(cfg.Queue.NSQ.Channel), nsq.WithMaxInFlight(int(cfg.Core.WorkerNum)), - nsq.WithRunFunc(func(msg *n.Message) error { - if len(msg.Body) == 0 { - // Returning nil will automatically send a FIN command to NSQ to mark the message as processed. - // In this case, a message with an empty body is simply ignored/discarded. - return nil - } - var notification *notify.PushNotification - if err := json.Unmarshal(msg.Body, ¬ification); err != nil { - return err - } - notify.SendNotification(notification) - return nil - }), + nsq.WithRunFunc(notify.Run), ) default: logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine) diff --git a/notify/notification.go b/notify/notification.go index 0540756..f912636 100644 --- a/notify/notification.go +++ b/notify/notification.go @@ -254,7 +254,12 @@ func CheckPushConf(cfg config.ConfYaml) error { // SendNotification send notification func SendNotification(req queue.QueuedMessage) { - v, _ := req.(*PushNotification) + v, ok := req.(*PushNotification) + if !ok { + if err := json.Unmarshal(req.Bytes(), v); err != nil { + return + } + } defer func() { v.WaitDone() @@ -269,3 +274,9 @@ func SendNotification(req queue.QueuedMessage) { PushToHuawei(*v) } } + +// Run send notification +var Run = func(msg queue.QueuedMessage) error { + SendNotification(msg) + return nil +}