Concurrent ios pushes (#497)
* Added the ability to concurrently call.push. Linter was complaining about "loop variable token captured by", need to determine if its a legitiment issue, as the lint message has now disappeared * resolved "loop variable token captured by func literal" issue * Ran make_fmt * Moved max_concurrent_pushes to iOS * Moved from limitgroup to sizedwaitgroup * Removed SizedWaitGroup. Using client pool of 1 and buffered channels * Fill client pool with the same client * MaxConcurrentPushes is now applied across all running pushes * Ran make fmt * Corrected TestPushToIOS test * Update README.md Co-Authored-By: Yaroslav "Zorg" Zborovsky <yaronius@users.noreply.github.com> * Added comment to config in all copies for max_concurrent_pushes * Updated TestPushToIOS to be driven from config Co-authored-by: Yaroslav "Zorg" Zborovsky <yaronius@users.noreply.github.com>
This commit is contained in:
parent
eec5dd3df7
commit
685a87c930
|
@ -145,6 +145,7 @@ ios:
|
||||||
key_type: "pem" # could be pem, p12 or p8 type
|
key_type: "pem" # could be pem, p12 or p8 type
|
||||||
password: "" # certificate password, default as empty string.
|
password: "" # certificate password, default as empty string.
|
||||||
production: false
|
production: false
|
||||||
|
max_concurrent_pushes: 100 # just for push ios notification
|
||||||
max_retry: 0 # resend fail notification, default value zero is disabled
|
max_retry: 0 # resend fail notification, default value zero is disabled
|
||||||
key_id: "" # KeyID from developer account (Certificates, Identifiers & Profiles -> Keys)
|
key_id: "" # KeyID from developer account (Certificates, Identifiers & Profiles -> Keys)
|
||||||
team_id: "" # TeamID from developer account (View Account -> Membership)
|
team_id: "" # TeamID from developer account (View Account -> Membership)
|
||||||
|
|
|
@ -63,6 +63,7 @@ ios:
|
||||||
key_type: "pem" # could be pem, p12 or p8 type
|
key_type: "pem" # could be pem, p12 or p8 type
|
||||||
password: "" # certificate password, default as empty string.
|
password: "" # certificate password, default as empty string.
|
||||||
production: false
|
production: false
|
||||||
|
max_concurrent_pushes: 100 # just for push ios notification
|
||||||
max_retry: 0 # resend fail notification, default value zero is disabled
|
max_retry: 0 # resend fail notification, default value zero is disabled
|
||||||
key_id: "" # KeyID from developer account (Certificates, Identifiers & Profiles -> Keys)
|
key_id: "" # KeyID from developer account (Certificates, Identifiers & Profiles -> Keys)
|
||||||
team_id: "" # TeamID from developer account (View Account -> Membership)
|
team_id: "" # TeamID from developer account (View Account -> Membership)
|
||||||
|
@ -153,15 +154,16 @@ type SectionAndroid struct {
|
||||||
|
|
||||||
// SectionIos is sub section of config.
|
// SectionIos is sub section of config.
|
||||||
type SectionIos struct {
|
type SectionIos struct {
|
||||||
Enabled bool `yaml:"enabled"`
|
Enabled bool `yaml:"enabled"`
|
||||||
KeyPath string `yaml:"key_path"`
|
KeyPath string `yaml:"key_path"`
|
||||||
KeyBase64 string `yaml:"key_base64"`
|
KeyBase64 string `yaml:"key_base64"`
|
||||||
KeyType string `yaml:"key_type"`
|
KeyType string `yaml:"key_type"`
|
||||||
Password string `yaml:"password"`
|
Password string `yaml:"password"`
|
||||||
Production bool `yaml:"production"`
|
Production bool `yaml:"production"`
|
||||||
MaxRetry int `yaml:"max_retry"`
|
MaxConcurrentPushes uint `yaml:"max_concurrent_pushes"`
|
||||||
KeyID string `yaml:"key_id"`
|
MaxRetry int `yaml:"max_retry"`
|
||||||
TeamID string `yaml:"team_id"`
|
KeyID string `yaml:"key_id"`
|
||||||
|
TeamID string `yaml:"team_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SectionLog is sub section of config.
|
// SectionLog is sub section of config.
|
||||||
|
@ -308,6 +310,7 @@ func LoadConf(confPath string) (ConfYaml, error) {
|
||||||
conf.Ios.KeyType = viper.GetString("ios.key_type")
|
conf.Ios.KeyType = viper.GetString("ios.key_type")
|
||||||
conf.Ios.Password = viper.GetString("ios.password")
|
conf.Ios.Password = viper.GetString("ios.password")
|
||||||
conf.Ios.Production = viper.GetBool("ios.production")
|
conf.Ios.Production = viper.GetBool("ios.production")
|
||||||
|
conf.Ios.MaxConcurrentPushes = viper.GetUint("ios.max_concurrent_pushes")
|
||||||
conf.Ios.MaxRetry = viper.GetInt("ios.max_retry")
|
conf.Ios.MaxRetry = viper.GetInt("ios.max_retry")
|
||||||
conf.Ios.KeyID = viper.GetString("ios.key_id")
|
conf.Ios.KeyID = viper.GetString("ios.key_id")
|
||||||
conf.Ios.TeamID = viper.GetString("ios.team_id")
|
conf.Ios.TeamID = viper.GetString("ios.team_id")
|
||||||
|
|
|
@ -83,6 +83,7 @@ func (suite *ConfigTestSuite) TestValidateConfDefault() {
|
||||||
assert.Equal(suite.T(), "pem", suite.ConfGorushDefault.Ios.KeyType)
|
assert.Equal(suite.T(), "pem", suite.ConfGorushDefault.Ios.KeyType)
|
||||||
assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.Password)
|
assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.Password)
|
||||||
assert.Equal(suite.T(), false, suite.ConfGorushDefault.Ios.Production)
|
assert.Equal(suite.T(), false, suite.ConfGorushDefault.Ios.Production)
|
||||||
|
assert.Equal(suite.T(), uint(100), suite.ConfGorushDefault.Ios.MaxConcurrentPushes)
|
||||||
assert.Equal(suite.T(), 0, suite.ConfGorushDefault.Ios.MaxRetry)
|
assert.Equal(suite.T(), 0, suite.ConfGorushDefault.Ios.MaxRetry)
|
||||||
assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.KeyID)
|
assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.KeyID)
|
||||||
assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.TeamID)
|
assert.Equal(suite.T(), "", suite.ConfGorushDefault.Ios.TeamID)
|
||||||
|
@ -159,6 +160,7 @@ func (suite *ConfigTestSuite) TestValidateConf() {
|
||||||
assert.Equal(suite.T(), "pem", suite.ConfGorush.Ios.KeyType)
|
assert.Equal(suite.T(), "pem", suite.ConfGorush.Ios.KeyType)
|
||||||
assert.Equal(suite.T(), "", suite.ConfGorush.Ios.Password)
|
assert.Equal(suite.T(), "", suite.ConfGorush.Ios.Password)
|
||||||
assert.Equal(suite.T(), false, suite.ConfGorush.Ios.Production)
|
assert.Equal(suite.T(), false, suite.ConfGorush.Ios.Production)
|
||||||
|
assert.Equal(suite.T(), uint(100), suite.ConfGorush.Ios.MaxConcurrentPushes)
|
||||||
assert.Equal(suite.T(), 0, suite.ConfGorush.Ios.MaxRetry)
|
assert.Equal(suite.T(), 0, suite.ConfGorush.Ios.MaxRetry)
|
||||||
assert.Equal(suite.T(), "", suite.ConfGorush.Ios.KeyID)
|
assert.Equal(suite.T(), "", suite.ConfGorush.Ios.KeyID)
|
||||||
assert.Equal(suite.T(), "", suite.ConfGorush.Ios.TeamID)
|
assert.Equal(suite.T(), "", suite.ConfGorush.Ios.TeamID)
|
||||||
|
|
|
@ -50,6 +50,7 @@ ios:
|
||||||
key_type: "pem" # could be pem, p12 or p8 type
|
key_type: "pem" # could be pem, p12 or p8 type
|
||||||
password: "" # certificate password, default as empty string.
|
password: "" # certificate password, default as empty string.
|
||||||
production: false
|
production: false
|
||||||
|
max_concurrent_pushes: 100 # just for push ios notification
|
||||||
max_retry: 0 # resend fail notification, default value zero is disabled
|
max_retry: 0 # resend fail notification, default value zero is disabled
|
||||||
key_id: "" # KeyID from developer account (Certificates, Identifiers & Profiles -> Keys)
|
key_id: "" # KeyID from developer account (Certificates, Identifiers & Profiles -> Keys)
|
||||||
team_id: "" # TeamID from developer account (View Account -> Membership)
|
team_id: "" # TeamID from developer account (View Account -> Membership)
|
||||||
|
|
|
@ -24,4 +24,6 @@ var (
|
||||||
LogError *logrus.Logger
|
LogError *logrus.Logger
|
||||||
// StatStorage implements the storage interface
|
// StatStorage implements the storage interface
|
||||||
StatStorage storage.Storage
|
StatStorage storage.Storage
|
||||||
|
// MaxConcurrentIOSPushes pool to limit the number of concurrent iOS pushes
|
||||||
|
MaxConcurrentIOSPushes chan struct{}
|
||||||
)
|
)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
|
@ -356,43 +357,52 @@ Retry:
|
||||||
notification := GetIOSNotification(req)
|
notification := GetIOSNotification(req)
|
||||||
client := getApnsClient(req)
|
client := getApnsClient(req)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
for _, token := range req.Tokens {
|
for _, token := range req.Tokens {
|
||||||
notification.DeviceToken = token
|
// occupy push slot
|
||||||
|
MaxConcurrentIOSPushes <- struct{}{}
|
||||||
|
wg.Add(1)
|
||||||
|
go func(token string) {
|
||||||
|
notification.DeviceToken = token
|
||||||
|
|
||||||
// send ios notification
|
// send ios notification
|
||||||
res, err := client.Push(notification)
|
res, err := client.Push(notification)
|
||||||
|
|
||||||
if err != nil || res.StatusCode != 200 {
|
if err != nil || res.StatusCode != 200 {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// error message:
|
// error message:
|
||||||
// ref: https://github.com/sideshow/apns2/blob/master/response.go#L14-L65
|
// ref: https://github.com/sideshow/apns2/blob/master/response.go#L14-L65
|
||||||
err = errors.New(res.Reason)
|
err = errors.New(res.Reason)
|
||||||
}
|
}
|
||||||
// apns server error
|
// apns server error
|
||||||
LogPush(FailedPush, token, req, err)
|
LogPush(FailedPush, token, req, err)
|
||||||
|
|
||||||
if PushConf.Core.Sync {
|
if PushConf.Core.Sync {
|
||||||
req.AddLog(getLogPushEntry(FailedPush, token, req, err))
|
req.AddLog(getLogPushEntry(FailedPush, token, req, err))
|
||||||
} else if PushConf.Core.FeedbackURL != "" {
|
} else if PushConf.Core.FeedbackURL != "" {
|
||||||
go func(logger *logrus.Logger, log LogPushEntry, url string, timeout int64) {
|
go func(logger *logrus.Logger, log LogPushEntry, url string, timeout int64) {
|
||||||
err := DispatchFeedback(log, url, timeout)
|
err := DispatchFeedback(log, url, timeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
}
|
}
|
||||||
}(LogError, getLogPushEntry(FailedPush, token, req, err), PushConf.Core.FeedbackURL, PushConf.Core.FeedbackTimeout)
|
}(LogError, getLogPushEntry(FailedPush, token, req, err), PushConf.Core.FeedbackURL, PushConf.Core.FeedbackTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
StatStorage.AddIosError(1)
|
||||||
|
newTokens = append(newTokens, token)
|
||||||
|
isError = true
|
||||||
}
|
}
|
||||||
|
|
||||||
StatStorage.AddIosError(1)
|
if res.Sent() && !isError {
|
||||||
newTokens = append(newTokens, token)
|
LogPush(SucceededPush, token, req, nil)
|
||||||
isError = true
|
StatStorage.AddIosSuccess(1)
|
||||||
continue
|
}
|
||||||
}
|
// free push slot
|
||||||
|
<-MaxConcurrentIOSPushes
|
||||||
if res.Sent() {
|
wg.Done()
|
||||||
LogPush(SucceededPush, token, req, nil)
|
}(token)
|
||||||
StatStorage.AddIosSuccess(1)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
if isError && retryCount < maxRetry {
|
if isError && retryCount < maxRetry {
|
||||||
retryCount++
|
retryCount++
|
||||||
|
|
|
@ -678,6 +678,7 @@ func TestAPNSClientUseProxy(t *testing.T) {
|
||||||
|
|
||||||
func TestPushToIOS(t *testing.T) {
|
func TestPushToIOS(t *testing.T) {
|
||||||
PushConf, _ = config.LoadConf("")
|
PushConf, _ = config.LoadConf("")
|
||||||
|
MaxConcurrentIOSPushes = make(chan struct{}, PushConf.Ios.MaxConcurrentPushes)
|
||||||
|
|
||||||
PushConf.Ios.Enabled = true
|
PushConf.Ios.Enabled = true
|
||||||
PushConf.Ios.KeyPath = "../certificate/certificate-valid.pem"
|
PushConf.Ios.KeyPath = "../certificate/certificate-valid.pem"
|
||||||
|
|
3
main.go
3
main.go
|
@ -104,6 +104,9 @@ func main() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize push slots for concurrent iOS pushes
|
||||||
|
gorush.MaxConcurrentIOSPushes = make(chan struct{}, gorush.PushConf.Ios.MaxConcurrentPushes)
|
||||||
|
|
||||||
if opts.Ios.KeyPath != "" {
|
if opts.Ios.KeyPath != "" {
|
||||||
gorush.PushConf.Ios.KeyPath = opts.Ios.KeyPath
|
gorush.PushConf.Ios.KeyPath = opts.Ios.KeyPath
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue