diff --git a/README.md b/README.md index 3176342..a5f827b 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,13 @@ huawei: appid: "YOUR_APP_ID" max_retry: 0 # resend fail notification, default value zero is disabled +queue: + engine: "nsq" # support "local", "nsq", default value is "local" + nsq: + addr: 127.0.0.1:4150 + topic: gorush + channel: ch + ios: enabled: false key_path: "key.pem" diff --git a/config/config.go b/config/config.go index 5d3a9a9..0e6b752 100644 --- a/config/config.go +++ b/config/config.go @@ -62,6 +62,13 @@ huawei: appid: "YOUR_APP_ID" max_retry: 0 # resend fail notification, default value zero is disabled +queue: + engine: "local" # support "local", "nsq", default value is "local" + nsq: + addr: 127.0.0.1:4150 + topic: gorush + channel: ch + ios: enabled: false key_path: "" @@ -106,6 +113,7 @@ type ConfYaml struct { Android SectionAndroid `yaml:"android"` Huawei SectionHuawei `yaml:"huawei"` Ios SectionIos `yaml:"ios"` + Queue SectionQueue `yaml:"queue"` Log SectionLog `yaml:"log"` Stat SectionStat `yaml:"stat"` GRPC SectionGRPC `yaml:"grpc"` @@ -201,6 +209,19 @@ type SectionStat struct { BadgerDB SectionBadgerDB `yaml:"badgerdb"` } +// SectionQueue is sub section of config. +type SectionQueue struct { + Engine string `yaml:"engine"` + NSQ SectionNSQ `yaml:"nsq"` +} + +// SectionNSQ is sub section of config. +type SectionNSQ struct { + Addr string `yaml:"addr"` + Topic string `yaml:"topic"` + Channel string `yaml:"channel"` +} + // SectionRedis is sub section of config. type SectionRedis struct { Addr string `yaml:"addr"` @@ -341,6 +362,12 @@ func LoadConf(confPath ...string) (ConfYaml, error) { conf.Log.ErrorLevel = viper.GetString("log.error_level") conf.Log.HideToken = viper.GetBool("log.hide_token") + // Queue Engine + conf.Queue.Engine = viper.GetString("queue.engine") + conf.Queue.NSQ.Addr = viper.GetString("queue.nsq.addr") + conf.Queue.NSQ.Topic = viper.GetString("queue.nsq.topic") + conf.Queue.NSQ.Channel = viper.GetString("queue.nsq.channel") + // Stat Engine conf.Stat.Engine = viper.GetString("stat.engine") conf.Stat.Redis.Addr = viper.GetString("stat.redis.addr") diff --git a/config/config_test.go b/config/config_test.go index 5405040..251393e 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -88,6 +88,12 @@ func (suite *ConfigTestSuite) TestValidateConfDefault() { assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.KeyID) assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.TeamID) + // queue + assert.Equal(suite.T(), "local", suite.ConfGorushDefault.Queue.Engine) + assert.Equal(suite.T(), "127.0.0.1:4150", suite.ConfGorushDefault.Queue.NSQ.Addr) + assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.NSQ.Topic) + assert.Equal(suite.T(), "ch", suite.ConfGorushDefault.Queue.NSQ.Channel) + // log assert.Equal(suite.T(), "string", suite.ConfGorushDefault.Log.Format) assert.Equal(suite.T(), "stdout", suite.ConfGorushDefault.Log.AccessLog) diff --git a/config/testdata/config.yml b/config/testdata/config.yml index 1b04997..293a6f5 100644 --- a/config/testdata/config.yml +++ b/config/testdata/config.yml @@ -49,6 +49,13 @@ huawei: appid: "YOUR_APP_ID" max_retry: 0 # resend fail notification, default value zero is disabled +queue: + engine: "nsq" # support "local", "nsq", default value is "local" + nsq: + addr: 127.0.0.1:4150 + topic: gorush + channel: ch + ios: enabled: false key_path: "key.pem" diff --git a/core/queue.go b/core/queue.go new file mode 100644 index 0000000..2e1efab --- /dev/null +++ b/core/queue.go @@ -0,0 +1,18 @@ +package core + +// Queue as backend +type Queue string + +var ( + // LocalQueue for channel in Go + LocalQueue Queue = "local" + // NSQ a realtime distributed messaging platform + NSQ Queue = "nsq" + // NATS Connective Technology for Adaptive Edge & Distributed Systems + NATS Queue = "nats" +) + +// IsLocalQueue check is Local Queue +func IsLocalQueue(q Queue) bool { + return q == LocalQueue +} diff --git a/go.mod b/go.mod index 51e53f1..dd72e49 100644 --- a/go.mod +++ b/go.mod @@ -13,12 +13,14 @@ require ( github.com/gin-contrib/logger v0.2.0 github.com/gin-gonic/gin v1.7.2 github.com/go-redis/redis/v7 v7.4.0 + github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect github.com/golang/protobuf v1.5.1 github.com/google/flatbuffers v2.0.0+incompatible // indirect github.com/json-iterator/go v1.1.10 github.com/mattn/go-isatty v0.0.12 github.com/mitchellh/mapstructure v1.4.1 github.com/msalihkarakasli/go-hms-push v0.0.0-20200616114002-91cd23dfeed4 + github.com/nsqio/go-nsq v1.0.8 github.com/prometheus/client_golang v1.10.0 github.com/rs/zerolog v1.23.0 github.com/sideshow/apns2 v0.20.0 diff --git a/go.sum b/go.sum index 5bd0cce..4b7e037 100644 --- a/go.sum +++ b/go.sum @@ -154,8 +154,9 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v0.0.0-20210429001901-424d2337a529 h1:2voWjNECnrZRbfwXxHB1/j8wa6xdKn85B5NzgVL/pTU= +github.com/golang/glog v0.0.0-20210429001901-424d2337a529/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= @@ -318,6 +319,8 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk= +github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= diff --git a/gorush/notification_apns.go b/gorush/notification_apns.go index 049cb9b..507db21 100644 --- a/gorush/notification_apns.go +++ b/gorush/notification_apns.go @@ -391,6 +391,10 @@ func getApnsClient(cfg config.ConfYaml, req PushNotification) (client *apns2.Cli func PushToIOS(req PushNotification) { logx.LogAccess.Debug("Start push notification for iOS") + if req.Cfg.Core.Sync && !core.IsLocalQueue(core.Queue(req.Cfg.Queue.Engine)) { + req.Cfg.Core.Sync = false + } + var ( retryCount = 0 maxRetry = req.Cfg.Ios.MaxRetry diff --git a/gorush/notification_fcm.go b/gorush/notification_fcm.go index 97db86c..22b45ef 100644 --- a/gorush/notification_fcm.go +++ b/gorush/notification_fcm.go @@ -109,6 +109,10 @@ func GetAndroidNotification(req PushNotification) *fcm.Message { func PushToAndroid(req PushNotification) { logx.LogAccess.Debug("Start push notification for Android") + if req.Cfg.Core.Sync && !core.IsLocalQueue(core.Queue(req.Cfg.Queue.Engine)) { + req.Cfg.Core.Sync = false + } + var ( client *fcm.Client retryCount = 0 diff --git a/gorush/notification_hms.go b/gorush/notification_hms.go index c4f315d..7ce95ed 100644 --- a/gorush/notification_hms.go +++ b/gorush/notification_hms.go @@ -6,24 +6,25 @@ import ( "sync" "github.com/appleboy/gorush/config" + "github.com/appleboy/gorush/core" "github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/status" c "github.com/msalihkarakasli/go-hms-push/push/config" - "github.com/msalihkarakasli/go-hms-push/push/core" + client "github.com/msalihkarakasli/go-hms-push/push/core" "github.com/msalihkarakasli/go-hms-push/push/model" ) var ( pushError error - pushClient *core.HMSClient + pushClient *client.HMSClient once sync.Once ) // GetPushClient use for create HMS Push -func GetPushClient(conf *c.Config) (*core.HMSClient, error) { +func GetPushClient(conf *c.Config) (*client.HMSClient, error) { once.Do(func() { - client, err := core.NewHttpClient(conf) + client, err := client.NewHttpClient(conf) if err != nil { panic(err) } @@ -35,7 +36,7 @@ func GetPushClient(conf *c.Config) (*core.HMSClient, error) { } // InitHMSClient use for initialize HMS Client. -func InitHMSClient(cfg config.ConfYaml, appSecret, appID string) (*core.HMSClient, error) { +func InitHMSClient(cfg config.ConfYaml, appSecret, appID string) (*client.HMSClient, error) { if appSecret == "" { return nil, errors.New("Missing Huawei App Secret") } @@ -167,12 +168,15 @@ func GetHuaweiNotification(req PushNotification) (*model.MessageRequest, error) // PushToHuawei provide send notification to Android server. func PushToHuawei(req PushNotification) bool { logx.LogAccess.Debug("Start push notification for Huawei") - cfg := req.Cfg + + if req.Cfg.Core.Sync && !core.IsLocalQueue(core.Queue(req.Cfg.Queue.Engine)) { + req.Cfg.Core.Sync = false + } var ( - client *core.HMSClient + client *client.HMSClient retryCount = 0 - maxRetry = cfg.Huawei.MaxRetry + maxRetry = req.Cfg.Huawei.MaxRetry ) if req.Retry > 0 && req.Retry < maxRetry { @@ -191,7 +195,7 @@ Retry: notification, _ := GetHuaweiNotification(req) - client, err = InitHMSClient(cfg, cfg.Huawei.AppSecret, cfg.Huawei.AppID) + client, err = InitHMSClient(req.Cfg, req.Cfg.Huawei.AppSecret, req.Cfg.Huawei.AppID) if err != nil { // HMS server error diff --git a/main.go b/main.go index a2c5a94..6bd06fc 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ import ( "github.com/appleboy/gorush/gorush" "github.com/appleboy/gorush/logx" "github.com/appleboy/gorush/queue" + "github.com/appleboy/gorush/queue/nsq" "github.com/appleboy/gorush/queue/simple" "github.com/appleboy/gorush/router" "github.com/appleboy/gorush/rpc" @@ -316,7 +317,16 @@ func main() { logx.LogError.Fatal(err) } - w := simple.NewWorker(simple.WithQueueNum(int(cfg.Core.QueueNum))) + var w queue.Worker + switch core.Queue(cfg.Queue.Engine) { + case core.LocalQueue: + w = simple.NewWorker(simple.WithQueueNum(int(cfg.Core.QueueNum))) + case core.NSQ: + w = nsq.NewWorker() + default: + logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine) + } + q := queue.NewQueue(w, int(cfg.Core.WorkerNum)) q.Start() diff --git a/queue/nsq/nsq.go b/queue/nsq/nsq.go new file mode 100644 index 0000000..b6c7fbe --- /dev/null +++ b/queue/nsq/nsq.go @@ -0,0 +1,145 @@ +package nsq + +import ( + "encoding/json" + "errors" + "sync" + "time" + + "github.com/appleboy/gorush/gorush" + "github.com/appleboy/gorush/queue" + + "github.com/nsqio/go-nsq" +) + +var _ queue.Worker = (*Worker)(nil) + +// Option for queue system +type Option func(*Worker) + +// Worker for NSQ +type Worker struct { + q *nsq.Consumer + p *nsq.Producer + once sync.Once + addr string + topic string + channel string +} + +// WithAddr setup the addr of NSQ +func WithAddr(addr string) Option { + return func(w *Worker) { + w.addr = addr + } +} + +// WithTopic setup the topic of NSQ +func WithTopic(topic string) Option { + return func(w *Worker) { + w.topic = topic + } +} + +// WithChannel setup the channel of NSQ +func WithChannel(channel string) Option { + return func(w *Worker) { + w.channel = channel + } +} + +// NewWorker for struc +func NewWorker(opts ...Option) *Worker { + w := &Worker{ + addr: "127.0.0.1:4150", + topic: "gorush", + channel: "ch", + } + + // Loop through each option + for _, opt := range opts { + // Call the option giving the instantiated + opt(w) + } + + cfg := nsq.NewConfig() + q, err := nsq.NewConsumer(w.topic, w.channel, cfg) + if err != nil { + panic(err) + } + w.q = q + + p, err := nsq.NewProducer(w.addr, cfg) + if err != nil { + panic(err) + } + w.p = p + + return w +} + +// BeforeRun run script before start worker +func (s *Worker) BeforeRun() error { + return nil +} + +// AfterRun run script after start worker +func (s *Worker) AfterRun() error { + s.once.Do(func() { + time.Sleep(100 * time.Millisecond) + err := s.q.ConnectToNSQLookupd(s.addr) + if err != nil { + panic("Could not connect nsq server: " + err.Error()) + } + }) + + return nil +} + +// Run start the worker +func (s *Worker) Run(quit chan struct{}) error { + s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { + var notification gorush.PushNotification + if err := json.Unmarshal(msg.Body, ¬ification); err != nil { + return err + } + gorush.SendNotification(notification) + return nil + })) + + select { + case <-quit: + } + return nil +} + +// Shutdown worker +func (s *Worker) Shutdown() error { + s.q.Stop() + s.p.Stop() + return nil +} + +// Capacity for channel +func (s *Worker) Capacity() int { + return 0 +} + +// Usage for count of channel usage +func (s *Worker) Usage() int { + return 0 +} + +// Queue send notification to queue +func (s *Worker) Queue(job interface{}) error { + v, ok := job.(gorush.PushNotification) + if !ok { + return errors.New("wrong type of job") + } + err := s.p.Publish(s.topic, v.Bytes()) + if err != nil { + return err + } + + return nil +} diff --git a/queue/queue.go b/queue/queue.go index 38600cd..4a7c432 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -64,6 +64,7 @@ func (q *Queue) Queue(job interface{}) error { } func (q *Queue) work(num int) { + q.worker.BeforeRun() q.routineGroup.Run(func() { // to handle panic cases from inside the worker // in such case, we start a new goroutine @@ -78,6 +79,7 @@ func (q *Queue) work(num int) { q.worker.Run(q.quit) logx.LogAccess.Info("closed the worker num ", num) }) + q.worker.AfterRun() } func (q *Queue) startWorker() { diff --git a/queue/simple/simple.go b/queue/simple/simple.go index ea8d611..76df70d 100644 --- a/queue/simple/simple.go +++ b/queue/simple/simple.go @@ -20,6 +20,16 @@ type Worker struct { queueNotification chan gorush.PushNotification } +// BeforeRun run script before start worker +func (s *Worker) BeforeRun() error { + return nil +} + +// AfterRun run script after start worker +func (s *Worker) AfterRun() error { + return nil +} + // Run start the worker func (s *Worker) Run(_ chan struct{}) error { for notification := range s.queueNotification { diff --git a/queue/simple/simple_test.go b/queue/simple/simple_test.go index c739ae8..b78c0f9 100644 --- a/queue/simple/simple_test.go +++ b/queue/simple/simple_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/appleboy/gorush/gorush" + "github.com/stretchr/testify/assert" ) diff --git a/queue/worker.go b/queue/worker.go index e32d3d9..3c3534d 100644 --- a/queue/worker.go +++ b/queue/worker.go @@ -2,7 +2,10 @@ package queue // Worker interface type Worker interface { + BeforeRun() error Run(chan struct{}) error + AfterRun() error + Shutdown() error Queue(job interface{}) error Capacity() int diff --git a/router/server.go b/router/server.go index c28e804..c1cce9e 100644 --- a/router/server.go +++ b/router/server.go @@ -250,6 +250,11 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req gorush.Req var count int wg := sync.WaitGroup{} newNotification := []*gorush.PushNotification{} + + if cfg.Core.Sync && !core.IsLocalQueue(core.Queue(cfg.Queue.Engine)) { + cfg.Core.Sync = false + } + for i := range req.Notifications { notification := &req.Notifications[i] switch notification.Platform { diff --git a/status/status.go b/status/status.go index 518d6c7..061edd5 100644 --- a/status/status.go +++ b/status/status.go @@ -17,7 +17,7 @@ import ( ) // Stats provide response time, status code count, etc. -var Stats = stats.New() +var Stats *stats.Stats // StatStorage implements the storage interface var StatStorage storage.Storage @@ -78,5 +78,7 @@ func InitAppStatus(conf config.ConfYaml) error { return err } + Stats = stats.New() + return nil }