chore(queue): support Redis pub/sub (#652)

This commit is contained in:
Bo-Yi Wu 2022-01-01 23:04:13 +08:00 committed by GitHub
parent ccbe874bf4
commit 3f411ebe89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 68 additions and 12 deletions

View File

@ -90,7 +90,7 @@ A push notification micro server using [Gin](https://github.com/gin-gonic/gin) f
- Support send notification through [RPC](https://en.wikipedia.org/wiki/Remote_procedure_call) protocol, we use [gRPC](https://grpc.io/) as default framework.
- Support running in Docker, [Kubernetes](https://kubernetes.io/) or [AWS Lambda](https://aws.amazon.com/lambda) ([Native Support in Golang](https://aws.amazon.com/blogs/compute/announcing-go-support-for-aws-lambda/))
- Support graceful shutdown that workers and queue have been sent to APNs/FCM before shutdown service.
- Support different Queue as backend like [NSQ](https://nsq.io/) or [NATS](https://nats.io/), defaut engine is local [Channel](https://tour.golang.org/concurrency/2).
- Support different Queue as backend like [NSQ](https://nsq.io/), [NATS](https://nats.io/) or [Redis Pub/Sub](https://redis.io/topics/pubsub), defaut engine is local [Channel](https://tour.golang.org/concurrency/2).
See the default [YAML config example](config/testdata/config.yml):
@ -148,7 +148,7 @@ huawei:
max_retry: 0 # resend fail notification, default value zero is disabled
queue:
engine: "local" # support "local", "nsq" and "nats " default value is "local"
engine: "local" # support "local", "nsq", "nats" and "redis" default value is "local"
nsq:
addr: 127.0.0.1:4150
topic: gorush
@ -157,6 +157,10 @@ queue:
addr: 127.0.0.1:4222
subj: gorush
queue: gorush
redis:
addr: 127.0.0.1:6379
channel: gorush
size: 1024
ios:
enabled: false

View File

@ -63,7 +63,7 @@ huawei:
max_retry: 0 # resend fail notification, default value zero is disabled
queue:
engine: "local" # support "local", "nsq", default value is "local"
engine: "local" # support "local", "nsq", "nats" and "redis" default value is "local"
nsq:
addr: 127.0.0.1:4150
topic: gorush
@ -72,6 +72,10 @@ queue:
addr: 127.0.0.1:4222
subj: gorush
queue: gorush
redis:
addr: 127.0.0.1:6379
channel: gorush
size: 1024
ios:
enabled: false
@ -219,6 +223,7 @@ type SectionQueue struct {
Engine string `yaml:"engine"`
NSQ SectionNSQ `yaml:"nsq"`
NATS SectionNATS `yaml:"nats"`
Redis SectionRedisQueue `yaml:"redis"`
}
// SectionNSQ is sub section of config.
@ -235,6 +240,13 @@ type SectionNATS struct {
Queue string `yaml:"queue"`
}
// SectionRedisQueue is sub section of config.
type SectionRedisQueue struct {
Addr string `yaml:"addr"`
Channel string `yaml:"channel"`
Size int `yaml:"size"`
}
// SectionRedis is sub section of config.
type SectionRedis struct {
Cluster bool `yaml:"cluster"`
@ -391,6 +403,9 @@ func LoadConf(confPath ...string) (*ConfYaml, error) {
conf.Queue.NATS.Addr = viper.GetString("queue.nats.addr")
conf.Queue.NATS.Subj = viper.GetString("queue.nats.subj")
conf.Queue.NATS.Queue = viper.GetString("queue.nats.queue")
conf.Queue.Redis.Addr = viper.GetString("queue.redis.addr")
conf.Queue.Redis.Channel = viper.GetString("queue.redis.channel")
conf.Queue.Redis.Size = viper.GetInt("queue.redis.size")
// Stat Engine
conf.Stat.Engine = viper.GetString("stat.engine")

View File

@ -107,6 +107,10 @@ func (suite *ConfigTestSuite) TestValidateConfDefault() {
assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.NATS.Subj)
assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.NATS.Queue)
assert.Equal(suite.T(), "127.0.0.1:6379", suite.ConfGorushDefault.Queue.Redis.Addr)
assert.Equal(suite.T(), "gorush", suite.ConfGorushDefault.Queue.Redis.Channel)
assert.Equal(suite.T(), 1024, suite.ConfGorushDefault.Queue.Redis.Size)
// log
assert.Equal(suite.T(), "string", suite.ConfGorushDefault.Log.Format)
assert.Equal(suite.T(), "stdout", suite.ConfGorushDefault.Log.AccessLog)

View File

@ -50,7 +50,7 @@ huawei:
max_retry: 0 # resend fail notification, default value zero is disabled
queue:
engine: "local" # support "local", "nsq" and "nats " default value is "local"
engine: "local" # support "local", "nsq", "nats" and "redis" default value is "local"
nsq:
addr: 127.0.0.1:4150
topic: gorush
@ -59,6 +59,10 @@ queue:
addr: 127.0.0.1:4222
subj: gorush
queue: gorush
redis:
addr: 127.0.0.1:6379
channel: gorush
size: 1024
ios:
enabled: false

View File

@ -10,6 +10,8 @@ var (
NSQ Queue = "nsq"
// NATS Connective Technology for Adaptive Edge & Distributed Systems
NATS Queue = "nats"
// Redis Pub/Sub
Redis Queue = "redis"
)
// IsLocalQueue check is Local Queue

7
go.mod
View File

@ -13,9 +13,10 @@ require (
github.com/gin-contrib/logger v0.2.0
github.com/gin-gonic/gin v1.7.4
github.com/go-redis/redis/v8 v8.11.3
github.com/golang-queue/nats v0.0.2
github.com/golang-queue/nsq v0.0.2
github.com/golang-queue/queue v0.0.7
github.com/golang-queue/nats v0.0.4
github.com/golang-queue/nsq v0.0.6
github.com/golang-queue/queue v0.0.10
github.com/golang-queue/redisdb v0.0.5
github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect
github.com/golang/protobuf v1.5.2
github.com/google/flatbuffers v2.0.0+incompatible // indirect

20
go.sum
View File

@ -161,10 +161,17 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-queue/nats v0.0.2 h1:81Ege/02a9d2a1SDN2/t1S1XIr/JpY+s8xj/pLZIHXM=
github.com/golang-queue/nats v0.0.2/go.mod h1:dzXOwbx20CJ5oX4UiBIDyfedCiR5sBWWKJVXmiRlDYc=
github.com/golang-queue/nats v0.0.4 h1:in1fM5Aa5HYeuyXCC5A00zvWxvVvBs9gdCiIr429u7I=
github.com/golang-queue/nats v0.0.4/go.mod h1:P82IIiPlNT+hGUvfddwJCn+yXd8tPeKtS9UK4AU5+I4=
github.com/golang-queue/nsq v0.0.2 h1:kP4fMLl1K6TNlJGq3tJ4t07e703mDGiMYLLltT8F4/Q=
github.com/golang-queue/nsq v0.0.2/go.mod h1:1Q/8y4BclWLj03sn0dApJIObatC3qMX5gLjlbo0TwXs=
github.com/golang-queue/queue v0.0.7 h1:WENCPyPBcIWYgBFSAZ8USGtwmxeCeMkhjwuyM0MAi84=
github.com/golang-queue/nsq v0.0.6 h1:GXk9Dx9ex3/rQDSaK78RK2B0CBNc0ym45hclEjNDNEk=
github.com/golang-queue/nsq v0.0.6/go.mod h1:oKhZjEiAZ4scaQTePCSSnsmvyHb6ID0AsqE5rtKrAOE=
github.com/golang-queue/queue v0.0.7/go.mod h1:JS5tYJacahCjafcplU5idNLX2vkYioqh6wEDX5o9Nms=
github.com/golang-queue/queue v0.0.10 h1:cGqMgHMf2eamwdd3hmOzGcSQogGu9tMhhVYPQMrMC1g=
github.com/golang-queue/queue v0.0.10/go.mod h1:ku8iyjYffqYY6Duts+xl+QYfN3/KDK4MEvXMZUkHyio=
github.com/golang-queue/redisdb v0.0.5 h1:kW+zXopFVtBmd0/19aD3fZCWc1OoRbOV+MpXo2OBp+s=
github.com/golang-queue/redisdb v0.0.5/go.mod h1:3LzXV4ldTCNKT0LsZXiKEhbrOM5gGISLQjYuPip+geM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v0.0.0-20210429001901-424d2337a529 h1:2voWjNECnrZRbfwXxHB1/j8wa6xdKn85B5NzgVL/pTU=
github.com/golang/glog v0.0.0-20210429001901-424d2337a529/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
@ -519,6 +526,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg=
@ -534,6 +542,8 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
@ -565,6 +575,7 @@ golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTk
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
@ -573,6 +584,8 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -597,6 +610,7 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -650,7 +664,9 @@ golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@ -693,6 +709,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

10
main.go
View File

@ -25,6 +25,7 @@ import (
"github.com/golang-queue/nats"
"github.com/golang-queue/nsq"
"github.com/golang-queue/queue"
"github.com/golang-queue/redisdb"
"golang.org/x/sync/errgroup"
)
@ -346,6 +347,14 @@ func main() {
nats.WithRunFunc(notify.Run(cfg)),
nats.WithLogger(logx.QueueLogger()),
)
case core.Redis:
w = redisdb.NewWorker(
redisdb.WithAddr(cfg.Queue.Redis.Addr),
redisdb.WithChannel(cfg.Queue.Redis.Channel),
redisdb.WithChannelSize(cfg.Queue.Redis.Size),
redisdb.WithRunFunc(notify.Run(cfg)),
redisdb.WithLogger(logx.QueueLogger()),
)
default:
logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine)
}
@ -456,7 +465,6 @@ Common Options:
// usage will print out the flag options for the server.
func usage() {
fmt.Printf("%s\n", usageStr)
os.Exit(0)
}
// handles pinging the endpoint and returns an error if the