chore(queue): support NSQ as backend. (#600)
This commit is contained in:
parent
c7112a6eb3
commit
2e2dd9b8d6
|
@ -146,6 +146,13 @@ huawei:
|
|||
appid: "YOUR_APP_ID"
|
||||
max_retry: 0 # resend fail notification, default value zero is disabled
|
||||
|
||||
queue:
|
||||
engine: "nsq" # support "local", "nsq", default value is "local"
|
||||
nsq:
|
||||
addr: 127.0.0.1:4150
|
||||
topic: gorush
|
||||
channel: ch
|
||||
|
||||
ios:
|
||||
enabled: false
|
||||
key_path: "key.pem"
|
||||
|
|
|
@ -62,6 +62,13 @@ huawei:
|
|||
appid: "YOUR_APP_ID"
|
||||
max_retry: 0 # resend fail notification, default value zero is disabled
|
||||
|
||||
queue:
|
||||
engine: "local" # support "local", "nsq", default value is "local"
|
||||
nsq:
|
||||
addr: 127.0.0.1:4150
|
||||
topic: gorush
|
||||
channel: ch
|
||||
|
||||
ios:
|
||||
enabled: false
|
||||
key_path: ""
|
||||
|
@ -106,6 +113,7 @@ type ConfYaml struct {
|
|||
Android SectionAndroid `yaml:"android"`
|
||||
Huawei SectionHuawei `yaml:"huawei"`
|
||||
Ios SectionIos `yaml:"ios"`
|
||||
Queue SectionQueue `yaml:"queue"`
|
||||
Log SectionLog `yaml:"log"`
|
||||
Stat SectionStat `yaml:"stat"`
|
||||
GRPC SectionGRPC `yaml:"grpc"`
|
||||
|
@ -201,6 +209,19 @@ type SectionStat struct {
|
|||
BadgerDB SectionBadgerDB `yaml:"badgerdb"`
|
||||
}
|
||||
|
||||
// SectionQueue is sub section of config.
|
||||
type SectionQueue struct {
|
||||
Engine string `yaml:"engine"`
|
||||
NSQ SectionNSQ `yaml:"nsq"`
|
||||
}
|
||||
|
||||
// SectionNSQ is sub section of config.
|
||||
type SectionNSQ struct {
|
||||
Addr string `yaml:"addr"`
|
||||
Topic string `yaml:"topic"`
|
||||
Channel string `yaml:"channel"`
|
||||
}
|
||||
|
||||
// SectionRedis is sub section of config.
|
||||
type SectionRedis struct {
|
||||
Addr string `yaml:"addr"`
|
||||
|
@ -341,6 +362,12 @@ func LoadConf(confPath ...string) (ConfYaml, error) {
|
|||
conf.Log.ErrorLevel = viper.GetString("log.error_level")
|
||||
conf.Log.HideToken = viper.GetBool("log.hide_token")
|
||||
|
||||
// Queue Engine
|
||||
conf.Queue.Engine = viper.GetString("queue.engine")
|
||||
conf.Queue.NSQ.Addr = viper.GetString("queue.nsq.addr")
|
||||
conf.Queue.NSQ.Topic = viper.GetString("queue.nsq.topic")
|
||||
conf.Queue.NSQ.Channel = viper.GetString("queue.nsq.channel")
|
||||
|
||||
// Stat Engine
|
||||
conf.Stat.Engine = viper.GetString("stat.engine")
|
||||
conf.Stat.Redis.Addr = viper.GetString("stat.redis.addr")
|
||||
|
|
|
@ -88,6 +88,12 @@ func (suite *ConfigTestSuite) TestValidateConfDefault() {
|
|||
assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.KeyID)
|
||||
assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.TeamID)
|
||||
|
||||
// queue
|
||||
assert.Equal(suite.T(), "local", suite.ConfGorushDefault.Queue.Engine)
|
||||
assert.Equal(suite.T(), "127.0.0.1:4150", suite.ConfGorushDefault.Queue.NSQ.Addr)
|
||||
assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.NSQ.Topic)
|
||||
assert.Equal(suite.T(), "ch", suite.ConfGorushDefault.Queue.NSQ.Channel)
|
||||
|
||||
// log
|
||||
assert.Equal(suite.T(), "string", suite.ConfGorushDefault.Log.Format)
|
||||
assert.Equal(suite.T(), "stdout", suite.ConfGorushDefault.Log.AccessLog)
|
||||
|
|
|
@ -49,6 +49,13 @@ huawei:
|
|||
appid: "YOUR_APP_ID"
|
||||
max_retry: 0 # resend fail notification, default value zero is disabled
|
||||
|
||||
queue:
|
||||
engine: "nsq" # support "local", "nsq", default value is "local"
|
||||
nsq:
|
||||
addr: 127.0.0.1:4150
|
||||
topic: gorush
|
||||
channel: ch
|
||||
|
||||
ios:
|
||||
enabled: false
|
||||
key_path: "key.pem"
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
package core
|
||||
|
||||
// Queue as backend
|
||||
type Queue string
|
||||
|
||||
var (
|
||||
// LocalQueue for channel in Go
|
||||
LocalQueue Queue = "local"
|
||||
// NSQ a realtime distributed messaging platform
|
||||
NSQ Queue = "nsq"
|
||||
// NATS Connective Technology for Adaptive Edge & Distributed Systems
|
||||
NATS Queue = "nats"
|
||||
)
|
||||
|
||||
// IsLocalQueue check is Local Queue
|
||||
func IsLocalQueue(q Queue) bool {
|
||||
return q == LocalQueue
|
||||
}
|
2
go.mod
2
go.mod
|
@ -13,12 +13,14 @@ require (
|
|||
github.com/gin-contrib/logger v0.2.0
|
||||
github.com/gin-gonic/gin v1.7.2
|
||||
github.com/go-redis/redis/v7 v7.4.0
|
||||
github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect
|
||||
github.com/golang/protobuf v1.5.1
|
||||
github.com/google/flatbuffers v2.0.0+incompatible // indirect
|
||||
github.com/json-iterator/go v1.1.10
|
||||
github.com/mattn/go-isatty v0.0.12
|
||||
github.com/mitchellh/mapstructure v1.4.1
|
||||
github.com/msalihkarakasli/go-hms-push v0.0.0-20200616114002-91cd23dfeed4
|
||||
github.com/nsqio/go-nsq v1.0.8
|
||||
github.com/prometheus/client_golang v1.10.0
|
||||
github.com/rs/zerolog v1.23.0
|
||||
github.com/sideshow/apns2 v0.20.0
|
||||
|
|
5
go.sum
5
go.sum
|
@ -154,8 +154,9 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
|
|||
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/glog v0.0.0-20210429001901-424d2337a529 h1:2voWjNECnrZRbfwXxHB1/j8wa6xdKn85B5NzgVL/pTU=
|
||||
github.com/golang/glog v0.0.0-20210429001901-424d2337a529/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
|
||||
|
@ -318,6 +319,8 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
|
|||
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
|
||||
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
|
||||
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
|
||||
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
|
||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||
|
|
|
@ -391,6 +391,10 @@ func getApnsClient(cfg config.ConfYaml, req PushNotification) (client *apns2.Cli
|
|||
func PushToIOS(req PushNotification) {
|
||||
logx.LogAccess.Debug("Start push notification for iOS")
|
||||
|
||||
if req.Cfg.Core.Sync && !core.IsLocalQueue(core.Queue(req.Cfg.Queue.Engine)) {
|
||||
req.Cfg.Core.Sync = false
|
||||
}
|
||||
|
||||
var (
|
||||
retryCount = 0
|
||||
maxRetry = req.Cfg.Ios.MaxRetry
|
||||
|
|
|
@ -109,6 +109,10 @@ func GetAndroidNotification(req PushNotification) *fcm.Message {
|
|||
func PushToAndroid(req PushNotification) {
|
||||
logx.LogAccess.Debug("Start push notification for Android")
|
||||
|
||||
if req.Cfg.Core.Sync && !core.IsLocalQueue(core.Queue(req.Cfg.Queue.Engine)) {
|
||||
req.Cfg.Core.Sync = false
|
||||
}
|
||||
|
||||
var (
|
||||
client *fcm.Client
|
||||
retryCount = 0
|
||||
|
|
|
@ -6,24 +6,25 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/appleboy/gorush/config"
|
||||
"github.com/appleboy/gorush/core"
|
||||
"github.com/appleboy/gorush/logx"
|
||||
"github.com/appleboy/gorush/status"
|
||||
|
||||
c "github.com/msalihkarakasli/go-hms-push/push/config"
|
||||
"github.com/msalihkarakasli/go-hms-push/push/core"
|
||||
client "github.com/msalihkarakasli/go-hms-push/push/core"
|
||||
"github.com/msalihkarakasli/go-hms-push/push/model"
|
||||
)
|
||||
|
||||
var (
|
||||
pushError error
|
||||
pushClient *core.HMSClient
|
||||
pushClient *client.HMSClient
|
||||
once sync.Once
|
||||
)
|
||||
|
||||
// GetPushClient use for create HMS Push
|
||||
func GetPushClient(conf *c.Config) (*core.HMSClient, error) {
|
||||
func GetPushClient(conf *c.Config) (*client.HMSClient, error) {
|
||||
once.Do(func() {
|
||||
client, err := core.NewHttpClient(conf)
|
||||
client, err := client.NewHttpClient(conf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -35,7 +36,7 @@ func GetPushClient(conf *c.Config) (*core.HMSClient, error) {
|
|||
}
|
||||
|
||||
// InitHMSClient use for initialize HMS Client.
|
||||
func InitHMSClient(cfg config.ConfYaml, appSecret, appID string) (*core.HMSClient, error) {
|
||||
func InitHMSClient(cfg config.ConfYaml, appSecret, appID string) (*client.HMSClient, error) {
|
||||
if appSecret == "" {
|
||||
return nil, errors.New("Missing Huawei App Secret")
|
||||
}
|
||||
|
@ -167,12 +168,15 @@ func GetHuaweiNotification(req PushNotification) (*model.MessageRequest, error)
|
|||
// PushToHuawei provide send notification to Android server.
|
||||
func PushToHuawei(req PushNotification) bool {
|
||||
logx.LogAccess.Debug("Start push notification for Huawei")
|
||||
cfg := req.Cfg
|
||||
|
||||
if req.Cfg.Core.Sync && !core.IsLocalQueue(core.Queue(req.Cfg.Queue.Engine)) {
|
||||
req.Cfg.Core.Sync = false
|
||||
}
|
||||
|
||||
var (
|
||||
client *core.HMSClient
|
||||
client *client.HMSClient
|
||||
retryCount = 0
|
||||
maxRetry = cfg.Huawei.MaxRetry
|
||||
maxRetry = req.Cfg.Huawei.MaxRetry
|
||||
)
|
||||
|
||||
if req.Retry > 0 && req.Retry < maxRetry {
|
||||
|
@ -191,7 +195,7 @@ Retry:
|
|||
|
||||
notification, _ := GetHuaweiNotification(req)
|
||||
|
||||
client, err = InitHMSClient(cfg, cfg.Huawei.AppSecret, cfg.Huawei.AppID)
|
||||
client, err = InitHMSClient(req.Cfg, req.Cfg.Huawei.AppSecret, req.Cfg.Huawei.AppID)
|
||||
|
||||
if err != nil {
|
||||
// HMS server error
|
||||
|
|
12
main.go
12
main.go
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/appleboy/gorush/gorush"
|
||||
"github.com/appleboy/gorush/logx"
|
||||
"github.com/appleboy/gorush/queue"
|
||||
"github.com/appleboy/gorush/queue/nsq"
|
||||
"github.com/appleboy/gorush/queue/simple"
|
||||
"github.com/appleboy/gorush/router"
|
||||
"github.com/appleboy/gorush/rpc"
|
||||
|
@ -316,7 +317,16 @@ func main() {
|
|||
logx.LogError.Fatal(err)
|
||||
}
|
||||
|
||||
w := simple.NewWorker(simple.WithQueueNum(int(cfg.Core.QueueNum)))
|
||||
var w queue.Worker
|
||||
switch core.Queue(cfg.Queue.Engine) {
|
||||
case core.LocalQueue:
|
||||
w = simple.NewWorker(simple.WithQueueNum(int(cfg.Core.QueueNum)))
|
||||
case core.NSQ:
|
||||
w = nsq.NewWorker()
|
||||
default:
|
||||
logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine)
|
||||
}
|
||||
|
||||
q := queue.NewQueue(w, int(cfg.Core.WorkerNum))
|
||||
q.Start()
|
||||
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
package nsq
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/appleboy/gorush/gorush"
|
||||
"github.com/appleboy/gorush/queue"
|
||||
|
||||
"github.com/nsqio/go-nsq"
|
||||
)
|
||||
|
||||
var _ queue.Worker = (*Worker)(nil)
|
||||
|
||||
// Option for queue system
|
||||
type Option func(*Worker)
|
||||
|
||||
// Worker for NSQ
|
||||
type Worker struct {
|
||||
q *nsq.Consumer
|
||||
p *nsq.Producer
|
||||
once sync.Once
|
||||
addr string
|
||||
topic string
|
||||
channel string
|
||||
}
|
||||
|
||||
// WithAddr setup the addr of NSQ
|
||||
func WithAddr(addr string) Option {
|
||||
return func(w *Worker) {
|
||||
w.addr = addr
|
||||
}
|
||||
}
|
||||
|
||||
// WithTopic setup the topic of NSQ
|
||||
func WithTopic(topic string) Option {
|
||||
return func(w *Worker) {
|
||||
w.topic = topic
|
||||
}
|
||||
}
|
||||
|
||||
// WithChannel setup the channel of NSQ
|
||||
func WithChannel(channel string) Option {
|
||||
return func(w *Worker) {
|
||||
w.channel = channel
|
||||
}
|
||||
}
|
||||
|
||||
// NewWorker for struc
|
||||
func NewWorker(opts ...Option) *Worker {
|
||||
w := &Worker{
|
||||
addr: "127.0.0.1:4150",
|
||||
topic: "gorush",
|
||||
channel: "ch",
|
||||
}
|
||||
|
||||
// Loop through each option
|
||||
for _, opt := range opts {
|
||||
// Call the option giving the instantiated
|
||||
opt(w)
|
||||
}
|
||||
|
||||
cfg := nsq.NewConfig()
|
||||
q, err := nsq.NewConsumer(w.topic, w.channel, cfg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
w.q = q
|
||||
|
||||
p, err := nsq.NewProducer(w.addr, cfg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
w.p = p
|
||||
|
||||
return w
|
||||
}
|
||||
|
||||
// BeforeRun run script before start worker
|
||||
func (s *Worker) BeforeRun() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AfterRun run script after start worker
|
||||
func (s *Worker) AfterRun() error {
|
||||
s.once.Do(func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
err := s.q.ConnectToNSQLookupd(s.addr)
|
||||
if err != nil {
|
||||
panic("Could not connect nsq server: " + err.Error())
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run start the worker
|
||||
func (s *Worker) Run(quit chan struct{}) error {
|
||||
s.q.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
|
||||
var notification gorush.PushNotification
|
||||
if err := json.Unmarshal(msg.Body, ¬ification); err != nil {
|
||||
return err
|
||||
}
|
||||
gorush.SendNotification(notification)
|
||||
return nil
|
||||
}))
|
||||
|
||||
select {
|
||||
case <-quit:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown worker
|
||||
func (s *Worker) Shutdown() error {
|
||||
s.q.Stop()
|
||||
s.p.Stop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Capacity for channel
|
||||
func (s *Worker) Capacity() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Usage for count of channel usage
|
||||
func (s *Worker) Usage() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Queue send notification to queue
|
||||
func (s *Worker) Queue(job interface{}) error {
|
||||
v, ok := job.(gorush.PushNotification)
|
||||
if !ok {
|
||||
return errors.New("wrong type of job")
|
||||
}
|
||||
err := s.p.Publish(s.topic, v.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -64,6 +64,7 @@ func (q *Queue) Queue(job interface{}) error {
|
|||
}
|
||||
|
||||
func (q *Queue) work(num int) {
|
||||
q.worker.BeforeRun()
|
||||
q.routineGroup.Run(func() {
|
||||
// to handle panic cases from inside the worker
|
||||
// in such case, we start a new goroutine
|
||||
|
@ -78,6 +79,7 @@ func (q *Queue) work(num int) {
|
|||
q.worker.Run(q.quit)
|
||||
logx.LogAccess.Info("closed the worker num ", num)
|
||||
})
|
||||
q.worker.AfterRun()
|
||||
}
|
||||
|
||||
func (q *Queue) startWorker() {
|
||||
|
|
|
@ -20,6 +20,16 @@ type Worker struct {
|
|||
queueNotification chan gorush.PushNotification
|
||||
}
|
||||
|
||||
// BeforeRun run script before start worker
|
||||
func (s *Worker) BeforeRun() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AfterRun run script after start worker
|
||||
func (s *Worker) AfterRun() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run start the worker
|
||||
func (s *Worker) Run(_ chan struct{}) error {
|
||||
for notification := range s.queueNotification {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/appleboy/gorush/gorush"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
|
|
|
@ -2,7 +2,10 @@ package queue
|
|||
|
||||
// Worker interface
|
||||
type Worker interface {
|
||||
BeforeRun() error
|
||||
Run(chan struct{}) error
|
||||
AfterRun() error
|
||||
|
||||
Shutdown() error
|
||||
Queue(job interface{}) error
|
||||
Capacity() int
|
||||
|
|
|
@ -250,6 +250,11 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req gorush.Req
|
|||
var count int
|
||||
wg := sync.WaitGroup{}
|
||||
newNotification := []*gorush.PushNotification{}
|
||||
|
||||
if cfg.Core.Sync && !core.IsLocalQueue(core.Queue(cfg.Queue.Engine)) {
|
||||
cfg.Core.Sync = false
|
||||
}
|
||||
|
||||
for i := range req.Notifications {
|
||||
notification := &req.Notifications[i]
|
||||
switch notification.Platform {
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
)
|
||||
|
||||
// Stats provide response time, status code count, etc.
|
||||
var Stats = stats.New()
|
||||
var Stats *stats.Stats
|
||||
|
||||
// StatStorage implements the storage interface
|
||||
var StatStorage storage.Storage
|
||||
|
@ -78,5 +78,7 @@ func InitAppStatus(conf config.ConfYaml) error {
|
|||
return err
|
||||
}
|
||||
|
||||
Stats = stats.New()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue