From 8d03ac5f1ee2daec96a839caf6e433765dc8e316 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 1 Aug 2021 19:48:57 +0800 Subject: [PATCH] refactor: remove config from notification struct. (#617) * refactor: remove config from notification struct. * chore: update Signed-off-by: Bo-Yi Wu --- main.go | 13 ++++----- notify/notification.go | 18 ++++++------ notify/notification_apns.go | 20 ++++++------- notify/notification_apns_test.go | 3 +- notify/notification_fcm.go | 48 ++++++++++++++++---------------- notify/notification_fcm_test.go | 15 ++++------ notify/notification_hms.go | 12 ++++---- router/server.go | 7 ++--- router/server_test.go | 12 ++++++-- rpc/server.go | 3 +- 10 files changed, 73 insertions(+), 78 deletions(-) diff --git a/main.go b/main.go index a360653..5e77c2b 100644 --- a/main.go +++ b/main.go @@ -194,7 +194,6 @@ func main() { if opts.Android.Enabled { cfg.Android.Enabled = opts.Android.Enabled req := notify.PushNotification{ - Cfg: cfg, Platform: core.PlatFormAndroid, Message: message, Title: title, @@ -219,7 +218,7 @@ func main() { return } - notify.PushToAndroid(req) + notify.PushToAndroid(req, cfg) return } @@ -228,7 +227,6 @@ func main() { if opts.Huawei.Enabled { cfg.Huawei.Enabled = opts.Huawei.Enabled req := notify.PushNotification{ - Cfg: cfg, Platform: core.PlatFormHuawei, Message: message, Title: title, @@ -253,7 +251,7 @@ func main() { return } - notify.PushToHuawei(req) + notify.PushToHuawei(req, cfg) return } @@ -266,7 +264,6 @@ func main() { cfg.Ios.Enabled = opts.Ios.Enabled req := notify.PushNotification{ - Cfg: cfg, Platform: core.PlatFormIos, Message: message, Title: title, @@ -294,7 +291,7 @@ func main() { if err := notify.InitAPNSClient(cfg); err != nil { return } - notify.PushToIOS(req) + notify.PushToIOS(req, cfg) return } @@ -322,7 +319,7 @@ func main() { case core.LocalQueue: w = simple.NewWorker( simple.WithQueueNum(int(cfg.Core.QueueNum)), - simple.WithRunFunc(notify.Run), + simple.WithRunFunc(notify.Run(cfg)), ) case core.NSQ: w = nsq.NewWorker( @@ -330,7 +327,7 @@ func main() { nsq.WithTopic(cfg.Queue.NSQ.Topic), nsq.WithChannel(cfg.Queue.NSQ.Channel), nsq.WithMaxInFlight(int(cfg.Core.WorkerNum)), - nsq.WithRunFunc(notify.Run), + nsq.WithRunFunc(notify.Run(cfg)), ) default: logx.LogError.Fatalf("we don't support queue engine: %s", cfg.Queue.Engine) diff --git a/notify/notification.go b/notify/notification.go index f47f057..f2eeb66 100644 --- a/notify/notification.go +++ b/notify/notification.go @@ -65,8 +65,6 @@ type ResponsePush struct { // PushNotification is single notification request type PushNotification struct { - Cfg config.ConfYaml - // Common ID string `json:"notif_id,omitempty"` Tokens []string `json:"tokens" binding:"required"` @@ -235,7 +233,7 @@ func CheckPushConf(cfg config.ConfYaml) error { } // SendNotification send notification -func SendNotification(req queue.QueuedMessage) (resp *ResponsePush, err error) { +func SendNotification(req queue.QueuedMessage, cfg config.ConfYaml) (resp *ResponsePush, err error) { v, ok := req.(*PushNotification) if !ok { if err = json.Unmarshal(req.Bytes(), &v); err != nil { @@ -245,18 +243,20 @@ func SendNotification(req queue.QueuedMessage) (resp *ResponsePush, err error) { switch v.Platform { case core.PlatFormIos: - resp, err = PushToIOS(*v) + resp, err = PushToIOS(*v, cfg) case core.PlatFormAndroid: - resp, err = PushToAndroid(*v) + resp, err = PushToAndroid(*v, cfg) case core.PlatFormHuawei: - resp, err = PushToHuawei(*v) + resp, err = PushToHuawei(*v, cfg) } return } // Run send notification -var Run = func(ctx context.Context, msg queue.QueuedMessage) error { - _, err := SendNotification(msg) - return err +var Run = func(cfg config.ConfYaml) func(ctx context.Context, msg queue.QueuedMessage) error { + return func(ctx context.Context, msg queue.QueuedMessage) error { + _, err := SendNotification(msg, cfg) + return err + } } diff --git a/notify/notification_apns.go b/notify/notification_apns.go index e59ca86..e434c51 100644 --- a/notify/notification_apns.go +++ b/notify/notification_apns.go @@ -388,16 +388,16 @@ func getApnsClient(cfg config.ConfYaml, req PushNotification) (client *apns2.Cli } // PushToIOS provide send notification to APNs server. -func PushToIOS(req PushNotification) (resp *ResponsePush, err error) { +func PushToIOS(req PushNotification, cfg config.ConfYaml) (resp *ResponsePush, err error) { logx.LogAccess.Debug("Start push notification for iOS") - if req.Cfg.Core.Sync && !core.IsLocalQueue(core.Queue(req.Cfg.Queue.Engine)) { - req.Cfg.Core.Sync = false + if cfg.Core.Sync && !core.IsLocalQueue(core.Queue(cfg.Queue.Engine)) { + cfg.Core.Sync = false } var ( retryCount = 0 - maxRetry = req.Cfg.Ios.MaxRetry + maxRetry = cfg.Ios.MaxRetry ) if req.Retry > 0 && req.Retry < maxRetry { @@ -410,7 +410,7 @@ Retry: var newTokens []string notification := GetIOSNotification(req) - client := getApnsClient(req.Cfg, req) + client := getApnsClient(cfg, req) var wg sync.WaitGroup for _, token := range req.Tokens { @@ -430,16 +430,16 @@ Retry: } // apns server error - errLog := logPush(req.Cfg, core.FailedPush, token, req, err) - if req.Cfg.Core.Sync { + errLog := logPush(cfg, core.FailedPush, token, req, err) + if cfg.Core.Sync { resp.Logs = append(resp.Logs, errLog) - } else if req.Cfg.Core.FeedbackURL != "" { + } else if cfg.Core.FeedbackURL != "" { go func(logger *logrus.Logger, log logx.LogPushEntry, url string, timeout int64) { err := DispatchFeedback(log, url, timeout) if err != nil { logger.Error(err) } - }(logx.LogError, errLog, req.Cfg.Core.FeedbackURL, req.Cfg.Core.FeedbackTimeout) + }(logx.LogError, errLog, cfg.Core.FeedbackURL, cfg.Core.FeedbackTimeout) } status.StatStorage.AddIosError(1) @@ -451,7 +451,7 @@ Retry: } if res != nil && res.Sent() { - logPush(req.Cfg, core.SucceededPush, token, req, nil) + logPush(cfg, core.SucceededPush, token, req, nil) status.StatStorage.AddIosSuccess(1) } diff --git a/notify/notification_apns_test.go b/notify/notification_apns_test.go index 14fa1bf..4e146be 100644 --- a/notify/notification_apns_test.go +++ b/notify/notification_apns_test.go @@ -726,14 +726,13 @@ func TestPushToIOS(t *testing.T) { assert.Nil(t, err) req := PushNotification{ - Cfg: cfg, Tokens: []string{"11aa01229f15f0f0c52029d8cf8cd0aeaf2365fe4cebc4af26cd6d76b7919ef7", "11aa01229f15f0f0c52029d8cf8cd0aeaf2365fe4cebc4af26cd6d76b7919ef1"}, Platform: 1, Message: "Welcome", } // send fail - PushToIOS(req) + PushToIOS(req, cfg) } func TestApnsHostFromRequest(t *testing.T) { diff --git a/notify/notification_fcm.go b/notify/notification_fcm.go index 762ca51..2401b13 100644 --- a/notify/notification_fcm.go +++ b/notify/notification_fcm.go @@ -106,17 +106,17 @@ func GetAndroidNotification(req PushNotification) *fcm.Message { } // PushToAndroid provide send notification to Android server. -func PushToAndroid(req PushNotification) (resp *ResponsePush, err error) { +func PushToAndroid(req PushNotification, cfg config.ConfYaml) (resp *ResponsePush, err error) { logx.LogAccess.Debug("Start push notification for Android") - if req.Cfg.Core.Sync && !core.IsLocalQueue(core.Queue(req.Cfg.Queue.Engine)) { - req.Cfg.Core.Sync = false + if cfg.Core.Sync && !core.IsLocalQueue(core.Queue(cfg.Queue.Engine)) { + cfg.Core.Sync = false } var ( client *fcm.Client retryCount = 0 - maxRetry = req.Cfg.Android.MaxRetry + maxRetry = cfg.Android.MaxRetry ) if req.Retry > 0 && req.Retry < maxRetry { @@ -136,9 +136,9 @@ Retry: notification := GetAndroidNotification(req) if req.APIKey != "" { - client, err = InitFCMClient(req.Cfg, req.APIKey) + client, err = InitFCMClient(cfg, req.APIKey) } else { - client, err = InitFCMClient(req.Cfg, req.Cfg.Android.APIKey) + client, err = InitFCMClient(cfg, cfg.Android.APIKey) } if err != nil { @@ -153,30 +153,30 @@ Retry: logx.LogError.Error("FCM server send message error: " + err.Error()) if req.IsTopic() { - errLog := logPush(req.Cfg, core.FailedPush, req.To, req, err) - if req.Cfg.Core.Sync { + errLog := logPush(cfg, core.FailedPush, req.To, req, err) + if cfg.Core.Sync { resp.Logs = append(resp.Logs, errLog) - } else if req.Cfg.Core.FeedbackURL != "" { + } else if cfg.Core.FeedbackURL != "" { go func(logger *logrus.Logger, log logx.LogPushEntry, url string, timeout int64) { err := DispatchFeedback(log, url, timeout) if err != nil { logger.Error(err) } - }(logx.LogError, errLog, req.Cfg.Core.FeedbackURL, req.Cfg.Core.FeedbackTimeout) + }(logx.LogError, errLog, cfg.Core.FeedbackURL, cfg.Core.FeedbackTimeout) } status.StatStorage.AddAndroidError(1) } else { for _, token := range req.Tokens { - errLog := logPush(req.Cfg, core.FailedPush, token, req, err) - if req.Cfg.Core.Sync { + errLog := logPush(cfg, core.FailedPush, token, req, err) + if cfg.Core.Sync { resp.Logs = append(resp.Logs, errLog) - } else if req.Cfg.Core.FeedbackURL != "" { + } else if cfg.Core.FeedbackURL != "" { go func(logger *logrus.Logger, log logx.LogPushEntry, url string, timeout int64) { err := DispatchFeedback(log, url, timeout) if err != nil { logger.Error(err) } - }(logx.LogError, errLog, req.Cfg.Core.FeedbackURL, req.Cfg.Core.FeedbackTimeout) + }(logx.LogError, errLog, cfg.Core.FeedbackURL, cfg.Core.FeedbackTimeout) } } status.StatStorage.AddAndroidError(int64(len(req.Tokens))) @@ -208,21 +208,21 @@ Retry: newTokens = append(newTokens, to) } - errLog := logPush(req.Cfg, core.FailedPush, to, req, result.Error) - if req.Cfg.Core.Sync { + errLog := logPush(cfg, core.FailedPush, to, req, result.Error) + if cfg.Core.Sync { resp.Logs = append(resp.Logs, errLog) - } else if req.Cfg.Core.FeedbackURL != "" { + } else if cfg.Core.FeedbackURL != "" { go func(logger *logrus.Logger, log logx.LogPushEntry, url string, timeout int64) { err := DispatchFeedback(log, url, timeout) if err != nil { logger.Error(err) } - }(logx.LogError, errLog, req.Cfg.Core.FeedbackURL, req.Cfg.Core.FeedbackTimeout) + }(logx.LogError, errLog, cfg.Core.FeedbackURL, cfg.Core.FeedbackTimeout) } continue } - logPush(req.Cfg, core.SucceededPush, to, req, nil) + logPush(cfg, core.SucceededPush, to, req, nil) } // result from Send messages to topics @@ -236,11 +236,11 @@ Retry: logx.LogAccess.Debug("Send Topic Message: ", to) // Success if res.MessageID != 0 { - logPush(req.Cfg, core.SucceededPush, to, req, nil) + logPush(cfg, core.SucceededPush, to, req, nil) } else { // failure - errLog := logPush(req.Cfg, core.FailedPush, to, req, res.Error) - if req.Cfg.Core.Sync { + errLog := logPush(cfg, core.FailedPush, to, req, res.Error) + if cfg.Core.Sync { resp.Logs = append(resp.Logs, errLog) } } @@ -250,8 +250,8 @@ Retry: if len(res.FailedRegistrationIDs) > 0 { newTokens = append(newTokens, res.FailedRegistrationIDs...) - errLog := logPush(req.Cfg, core.FailedPush, notification.To, req, errors.New("device group: partial success or all fails")) - if req.Cfg.Core.Sync { + errLog := logPush(cfg, core.FailedPush, notification.To, req, errors.New("device group: partial success or all fails")) + if cfg.Core.Sync { resp.Logs = append(resp.Logs, errLog) } } diff --git a/notify/notification_fcm_test.go b/notify/notification_fcm_test.go index 1e57872..0bcb9b8 100644 --- a/notify/notification_fcm_test.go +++ b/notify/notification_fcm_test.go @@ -40,14 +40,13 @@ func TestPushToAndroidWrongToken(t *testing.T) { cfg.Android.APIKey = os.Getenv("ANDROID_API_KEY") req := PushNotification{ - Cfg: cfg, Tokens: []string{"aaaaaa", "bbbbb"}, Platform: core.PlatFormAndroid, Message: "Welcome", } // Android Success count: 0, Failure count: 2 - PushToAndroid(req) + PushToAndroid(req, cfg) } func TestPushToAndroidRightTokenForJSONLog(t *testing.T) { @@ -61,13 +60,12 @@ func TestPushToAndroidRightTokenForJSONLog(t *testing.T) { androidToken := os.Getenv("ANDROID_TEST_TOKEN") req := PushNotification{ - Cfg: cfg, Tokens: []string{androidToken}, Platform: core.PlatFormAndroid, Message: "Welcome", } - PushToAndroid(req) + PushToAndroid(req, cfg) } func TestPushToAndroidRightTokenForStringLog(t *testing.T) { @@ -79,13 +77,12 @@ func TestPushToAndroidRightTokenForStringLog(t *testing.T) { androidToken := os.Getenv("ANDROID_TEST_TOKEN") req := PushNotification{ - Cfg: cfg, Tokens: []string{androidToken}, Platform: core.PlatFormAndroid, Message: "Welcome", } - PushToAndroid(req) + PushToAndroid(req, cfg) } func TestOverwriteAndroidAPIKey(t *testing.T) { @@ -98,7 +95,6 @@ func TestOverwriteAndroidAPIKey(t *testing.T) { androidToken := os.Getenv("ANDROID_TEST_TOKEN") req := PushNotification{ - Cfg: cfg, Tokens: []string{androidToken, "bbbbb"}, Platform: core.PlatFormAndroid, Message: "Welcome", @@ -107,7 +103,7 @@ func TestOverwriteAndroidAPIKey(t *testing.T) { } // FCM server error: 401 error: 401 Unauthorized (Wrong API Key) - resp, err := PushToAndroid(req) + resp, err := PushToAndroid(req, cfg) assert.Error(t, err) assert.Len(t, resp.Logs, 2) @@ -199,14 +195,13 @@ func TestCheckAndroidMessage(t *testing.T) { timeToLive := uint(2419201) req := PushNotification{ - Cfg: cfg, Tokens: []string{"aaaaaa", "bbbbb"}, Platform: core.PlatFormAndroid, Message: "Welcome", TimeToLive: &timeToLive, } - PushToAndroid(req) + PushToAndroid(req, cfg) } func TestAndroidNotificationStructure(t *testing.T) { diff --git a/notify/notification_hms.go b/notify/notification_hms.go index 1099a15..1960067 100644 --- a/notify/notification_hms.go +++ b/notify/notification_hms.go @@ -166,17 +166,17 @@ func GetHuaweiNotification(req PushNotification) (*model.MessageRequest, error) } // PushToHuawei provide send notification to Android server. -func PushToHuawei(req PushNotification) (resp *ResponsePush, err error) { +func PushToHuawei(req PushNotification, cfg config.ConfYaml) (resp *ResponsePush, err error) { logx.LogAccess.Debug("Start push notification for Huawei") - if req.Cfg.Core.Sync && !core.IsLocalQueue(core.Queue(req.Cfg.Queue.Engine)) { - req.Cfg.Core.Sync = false + if cfg.Core.Sync && !core.IsLocalQueue(core.Queue(cfg.Queue.Engine)) { + cfg.Core.Sync = false } var ( client *client.HMSClient retryCount = 0 - maxRetry = req.Cfg.Huawei.MaxRetry + maxRetry = cfg.Huawei.MaxRetry ) if req.Retry > 0 && req.Retry < maxRetry { @@ -190,7 +190,7 @@ func PushToHuawei(req PushNotification) (resp *ResponsePush, err error) { return } - client, err = InitHMSClient(req.Cfg, req.Cfg.Huawei.AppSecret, req.Cfg.Huawei.AppID) + client, err = InitHMSClient(cfg, cfg.Huawei.AppSecret, cfg.Huawei.AppID) if err != nil { // HMS server error @@ -208,7 +208,7 @@ Retry: res, err := client.SendMessage(context.Background(), notification) if err != nil { // Send Message error - errLog := logPush(req.Cfg, core.FailedPush, req.To, req, err) + errLog := logPush(cfg, core.FailedPush, req.To, req, err) resp.Logs = append(resp.Logs, errLog) logx.LogError.Error("HMS server send message error: " + err.Error()) return diff --git a/router/server.go b/router/server.go index f2eeb5f..02b7ada 100644 --- a/router/server.go +++ b/router/server.go @@ -273,7 +273,6 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req notify.Req continue } } - notification.Cfg = cfg newNotification = append(newNotification, notification) } @@ -284,10 +283,10 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req notify.Req } if core.IsLocalQueue(core.Queue(cfg.Queue.Engine)) && cfg.Core.Sync { - func(msg *notify.PushNotification) { + func(msg *notify.PushNotification, cfg config.ConfYaml) { q.QueueTask(func(ctx context.Context) error { defer wg.Done() - resp, err := notify.SendNotification(msg) + resp, err := notify.SendNotification(msg, cfg) if err != nil { return err } @@ -299,7 +298,7 @@ func handleNotification(ctx context.Context, cfg config.ConfYaml, req notify.Req return nil }) - }(notification) + }(notification, cfg) } else if err := q.Queue(notification); err != nil { resp := markFailedNotification(cfg, notification, "max capacity reached") // add log diff --git a/router/server_test.go b/router/server_test.go index d782c4d..79db24b 100644 --- a/router/server_test.go +++ b/router/server_test.go @@ -36,10 +36,17 @@ func TestMain(m *testing.M) { log.Fatal(err) } + cfg.Android.Enabled = true + cfg.Android.APIKey = os.Getenv("ANDROID_API_KEY") + + if _, err := notify.InitFCMClient(cfg, ""); err != nil { + log.Fatal(err) + } + w = simple.NewWorker( simple.WithRunFunc(func(ctx context.Context, msg queue.QueuedMessage) error { - notify.SendNotification(msg) - return nil + _, err := notify.SendNotification(msg, cfg) + return err }), ) q, _ = queue.NewQueue( @@ -600,7 +607,6 @@ func TestSyncModeForTopicNotification(t *testing.T) { // android { // success - Cfg: cfg, Condition: "'dogs' in topics || 'cats' in topics", Platform: core.PlatFormAndroid, Message: "This is a Firebase Cloud Messaging Topic Message!", diff --git a/rpc/server.go b/rpc/server.go index 821ebc5..595e73d 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -56,7 +56,6 @@ func (s *Server) Check(ctx context.Context, in *proto.HealthCheckRequest) (*prot func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*proto.NotificationReply, error) { badge := int(in.Badge) notification := notify.PushNotification{ - Cfg: s.cfg, Platform: int(in.Platform), Tokens: in.Tokens, Message: in.Message, @@ -102,7 +101,7 @@ func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*prot } } - go notify.SendNotification(¬ification) + go notify.SendNotification(¬ification, s.cfg) return &proto.NotificationReply{ Success: true,