From 685a87c9309865ec151051ef70c5c9893a0f9cf1 Mon Sep 17 00:00:00 2001 From: Emmet McPoland Date: Thu, 23 Apr 2020 14:02:53 +0100 Subject: [PATCH] Concurrent ios pushes (#497) * Added the ability to concurrently call.push. Linter was complaining about "loop variable token captured by", need to determine if its a legitiment issue, as the lint message has now disappeared * resolved "loop variable token captured by func literal" issue * Ran make_fmt * Moved max_concurrent_pushes to iOS * Moved from limitgroup to sizedwaitgroup * Removed SizedWaitGroup. Using client pool of 1 and buffered channels * Fill client pool with the same client * MaxConcurrentPushes is now applied across all running pushes * Ran make fmt * Corrected TestPushToIOS test * Update README.md Co-Authored-By: Yaroslav "Zorg" Zborovsky * Added comment to config in all copies for max_concurrent_pushes * Updated TestPushToIOS to be driven from config Co-authored-by: Yaroslav "Zorg" Zborovsky --- README.md | 1 + config/config.go | 21 ++++++---- config/config_test.go | 2 + config/testdata/config.yml | 1 + gorush/global.go | 2 + gorush/notification_apns.go | 70 ++++++++++++++++++-------------- gorush/notification_apns_test.go | 1 + main.go | 3 ++ 8 files changed, 62 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 9b1d755..16c2f27 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,7 @@ ios: key_type: "pem" # could be pem, p12 or p8 type password: "" # certificate password, default as empty string. production: false + max_concurrent_pushes: 100 # just for push ios notification max_retry: 0 # resend fail notification, default value zero is disabled key_id: "" # KeyID from developer account (Certificates, Identifiers & Profiles -> Keys) team_id: "" # TeamID from developer account (View Account -> Membership) diff --git a/config/config.go b/config/config.go index 905f5d5..c4c32be 100644 --- a/config/config.go +++ b/config/config.go @@ -63,6 +63,7 @@ ios: key_type: "pem" # could be pem, p12 or p8 type password: "" # certificate password, default as empty string. production: false + max_concurrent_pushes: 100 # just for push ios notification max_retry: 0 # resend fail notification, default value zero is disabled key_id: "" # KeyID from developer account (Certificates, Identifiers & Profiles -> Keys) team_id: "" # TeamID from developer account (View Account -> Membership) @@ -153,15 +154,16 @@ type SectionAndroid struct { // SectionIos is sub section of config. type SectionIos struct { - Enabled bool `yaml:"enabled"` - KeyPath string `yaml:"key_path"` - KeyBase64 string `yaml:"key_base64"` - KeyType string `yaml:"key_type"` - Password string `yaml:"password"` - Production bool `yaml:"production"` - MaxRetry int `yaml:"max_retry"` - KeyID string `yaml:"key_id"` - TeamID string `yaml:"team_id"` + Enabled bool `yaml:"enabled"` + KeyPath string `yaml:"key_path"` + KeyBase64 string `yaml:"key_base64"` + KeyType string `yaml:"key_type"` + Password string `yaml:"password"` + Production bool `yaml:"production"` + MaxConcurrentPushes uint `yaml:"max_concurrent_pushes"` + MaxRetry int `yaml:"max_retry"` + KeyID string `yaml:"key_id"` + TeamID string `yaml:"team_id"` } // SectionLog is sub section of config. @@ -308,6 +310,7 @@ func LoadConf(confPath string) (ConfYaml, error) { conf.Ios.KeyType = viper.GetString("ios.key_type") conf.Ios.Password = viper.GetString("ios.password") conf.Ios.Production = viper.GetBool("ios.production") + conf.Ios.MaxConcurrentPushes = viper.GetUint("ios.max_concurrent_pushes") conf.Ios.MaxRetry = viper.GetInt("ios.max_retry") conf.Ios.KeyID = viper.GetString("ios.key_id") conf.Ios.TeamID = viper.GetString("ios.team_id") diff --git a/config/config_test.go b/config/config_test.go index 54b1363..ef7e5d3 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -83,6 +83,7 @@ func (suite *ConfigTestSuite) TestValidateConfDefault() { assert.Equal(suite.T(), "pem", suite.ConfGorushDefault.Ios.KeyType) assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.Password) assert.Equal(suite.T(), false, suite.ConfGorushDefault.Ios.Production) + assert.Equal(suite.T(), uint(100), suite.ConfGorushDefault.Ios.MaxConcurrentPushes) assert.Equal(suite.T(), 0, suite.ConfGorushDefault.Ios.MaxRetry) assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.KeyID) assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.TeamID) @@ -159,6 +160,7 @@ func (suite *ConfigTestSuite) TestValidateConf() { assert.Equal(suite.T(), "pem", suite.ConfGorush.Ios.KeyType) assert.Equal(suite.T(), "", suite.ConfGorush.Ios.Password) assert.Equal(suite.T(), false, suite.ConfGorush.Ios.Production) + assert.Equal(suite.T(), uint(100), suite.ConfGorush.Ios.MaxConcurrentPushes) assert.Equal(suite.T(), 0, suite.ConfGorush.Ios.MaxRetry) assert.Equal(suite.T(), "", suite.ConfGorush.Ios.KeyID) assert.Equal(suite.T(), "", suite.ConfGorush.Ios.TeamID) diff --git a/config/testdata/config.yml b/config/testdata/config.yml index 4cbb214..9680aa8 100644 --- a/config/testdata/config.yml +++ b/config/testdata/config.yml @@ -50,6 +50,7 @@ ios: key_type: "pem" # could be pem, p12 or p8 type password: "" # certificate password, default as empty string. production: false + max_concurrent_pushes: 100 # just for push ios notification max_retry: 0 # resend fail notification, default value zero is disabled key_id: "" # KeyID from developer account (Certificates, Identifiers & Profiles -> Keys) team_id: "" # TeamID from developer account (View Account -> Membership) diff --git a/gorush/global.go b/gorush/global.go index 92237aa..b97130d 100644 --- a/gorush/global.go +++ b/gorush/global.go @@ -24,4 +24,6 @@ var ( LogError *logrus.Logger // StatStorage implements the storage interface StatStorage storage.Storage + // MaxConcurrentIOSPushes pool to limit the number of concurrent iOS pushes + MaxConcurrentIOSPushes chan struct{} ) diff --git a/gorush/notification_apns.go b/gorush/notification_apns.go index ed1eb18..c4fe903 100644 --- a/gorush/notification_apns.go +++ b/gorush/notification_apns.go @@ -7,6 +7,7 @@ import ( "errors" "net/http" "path/filepath" + "sync" "time" "github.com/mitchellh/mapstructure" @@ -356,43 +357,52 @@ Retry: notification := GetIOSNotification(req) client := getApnsClient(req) + var wg sync.WaitGroup for _, token := range req.Tokens { - notification.DeviceToken = token + // occupy push slot + MaxConcurrentIOSPushes <- struct{}{} + wg.Add(1) + go func(token string) { + notification.DeviceToken = token - // send ios notification - res, err := client.Push(notification) + // send ios notification + res, err := client.Push(notification) - if err != nil || res.StatusCode != 200 { - if err == nil { - // error message: - // ref: https://github.com/sideshow/apns2/blob/master/response.go#L14-L65 - err = errors.New(res.Reason) - } - // apns server error - LogPush(FailedPush, token, req, err) + if err != nil || res.StatusCode != 200 { + if err == nil { + // error message: + // ref: https://github.com/sideshow/apns2/blob/master/response.go#L14-L65 + err = errors.New(res.Reason) + } + // apns server error + LogPush(FailedPush, token, req, err) - if PushConf.Core.Sync { - req.AddLog(getLogPushEntry(FailedPush, token, req, err)) - } else if PushConf.Core.FeedbackURL != "" { - go func(logger *logrus.Logger, log LogPushEntry, url string, timeout int64) { - err := DispatchFeedback(log, url, timeout) - if err != nil { - logger.Error(err) - } - }(LogError, getLogPushEntry(FailedPush, token, req, err), PushConf.Core.FeedbackURL, PushConf.Core.FeedbackTimeout) + if PushConf.Core.Sync { + req.AddLog(getLogPushEntry(FailedPush, token, req, err)) + } else if PushConf.Core.FeedbackURL != "" { + go func(logger *logrus.Logger, log LogPushEntry, url string, timeout int64) { + err := DispatchFeedback(log, url, timeout) + if err != nil { + logger.Error(err) + } + }(LogError, getLogPushEntry(FailedPush, token, req, err), PushConf.Core.FeedbackURL, PushConf.Core.FeedbackTimeout) + } + + StatStorage.AddIosError(1) + newTokens = append(newTokens, token) + isError = true } - StatStorage.AddIosError(1) - newTokens = append(newTokens, token) - isError = true - continue - } - - if res.Sent() { - LogPush(SucceededPush, token, req, nil) - StatStorage.AddIosSuccess(1) - } + if res.Sent() && !isError { + LogPush(SucceededPush, token, req, nil) + StatStorage.AddIosSuccess(1) + } + // free push slot + <-MaxConcurrentIOSPushes + wg.Done() + }(token) } + wg.Wait() if isError && retryCount < maxRetry { retryCount++ diff --git a/gorush/notification_apns_test.go b/gorush/notification_apns_test.go index 582c0c4..083df17 100644 --- a/gorush/notification_apns_test.go +++ b/gorush/notification_apns_test.go @@ -678,6 +678,7 @@ func TestAPNSClientUseProxy(t *testing.T) { func TestPushToIOS(t *testing.T) { PushConf, _ = config.LoadConf("") + MaxConcurrentIOSPushes = make(chan struct{}, PushConf.Ios.MaxConcurrentPushes) PushConf.Ios.Enabled = true PushConf.Ios.KeyPath = "../certificate/certificate-valid.pem" diff --git a/main.go b/main.go index c29b4e9..aa5a274 100644 --- a/main.go +++ b/main.go @@ -104,6 +104,9 @@ func main() { return } + // Initialize push slots for concurrent iOS pushes + gorush.MaxConcurrentIOSPushes = make(chan struct{}, gorush.PushConf.Ios.MaxConcurrentPushes) + if opts.Ios.KeyPath != "" { gorush.PushConf.Ios.KeyPath = opts.Ios.KeyPath }