feat: support sync mode for http response. (#208)
* feat: support sync mode for http response. * fix: update readme. * fix: check wg exist. * fix: testing sync mode.
This commit is contained in:
parent
fcdd369cec
commit
b6997ea792
|
@ -63,6 +63,7 @@ core:
|
||||||
worker_num: 0 # default worker number is runtime.NumCPU()
|
worker_num: 0 # default worker number is runtime.NumCPU()
|
||||||
queue_num: 0 # default queue number is 8192
|
queue_num: 0 # default queue number is 8192
|
||||||
max_notification: 100
|
max_notification: 100
|
||||||
|
sync: false # set true if you get http response after finish send notification.
|
||||||
mode: "release"
|
mode: "release"
|
||||||
ssl: false
|
ssl: false
|
||||||
cert_path: "cert.pem"
|
cert_path: "cert.pem"
|
||||||
|
|
|
@ -24,6 +24,7 @@ type SectionCore struct {
|
||||||
WorkerNum int64 `yaml:"worker_num"`
|
WorkerNum int64 `yaml:"worker_num"`
|
||||||
QueueNum int64 `yaml:"queue_num"`
|
QueueNum int64 `yaml:"queue_num"`
|
||||||
Mode string `yaml:"mode"`
|
Mode string `yaml:"mode"`
|
||||||
|
Sync bool `yaml:"sync"`
|
||||||
SSL bool `yaml:"ssl"`
|
SSL bool `yaml:"ssl"`
|
||||||
CertPath string `yaml:"cert_path"`
|
CertPath string `yaml:"cert_path"`
|
||||||
KeyPath string `yaml:"key_path"`
|
KeyPath string `yaml:"key_path"`
|
||||||
|
@ -123,6 +124,7 @@ func BuildDefaultPushConf() ConfYaml {
|
||||||
conf.Core.WorkerNum = int64(runtime.NumCPU())
|
conf.Core.WorkerNum = int64(runtime.NumCPU())
|
||||||
conf.Core.QueueNum = int64(8192)
|
conf.Core.QueueNum = int64(8192)
|
||||||
conf.Core.Mode = "release"
|
conf.Core.Mode = "release"
|
||||||
|
conf.Core.Sync = false
|
||||||
conf.Core.SSL = false
|
conf.Core.SSL = false
|
||||||
conf.Core.CertPath = "cert.pem"
|
conf.Core.CertPath = "cert.pem"
|
||||||
conf.Core.KeyPath = "key.pem"
|
conf.Core.KeyPath = "key.pem"
|
||||||
|
|
|
@ -3,6 +3,7 @@ core:
|
||||||
worker_num: 0 # default worker number is runtime.NumCPU()
|
worker_num: 0 # default worker number is runtime.NumCPU()
|
||||||
queue_num: 0 # default queue number is 8192
|
queue_num: 0 # default queue number is 8192
|
||||||
max_notification: 100
|
max_notification: 100
|
||||||
|
sync: false # set true if you get http response after finish send notification.
|
||||||
mode: "release"
|
mode: "release"
|
||||||
ssl: false
|
ssl: false
|
||||||
cert_path: "cert.pem"
|
cert_path: "cert.pem"
|
||||||
|
|
|
@ -59,6 +59,7 @@ func (suite *ConfigTestSuite) TestValidateConfDefault() {
|
||||||
assert.Equal(suite.T(), int64(runtime.NumCPU()), suite.ConfGorushDefault.Core.WorkerNum)
|
assert.Equal(suite.T(), int64(runtime.NumCPU()), suite.ConfGorushDefault.Core.WorkerNum)
|
||||||
assert.Equal(suite.T(), int64(8192), suite.ConfGorushDefault.Core.QueueNum)
|
assert.Equal(suite.T(), int64(8192), suite.ConfGorushDefault.Core.QueueNum)
|
||||||
assert.Equal(suite.T(), "release", suite.ConfGorushDefault.Core.Mode)
|
assert.Equal(suite.T(), "release", suite.ConfGorushDefault.Core.Mode)
|
||||||
|
assert.Equal(suite.T(), false, suite.ConfGorushDefault.Core.Sync)
|
||||||
assert.Equal(suite.T(), false, suite.ConfGorushDefault.Core.SSL)
|
assert.Equal(suite.T(), false, suite.ConfGorushDefault.Core.SSL)
|
||||||
assert.Equal(suite.T(), "cert.pem", suite.ConfGorushDefault.Core.CertPath)
|
assert.Equal(suite.T(), "cert.pem", suite.ConfGorushDefault.Core.CertPath)
|
||||||
assert.Equal(suite.T(), "key.pem", suite.ConfGorushDefault.Core.KeyPath)
|
assert.Equal(suite.T(), "key.pem", suite.ConfGorushDefault.Core.KeyPath)
|
||||||
|
@ -118,6 +119,7 @@ func (suite *ConfigTestSuite) TestValidateConf() {
|
||||||
assert.Equal(suite.T(), int64(runtime.NumCPU()), suite.ConfGorush.Core.WorkerNum)
|
assert.Equal(suite.T(), int64(runtime.NumCPU()), suite.ConfGorush.Core.WorkerNum)
|
||||||
assert.Equal(suite.T(), int64(8192), suite.ConfGorush.Core.QueueNum)
|
assert.Equal(suite.T(), int64(8192), suite.ConfGorush.Core.QueueNum)
|
||||||
assert.Equal(suite.T(), "release", suite.ConfGorush.Core.Mode)
|
assert.Equal(suite.T(), "release", suite.ConfGorush.Core.Mode)
|
||||||
|
assert.Equal(suite.T(), false, suite.ConfGorush.Core.Sync)
|
||||||
assert.Equal(suite.T(), false, suite.ConfGorush.Core.SSL)
|
assert.Equal(suite.T(), false, suite.ConfGorush.Core.SSL)
|
||||||
assert.Equal(suite.T(), "cert.pem", suite.ConfGorush.Core.CertPath)
|
assert.Equal(suite.T(), "cert.pem", suite.ConfGorush.Core.CertPath)
|
||||||
assert.Equal(suite.T(), "key.pem", suite.ConfGorush.Core.KeyPath)
|
assert.Equal(suite.T(), "key.pem", suite.ConfGorush.Core.KeyPath)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-gcm"
|
"github.com/google/go-gcm"
|
||||||
|
@ -63,6 +64,7 @@ type PushNotification struct {
|
||||||
Sound string `json:"sound,omitempty"`
|
Sound string `json:"sound,omitempty"`
|
||||||
Data D `json:"data,omitempty"`
|
Data D `json:"data,omitempty"`
|
||||||
Retry int `json:"retry,omitempty"`
|
Retry int `json:"retry,omitempty"`
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
|
||||||
// Android
|
// Android
|
||||||
APIKey string `json:"api_key,omitempty"`
|
APIKey string `json:"api_key,omitempty"`
|
||||||
|
@ -85,6 +87,13 @@ type PushNotification struct {
|
||||||
MutableContent bool `json:"mutable-content,omitempty"`
|
MutableContent bool `json:"mutable-content,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Done decrements the WaitGroup counter.
|
||||||
|
func (p *PushNotification) Done() {
|
||||||
|
if p.wg != nil {
|
||||||
|
p.wg.Done()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// CheckMessage for check request message
|
// CheckMessage for check request message
|
||||||
func CheckMessage(req PushNotification) error {
|
func CheckMessage(req PushNotification) error {
|
||||||
var msg string
|
var msg string
|
||||||
|
@ -214,6 +223,7 @@ func startWorker() {
|
||||||
// queueNotification add notification to queue list.
|
// queueNotification add notification to queue list.
|
||||||
func queueNotification(req RequestPush) int {
|
func queueNotification(req RequestPush) int {
|
||||||
var count int
|
var count int
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
for _, notification := range req.Notifications {
|
for _, notification := range req.Notifications {
|
||||||
switch notification.Platform {
|
switch notification.Platform {
|
||||||
case PlatFormIos:
|
case PlatFormIos:
|
||||||
|
@ -225,11 +235,16 @@ func queueNotification(req RequestPush) int {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
wg.Add(1)
|
||||||
|
notification.wg = &wg
|
||||||
QueueNotification <- notification
|
QueueNotification <- notification
|
||||||
|
|
||||||
count += len(notification.Tokens)
|
count += len(notification.Tokens)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if PushConf.Core.Sync {
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
StatStorage.AddTotalCount(int64(count))
|
StatStorage.AddTotalCount(int64(count))
|
||||||
|
|
||||||
return count
|
return count
|
||||||
|
@ -351,7 +366,7 @@ func GetIOSNotification(req PushNotification) *apns.Notification {
|
||||||
// PushToIOS provide send notification to APNs server.
|
// PushToIOS provide send notification to APNs server.
|
||||||
func PushToIOS(req PushNotification) bool {
|
func PushToIOS(req PushNotification) bool {
|
||||||
LogAccess.Debug("Start push notification for iOS")
|
LogAccess.Debug("Start push notification for iOS")
|
||||||
|
defer req.Done()
|
||||||
var retryCount = 0
|
var retryCount = 0
|
||||||
var maxRetry = PushConf.Ios.MaxRetry
|
var maxRetry = PushConf.Ios.MaxRetry
|
||||||
|
|
||||||
|
@ -456,7 +471,7 @@ func GetAndroidNotification(req PushNotification) gcm.HttpMessage {
|
||||||
// PushToAndroid provide send notification to Android server.
|
// PushToAndroid provide send notification to Android server.
|
||||||
func PushToAndroid(req PushNotification) bool {
|
func PushToAndroid(req PushNotification) bool {
|
||||||
LogAccess.Debug("Start push notification for Android")
|
LogAccess.Debug("Start push notification for Android")
|
||||||
|
defer req.Done()
|
||||||
var APIKey string
|
var APIKey string
|
||||||
var retryCount = 0
|
var retryCount = 0
|
||||||
var maxRetry = PushConf.Android.MaxRetry
|
var maxRetry = PushConf.Android.MaxRetry
|
||||||
|
@ -486,7 +501,6 @@ Retry:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// GCM server error
|
// GCM server error
|
||||||
LogError.Error("GCM server error: " + err.Error())
|
LogError.Error("GCM server error: " + err.Error())
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -604,6 +604,42 @@ func TestDisabledAndroidNotifications(t *testing.T) {
|
||||||
assert.Equal(t, 1, count)
|
assert.Equal(t, 1, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncModeForNotifications(t *testing.T) {
|
||||||
|
PushConf = config.BuildDefaultPushConf()
|
||||||
|
|
||||||
|
PushConf.Ios.Enabled = true
|
||||||
|
PushConf.Ios.KeyPath = "../certificate/certificate-valid.pem"
|
||||||
|
InitAPNSClient()
|
||||||
|
|
||||||
|
PushConf.Android.Enabled = true
|
||||||
|
PushConf.Android.APIKey = os.Getenv("ANDROID_API_KEY")
|
||||||
|
|
||||||
|
// enable sync mode
|
||||||
|
PushConf.Core.Sync = true
|
||||||
|
|
||||||
|
androidToken := os.Getenv("ANDROID_TEST_TOKEN")
|
||||||
|
|
||||||
|
req := RequestPush{
|
||||||
|
Notifications: []PushNotification{
|
||||||
|
//ios
|
||||||
|
{
|
||||||
|
Tokens: []string{"11aa01229f15f0f0c52029d8cf8cd0aeaf2365fe4cebc4af26cd6d76b7919ef7"},
|
||||||
|
Platform: PlatFormIos,
|
||||||
|
Message: "Welcome",
|
||||||
|
},
|
||||||
|
// android
|
||||||
|
{
|
||||||
|
Tokens: []string{androidToken, "bbbbb"},
|
||||||
|
Platform: PlatFormAndroid,
|
||||||
|
Message: "Welcome",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
count := queueNotification(req)
|
||||||
|
assert.Equal(t, 3, count)
|
||||||
|
}
|
||||||
|
|
||||||
func TestDisabledIosNotifications(t *testing.T) {
|
func TestDisabledIosNotifications(t *testing.T) {
|
||||||
PushConf = config.BuildDefaultPushConf()
|
PushConf = config.BuildDefaultPushConf()
|
||||||
|
|
||||||
|
|
|
@ -56,11 +56,11 @@ func pushHandler(c *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// queue notification.
|
counts := queueNotification(form)
|
||||||
go queueNotification(form)
|
|
||||||
|
|
||||||
c.JSON(http.StatusOK, gin.H{
|
c.JSON(http.StatusOK, gin.H{
|
||||||
"success": "ok",
|
"success": "ok",
|
||||||
|
"counts": counts,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue