support notification queue and worker
Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
parent
8645e17060
commit
6efe23923c
|
@ -114,5 +114,6 @@ func main() {
|
|||
}
|
||||
|
||||
gorush.InitAPNSClient()
|
||||
gorush.InitWorkers(gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum)
|
||||
gorush.RunHTTPServer()
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package gorush
|
|||
import (
|
||||
"gopkg.in/yaml.v2"
|
||||
"io/ioutil"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
// ConfYaml is config structure.
|
||||
|
@ -18,6 +19,8 @@ type ConfYaml struct {
|
|||
type SectionCore struct {
|
||||
Port string `yaml:"port"`
|
||||
MaxNotification int `yaml:"max_notification"`
|
||||
WorkerNum int `yaml:"worker_num"`
|
||||
QueueNum int `yaml:"queue_num"`
|
||||
Mode string `yaml:"mode"`
|
||||
SSL bool `yaml:"ssl"`
|
||||
CertPath string `yaml:"cert_path"`
|
||||
|
@ -59,6 +62,8 @@ func BuildDefaultPushConf() ConfYaml {
|
|||
|
||||
// Core
|
||||
conf.Core.Port = "8088"
|
||||
conf.Core.WorkerNum = runtime.NumCPU()
|
||||
conf.Core.QueueNum = 8192
|
||||
conf.Core.Mode = "release"
|
||||
conf.Core.SSL = false
|
||||
conf.Core.CertPath = "cert.pem"
|
||||
|
|
|
@ -9,6 +9,8 @@ import (
|
|||
var (
|
||||
// PushConf is gorush config
|
||||
PushConf ConfYaml
|
||||
// QueueNotification is chan type
|
||||
QueueNotification chan PushNotification
|
||||
// CertificatePemIos is ios certificate file
|
||||
CertificatePemIos tls.Certificate
|
||||
// ApnsClient is apns client
|
||||
|
|
|
@ -121,8 +121,29 @@ func InitAPNSClient() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// SendNotification provide send all push request.
|
||||
func SendNotification(req RequestPush) int {
|
||||
// InitWorkers for initialize all workers.
|
||||
func InitWorkers(workerNum, queueNum int) {
|
||||
LogAccess.Debug("worker number is ", workerNum, ", queue number is ", queueNum)
|
||||
QueueNotification = make(chan PushNotification, queueNum)
|
||||
for i := 0; i < workerNum; i++ {
|
||||
go startWorker()
|
||||
}
|
||||
}
|
||||
|
||||
func startWorker() {
|
||||
for {
|
||||
notification := <-QueueNotification
|
||||
switch notification.Platform {
|
||||
case PlatFormIos:
|
||||
PushToIOS(notification)
|
||||
case PlatFormAndroid:
|
||||
PushToAndroid(notification)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// queueNotification add notification to queue list.
|
||||
func queueNotification(req RequestPush) int {
|
||||
var count int
|
||||
for _, notification := range req.Notifications {
|
||||
switch notification.Platform {
|
||||
|
@ -130,17 +151,14 @@ func SendNotification(req RequestPush) int {
|
|||
if !PushConf.Ios.Enabled {
|
||||
continue
|
||||
}
|
||||
|
||||
count++
|
||||
go PushToIOS(notification)
|
||||
case PlatFormAndroid:
|
||||
if !PushConf.Android.Enabled {
|
||||
continue
|
||||
}
|
||||
}
|
||||
QueueNotification <- notification
|
||||
|
||||
count++
|
||||
go PushToAndroid(notification)
|
||||
}
|
||||
}
|
||||
|
||||
return count
|
||||
|
@ -242,7 +260,7 @@ func GetIOSNotification(req PushNotification) *apns.Notification {
|
|||
}
|
||||
|
||||
// PushToIOS provide send notification to APNs server.
|
||||
func PushToIOS(req PushNotification) bool {
|
||||
func PushToIOS(req PushNotification) {
|
||||
|
||||
notification := GetIOSNotification(req)
|
||||
|
||||
|
@ -256,7 +274,7 @@ func PushToIOS(req PushNotification) bool {
|
|||
// apns server error
|
||||
LogPush(FailedPush, token, req, err)
|
||||
|
||||
return false
|
||||
continue
|
||||
}
|
||||
|
||||
if res.StatusCode != 200 {
|
||||
|
@ -264,15 +282,13 @@ func PushToIOS(req PushNotification) bool {
|
|||
// ref: https://github.com/sideshow/apns2/blob/master/response.go#L14-L65
|
||||
LogPush(FailedPush, token, req, errors.New(res.Reason))
|
||||
|
||||
return false
|
||||
continue
|
||||
}
|
||||
|
||||
if res.Sent() {
|
||||
LogPush(SucceededPush, token, req, nil)
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// GetAndroidNotification use for define Android notificaiton.
|
||||
|
|
|
@ -229,8 +229,8 @@ func TestPushToIOS(t *testing.T) {
|
|||
Message: "Welcome",
|
||||
}
|
||||
|
||||
success := PushToIOS(req)
|
||||
assert.False(t, success)
|
||||
PushToIOS(req)
|
||||
// assert.False(t, success)
|
||||
}
|
||||
|
||||
func TestPushToAndroidWrongAPIKey(t *testing.T) {
|
||||
|
@ -326,6 +326,8 @@ func TestOverwriteAndroidAPIKey(t *testing.T) {
|
|||
func TestSenMultipleNotifications(t *testing.T) {
|
||||
PushConf = BuildDefaultPushConf()
|
||||
|
||||
InitWorkers(2, 2)
|
||||
|
||||
PushConf.Ios.Enabled = true
|
||||
PushConf.Ios.PemKeyPath = "../certificate/certificate-valid.pem"
|
||||
InitAPNSClient()
|
||||
|
@ -352,7 +354,7 @@ func TestSenMultipleNotifications(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
count := SendNotification(req)
|
||||
count := queueNotification(req)
|
||||
assert.Equal(t, 2, count)
|
||||
}
|
||||
|
||||
|
@ -385,7 +387,7 @@ func TestDisabledAndroidNotifications(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
count := SendNotification(req)
|
||||
count := queueNotification(req)
|
||||
assert.Equal(t, 1, count)
|
||||
}
|
||||
|
||||
|
@ -418,7 +420,7 @@ func TestDisabledIosNotifications(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
count := SendNotification(req)
|
||||
count := queueNotification(req)
|
||||
assert.Equal(t, 1, count)
|
||||
}
|
||||
|
||||
|
|
|
@ -47,8 +47,8 @@ func pushHandler(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// process notification.
|
||||
go SendNotification(form)
|
||||
// queue notification.
|
||||
go queueNotification(form)
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"success": "ok",
|
||||
|
|
Loading…
Reference in New Issue