chore(queue): Support NATS as backend (#618)
Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
parent
8d03ac5f1e
commit
c51b4b2b65
|
@ -90,6 +90,7 @@ A push notification micro server using [Gin](https://github.com/gin-gonic/gin) f
|
||||||
- Support send notification through [RPC](https://en.wikipedia.org/wiki/Remote_procedure_call) protocol, we use [gRPC](https://grpc.io/) as default framework.
|
- Support send notification through [RPC](https://en.wikipedia.org/wiki/Remote_procedure_call) protocol, we use [gRPC](https://grpc.io/) as default framework.
|
||||||
- Support running in Docker, [Kubernetes](https://kubernetes.io/) or [AWS Lambda](https://aws.amazon.com/lambda) ([Native Support in Golang](https://aws.amazon.com/blogs/compute/announcing-go-support-for-aws-lambda/))
|
- Support running in Docker, [Kubernetes](https://kubernetes.io/) or [AWS Lambda](https://aws.amazon.com/lambda) ([Native Support in Golang](https://aws.amazon.com/blogs/compute/announcing-go-support-for-aws-lambda/))
|
||||||
- Support graceful shutdown that workers and queue have been sent to APNs/FCM before shutdown service.
|
- Support graceful shutdown that workers and queue have been sent to APNs/FCM before shutdown service.
|
||||||
|
- Support different Queue as backend like [NSQ](https://nsq.io/) or [NATS](https://nats.io/), defaut engine is local [Channel](https://tour.golang.org/concurrency/2).
|
||||||
|
|
||||||
See the default [YAML config example](config/testdata/config.yml):
|
See the default [YAML config example](config/testdata/config.yml):
|
||||||
|
|
||||||
|
@ -147,11 +148,15 @@ huawei:
|
||||||
max_retry: 0 # resend fail notification, default value zero is disabled
|
max_retry: 0 # resend fail notification, default value zero is disabled
|
||||||
|
|
||||||
queue:
|
queue:
|
||||||
engine: "local" # support "local", "nsq", default value is "local"
|
engine: "local" # support "local", "nsq" and "nats " default value is "local"
|
||||||
nsq:
|
nsq:
|
||||||
addr: 127.0.0.1:4150
|
addr: 127.0.0.1:4150
|
||||||
topic: gorush
|
topic: gorush
|
||||||
channel: ch
|
channel: gorush
|
||||||
|
nats:
|
||||||
|
addr: 127.0.0.1:4222
|
||||||
|
subj: gorush
|
||||||
|
queue: gorush
|
||||||
|
|
||||||
ios:
|
ios:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
|
|
@ -67,7 +67,11 @@ queue:
|
||||||
nsq:
|
nsq:
|
||||||
addr: 127.0.0.1:4150
|
addr: 127.0.0.1:4150
|
||||||
topic: gorush
|
topic: gorush
|
||||||
channel: ch
|
channel: gorush
|
||||||
|
nats:
|
||||||
|
addr: 127.0.0.1:4222
|
||||||
|
subj: gorush
|
||||||
|
queue: gorush
|
||||||
|
|
||||||
ios:
|
ios:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
@ -213,6 +217,7 @@ type SectionStat struct {
|
||||||
type SectionQueue struct {
|
type SectionQueue struct {
|
||||||
Engine string `yaml:"engine"`
|
Engine string `yaml:"engine"`
|
||||||
NSQ SectionNSQ `yaml:"nsq"`
|
NSQ SectionNSQ `yaml:"nsq"`
|
||||||
|
NATS SectionNATS `yaml:"nats"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SectionNSQ is sub section of config.
|
// SectionNSQ is sub section of config.
|
||||||
|
@ -222,6 +227,13 @@ type SectionNSQ struct {
|
||||||
Channel string `yaml:"channel"`
|
Channel string `yaml:"channel"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SectionNATS is sub section of config.
|
||||||
|
type SectionNATS struct {
|
||||||
|
Addr string `yaml:"addr"`
|
||||||
|
Subj string `yaml:"subj"`
|
||||||
|
Queue string `yaml:"queue"`
|
||||||
|
}
|
||||||
|
|
||||||
// SectionRedis is sub section of config.
|
// SectionRedis is sub section of config.
|
||||||
type SectionRedis struct {
|
type SectionRedis struct {
|
||||||
Addr string `yaml:"addr"`
|
Addr string `yaml:"addr"`
|
||||||
|
@ -367,6 +379,9 @@ func LoadConf(confPath ...string) (ConfYaml, error) {
|
||||||
conf.Queue.NSQ.Addr = viper.GetString("queue.nsq.addr")
|
conf.Queue.NSQ.Addr = viper.GetString("queue.nsq.addr")
|
||||||
conf.Queue.NSQ.Topic = viper.GetString("queue.nsq.topic")
|
conf.Queue.NSQ.Topic = viper.GetString("queue.nsq.topic")
|
||||||
conf.Queue.NSQ.Channel = viper.GetString("queue.nsq.channel")
|
conf.Queue.NSQ.Channel = viper.GetString("queue.nsq.channel")
|
||||||
|
conf.Queue.NATS.Addr = viper.GetString("queue.nats.addr")
|
||||||
|
conf.Queue.NATS.Subj = viper.GetString("queue.nats.subj")
|
||||||
|
conf.Queue.NATS.Queue = viper.GetString("queue.nats.queue")
|
||||||
|
|
||||||
// Stat Engine
|
// Stat Engine
|
||||||
conf.Stat.Engine = viper.GetString("stat.engine")
|
conf.Stat.Engine = viper.GetString("stat.engine")
|
||||||
|
|
|
@ -92,7 +92,11 @@ func (suite *ConfigTestSuite) TestValidateConfDefault() {
|
||||||
assert.Equal(suite.T(), "local", suite.ConfGorushDefault.Queue.Engine)
|
assert.Equal(suite.T(), "local", suite.ConfGorushDefault.Queue.Engine)
|
||||||
assert.Equal(suite.T(), "127.0.0.1:4150", suite.ConfGorushDefault.Queue.NSQ.Addr)
|
assert.Equal(suite.T(), "127.0.0.1:4150", suite.ConfGorushDefault.Queue.NSQ.Addr)
|
||||||
assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.NSQ.Topic)
|
assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.NSQ.Topic)
|
||||||
assert.Equal(suite.T(), "ch", suite.ConfGorushDefault.Queue.NSQ.Channel)
|
assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.NSQ.Channel)
|
||||||
|
|
||||||
|
assert.Equal(suite.T(), "127.0.0.1:4222", suite.ConfGorushDefault.Queue.NATS.Addr)
|
||||||
|
assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.NATS.Subj)
|
||||||
|
assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.NATS.Queue)
|
||||||
|
|
||||||
// log
|
// log
|
||||||
assert.Equal(suite.T(), "string", suite.ConfGorushDefault.Log.Format)
|
assert.Equal(suite.T(), "string", suite.ConfGorushDefault.Log.Format)
|
||||||
|
|
|
@ -50,11 +50,15 @@ huawei:
|
||||||
max_retry: 0 # resend fail notification, default value zero is disabled
|
max_retry: 0 # resend fail notification, default value zero is disabled
|
||||||
|
|
||||||
queue:
|
queue:
|
||||||
engine: "local" # support "local", "nsq", default value is "local"
|
engine: "local" # support "local", "nsq" and "nats " default value is "local"
|
||||||
nsq:
|
nsq:
|
||||||
addr: 127.0.0.1:4150
|
addr: 127.0.0.1:4150
|
||||||
topic: gorush
|
topic: gorush
|
||||||
channel: ch
|
channel: gorush
|
||||||
|
nats:
|
||||||
|
addr: 127.0.0.1:4222
|
||||||
|
subj: gorush
|
||||||
|
queue: gorush
|
||||||
|
|
||||||
ios:
|
ios:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
|
10
go.sum
10
go.sum
|
@ -40,8 +40,6 @@ github.com/appleboy/go-fcm v0.1.5 h1:fKbcZf/7vwGsvDkcop8a+kCHnK+tt4wXX0X7uEzwI6E
|
||||||
github.com/appleboy/go-fcm v0.1.5/go.mod h1:MSxZ4LqGRsnywOjnlXJXMqbjZrG4vf+0oHitfC9HRH0=
|
github.com/appleboy/go-fcm v0.1.5/go.mod h1:MSxZ4LqGRsnywOjnlXJXMqbjZrG4vf+0oHitfC9HRH0=
|
||||||
github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4=
|
github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4=
|
||||||
github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw=
|
github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw=
|
||||||
github.com/appleboy/queue v0.0.4 h1:WuRPuiYs4aj7cZfPbzgJBxvDyrJkOy0evNpkNvLS+JM=
|
|
||||||
github.com/appleboy/queue v0.0.4/go.mod h1:cEQW2y7dduAUqUGnGJEK9oM5bZLlc0+3HI9bTUL0+Ek=
|
|
||||||
github.com/appleboy/queue v0.0.5 h1:4wrS83ktdxg3U0YGx1EC9mU8vXD4BCAswAliqCEaxHw=
|
github.com/appleboy/queue v0.0.5 h1:4wrS83ktdxg3U0YGx1EC9mU8vXD4BCAswAliqCEaxHw=
|
||||||
github.com/appleboy/queue v0.0.5/go.mod h1:cEQW2y7dduAUqUGnGJEK9oM5bZLlc0+3HI9bTUL0+Ek=
|
github.com/appleboy/queue v0.0.5/go.mod h1:cEQW2y7dduAUqUGnGJEK9oM5bZLlc0+3HI9bTUL0+Ek=
|
||||||
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
||||||
|
@ -297,6 +295,7 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||||
|
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||||
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||||
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
|
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
|
||||||
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||||
|
@ -320,16 +319,22 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
|
||||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||||
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
|
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
|
||||||
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
|
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
|
||||||
|
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
|
||||||
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
|
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
|
||||||
|
github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI=
|
||||||
github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
|
github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
|
||||||
github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k=
|
github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k=
|
||||||
|
github.com/nats-io/nats-server/v2 v2.3.2 h1:SGJLWrjBHsl0DsdY8PeTR3YKEfiUEYVVq2STw9d8MSY=
|
||||||
github.com/nats-io/nats-server/v2 v2.3.2/go.mod h1:dUf7Cm5z5LbciFVwWx54owyCKm8x4/hL6p7rrljhLFY=
|
github.com/nats-io/nats-server/v2 v2.3.2/go.mod h1:dUf7Cm5z5LbciFVwWx54owyCKm8x4/hL6p7rrljhLFY=
|
||||||
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
|
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
|
||||||
|
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30 h1:9GqilBhZaR3xYis0JgMlJjNw933WIobdjKhilXm+Vls=
|
||||||
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
github.com/nats-io/nats.go v1.11.1-0.20210623165838-4b75fc59ae30/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||||
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||||
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
|
||||||
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||||
|
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
|
github.com/nsqio/go-nsq v1.0.8 h1:3L2F8tNLlwXXlp2slDUrUWSBn2O3nMh8R1/KEDFTHPk=
|
||||||
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
|
github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
|
||||||
|
@ -637,6 +642,7 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
|
||||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
|
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
|
||||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
|
8
main.go
8
main.go
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/appleboy/gorush/status"
|
"github.com/appleboy/gorush/status"
|
||||||
|
|
||||||
"github.com/appleboy/queue"
|
"github.com/appleboy/queue"
|
||||||
|
"github.com/appleboy/queue/nats"
|
||||||
"github.com/appleboy/queue/nsq"
|
"github.com/appleboy/queue/nsq"
|
||||||
"github.com/appleboy/queue/simple"
|
"github.com/appleboy/queue/simple"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
@ -329,6 +330,13 @@ func main() {
|
||||||
nsq.WithMaxInFlight(int(cfg.Core.WorkerNum)),
|
nsq.WithMaxInFlight(int(cfg.Core.WorkerNum)),
|
||||||
nsq.WithRunFunc(notify.Run(cfg)),
|
nsq.WithRunFunc(notify.Run(cfg)),
|
||||||
)
|
)
|
||||||
|
case core.NATS:
|
||||||
|
w = nats.NewWorker(
|
||||||
|
nats.WithAddr(cfg.Queue.NATS.Addr),
|
||||||
|
nats.WithSubj(cfg.Queue.NATS.Subj),
|
||||||
|
nats.WithQueue(cfg.Queue.NATS.Queue),
|
||||||
|
nats.WithRunFunc(notify.Run(cfg)),
|
||||||
|
)
|
||||||
default:
|
default:
|
||||||
logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine)
|
logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue