chore(NSQ): remove nsq package and update run func (#614)

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi Wu 2021-07-25 08:35:49 +08:00 committed by GitHub
parent 0b0f47e4f9
commit e08e4c09ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 24 deletions

3
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/appleboy/gin-status-api v1.1.0 github.com/appleboy/gin-status-api v1.1.0
github.com/appleboy/go-fcm v0.1.5 github.com/appleboy/go-fcm v0.1.5
github.com/appleboy/gofight/v2 v2.1.2 github.com/appleboy/gofight/v2 v2.1.2
github.com/appleboy/queue v0.0.3 github.com/appleboy/queue v0.0.4-0.20210725000413-1085ecaca645
github.com/asdine/storm/v3 v3.2.1 github.com/asdine/storm/v3 v3.2.1
github.com/buger/jsonparser v1.1.1 github.com/buger/jsonparser v1.1.1
github.com/dgraph-io/badger/v3 v3.2103.1 github.com/dgraph-io/badger/v3 v3.2103.1
@ -21,7 +21,6 @@ require (
github.com/mattn/go-isatty v0.0.12 github.com/mattn/go-isatty v0.0.12
github.com/mitchellh/mapstructure v1.4.1 github.com/mitchellh/mapstructure v1.4.1
github.com/msalihkarakasli/go-hms-push v0.0.0-20200616114002-91cd23dfeed4 github.com/msalihkarakasli/go-hms-push v0.0.0-20200616114002-91cd23dfeed4
github.com/nsqio/go-nsq v1.0.8
github.com/prometheus/client_golang v1.10.0 github.com/prometheus/client_golang v1.10.0
github.com/rs/zerolog v1.23.0 github.com/rs/zerolog v1.23.0
github.com/sideshow/apns2 v0.20.0 github.com/sideshow/apns2 v0.20.0

4
go.sum
View File

@ -40,8 +40,8 @@ github.com/appleboy/go-fcm v0.1.5 h1:fKbcZf/7vwGsvDkcop8a+kCHnK+tt4wXX0X7uEzwI6E
github.com/appleboy/go-fcm v0.1.5/go.mod h1:MSxZ4LqGRsnywOjnlXJXMqbjZrG4vf+0oHitfC9HRH0= github.com/appleboy/go-fcm v0.1.5/go.mod h1:MSxZ4LqGRsnywOjnlXJXMqbjZrG4vf+0oHitfC9HRH0=
github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4= github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4=
github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw= github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw=
github.com/appleboy/queue v0.0.3 h1:rntqVTm6ilh80VCVQjwA0vDMCl1cfveq6GS6X98fKwE= github.com/appleboy/queue v0.0.4-0.20210725000413-1085ecaca645 h1:RpjX4HMSLsrQsxsCQN2pAbPDZWOtpX3dqLnoK0+UzLY=
github.com/appleboy/queue v0.0.3/go.mod h1:6Mn0z4hURZW/26huvRXG0SJ4o7pBdo6hOryRiegy/4Q= github.com/appleboy/queue v0.0.4-0.20210725000413-1085ecaca645/go.mod h1:6Mn0z4hURZW/26huvRXG0SJ4o7pBdo6hOryRiegy/4Q=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=

21
main.go
View File

@ -2,7 +2,6 @@ package main
import ( import (
"context" "context"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"log" "log"
@ -26,7 +25,6 @@ import (
"github.com/appleboy/queue" "github.com/appleboy/queue"
"github.com/appleboy/queue/nsq" "github.com/appleboy/queue/nsq"
"github.com/appleboy/queue/simple" "github.com/appleboy/queue/simple"
n "github.com/nsqio/go-nsq"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -324,10 +322,7 @@ func main() {
case core.LocalQueue: case core.LocalQueue:
w = simple.NewWorker( w = simple.NewWorker(
simple.WithQueueNum(int(cfg.Core.QueueNum)), simple.WithQueueNum(int(cfg.Core.QueueNum)),
simple.WithRunFunc(func(msg queue.QueuedMessage) error { simple.WithRunFunc(notify.Run),
notify.SendNotification(msg)
return nil
}),
) )
case core.NSQ: case core.NSQ:
w = nsq.NewWorker( w = nsq.NewWorker(
@ -335,19 +330,7 @@ func main() {
nsq.WithTopic(cfg.Queue.NSQ.Topic), nsq.WithTopic(cfg.Queue.NSQ.Topic),
nsq.WithChannel(cfg.Queue.NSQ.Channel), nsq.WithChannel(cfg.Queue.NSQ.Channel),
nsq.WithMaxInFlight(int(cfg.Core.WorkerNum)), nsq.WithMaxInFlight(int(cfg.Core.WorkerNum)),
nsq.WithRunFunc(func(msg *n.Message) error { nsq.WithRunFunc(notify.Run),
if len(msg.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
// In this case, a message with an empty body is simply ignored/discarded.
return nil
}
var notification *notify.PushNotification
if err := json.Unmarshal(msg.Body, &notification); err != nil {
return err
}
notify.SendNotification(notification)
return nil
}),
) )
default: default:
logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine) logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine)

View File

@ -254,7 +254,12 @@ func CheckPushConf(cfg config.ConfYaml) error {
// SendNotification send notification // SendNotification send notification
func SendNotification(req queue.QueuedMessage) { func SendNotification(req queue.QueuedMessage) {
v, _ := req.(*PushNotification) v, ok := req.(*PushNotification)
if !ok {
if err := json.Unmarshal(req.Bytes(), v); err != nil {
return
}
}
defer func() { defer func() {
v.WaitDone() v.WaitDone()
@ -269,3 +274,9 @@ func SendNotification(req queue.QueuedMessage) {
PushToHuawei(*v) PushToHuawei(*v)
} }
} }
// Run send notification
var Run = func(msg queue.QueuedMessage) error {
SendNotification(msg)
return nil
}