diff --git a/.travis.yml b/.travis.yml index a5097ad..cf8ba4b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ go: - 1.4 - 1.5 - 1.6 + - 1.6.1 - tip services: diff --git a/README.md b/README.md index c1e3d8d..b4d9078 100644 --- a/README.md +++ b/README.md @@ -18,12 +18,15 @@ A push notification server using [Gin](https://github.com/gin-gonic/gin) framewo * Support Web API to send push notification. * Support zero downtime restarts for go servers using [endless](https://github.com/fvbock/endless). * Support [HTTP/2](https://http2.github.io/) or HTTP/1.1 protocol. +* Support notification queue and multiple workers. See the [YAML config example](config/config.yml): ```yaml core: port: "8088" + worker_num: 8 + queue_num: 8192 max_notification: 100 mode: "release" ssl: false diff --git a/config/config.yml b/config/config.yml index 66eeafe..b512f2d 100644 --- a/config/config.yml +++ b/config/config.yml @@ -1,5 +1,7 @@ core: port: "8088" + worker_num: 8 + queue_num: 8192 max_notification: 100 mode: "release" ssl: false diff --git a/gorush.go b/gorush.go index a4ccaab..3bdaf46 100644 --- a/gorush.go +++ b/gorush.go @@ -114,5 +114,6 @@ func main() { } gorush.InitAPNSClient() + gorush.InitWorkers(gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum) gorush.RunHTTPServer() } diff --git a/gorush/config.go b/gorush/config.go index e78fd87..9298e34 100644 --- a/gorush/config.go +++ b/gorush/config.go @@ -3,6 +3,7 @@ package gorush import ( "gopkg.in/yaml.v2" "io/ioutil" + "runtime" ) // ConfYaml is config structure. @@ -18,6 +19,8 @@ type ConfYaml struct { type SectionCore struct { Port string `yaml:"port"` MaxNotification int `yaml:"max_notification"` + WorkerNum int `yaml:"worker_num"` + QueueNum int `yaml:"queue_num"` Mode string `yaml:"mode"` SSL bool `yaml:"ssl"` CertPath string `yaml:"cert_path"` @@ -59,6 +62,8 @@ func BuildDefaultPushConf() ConfYaml { // Core conf.Core.Port = "8088" + conf.Core.WorkerNum = runtime.NumCPU() + conf.Core.QueueNum = 8192 conf.Core.Mode = "release" conf.Core.SSL = false conf.Core.CertPath = "cert.pem" diff --git a/gorush/global.go b/gorush/global.go index 1af3e26..7688bc4 100644 --- a/gorush/global.go +++ b/gorush/global.go @@ -9,6 +9,8 @@ import ( var ( // PushConf is gorush config PushConf ConfYaml + // QueueNotification is chan type + QueueNotification chan PushNotification // CertificatePemIos is ios certificate file CertificatePemIos tls.Certificate // ApnsClient is apns client diff --git a/gorush/notification.go b/gorush/notification.go index 3a9ee9d..820e30d 100644 --- a/gorush/notification.go +++ b/gorush/notification.go @@ -121,8 +121,29 @@ func InitAPNSClient() error { return nil } -// SendNotification provide send all push request. -func SendNotification(req RequestPush) int { +// InitWorkers for initialize all workers. +func InitWorkers(workerNum, queueNum int) { + LogAccess.Debug("worker number is ", workerNum, ", queue number is ", queueNum) + QueueNotification = make(chan PushNotification, queueNum) + for i := 0; i < workerNum; i++ { + go startWorker() + } +} + +func startWorker() { + for { + notification := <-QueueNotification + switch notification.Platform { + case PlatFormIos: + PushToIOS(notification) + case PlatFormAndroid: + PushToAndroid(notification) + } + } +} + +// queueNotification add notification to queue list. +func queueNotification(req RequestPush) int { var count int for _, notification := range req.Notifications { switch notification.Platform { @@ -130,17 +151,14 @@ func SendNotification(req RequestPush) int { if !PushConf.Ios.Enabled { continue } - - count++ - go PushToIOS(notification) case PlatFormAndroid: if !PushConf.Android.Enabled { continue } - - count++ - go PushToAndroid(notification) } + QueueNotification <- notification + + count++ } return count @@ -242,7 +260,7 @@ func GetIOSNotification(req PushNotification) *apns.Notification { } // PushToIOS provide send notification to APNs server. -func PushToIOS(req PushNotification) bool { +func PushToIOS(req PushNotification) { notification := GetIOSNotification(req) @@ -256,7 +274,7 @@ func PushToIOS(req PushNotification) bool { // apns server error LogPush(FailedPush, token, req, err) - return false + continue } if res.StatusCode != 200 { @@ -264,15 +282,13 @@ func PushToIOS(req PushNotification) bool { // ref: https://github.com/sideshow/apns2/blob/master/response.go#L14-L65 LogPush(FailedPush, token, req, errors.New(res.Reason)) - return false + continue } if res.Sent() { LogPush(SucceededPush, token, req, nil) } } - - return true } // GetAndroidNotification use for define Android notificaiton. diff --git a/gorush/notification_test.go b/gorush/notification_test.go index fe199ce..70e7be6 100644 --- a/gorush/notification_test.go +++ b/gorush/notification_test.go @@ -229,8 +229,8 @@ func TestPushToIOS(t *testing.T) { Message: "Welcome", } - success := PushToIOS(req) - assert.False(t, success) + PushToIOS(req) + // assert.False(t, success) } func TestPushToAndroidWrongAPIKey(t *testing.T) { @@ -326,6 +326,8 @@ func TestOverwriteAndroidAPIKey(t *testing.T) { func TestSenMultipleNotifications(t *testing.T) { PushConf = BuildDefaultPushConf() + InitWorkers(2, 2) + PushConf.Ios.Enabled = true PushConf.Ios.PemKeyPath = "../certificate/certificate-valid.pem" InitAPNSClient() @@ -352,7 +354,7 @@ func TestSenMultipleNotifications(t *testing.T) { }, } - count := SendNotification(req) + count := queueNotification(req) assert.Equal(t, 2, count) } @@ -385,7 +387,7 @@ func TestDisabledAndroidNotifications(t *testing.T) { }, } - count := SendNotification(req) + count := queueNotification(req) assert.Equal(t, 1, count) } @@ -418,7 +420,7 @@ func TestDisabledIosNotifications(t *testing.T) { }, } - count := SendNotification(req) + count := queueNotification(req) assert.Equal(t, 1, count) } diff --git a/gorush/server.go b/gorush/server.go index d6eba77..3a41f84 100644 --- a/gorush/server.go +++ b/gorush/server.go @@ -47,8 +47,8 @@ func pushHandler(c *gin.Context) { return } - // process notification. - go SendNotification(form) + // queue notification. + go queueNotification(form) c.JSON(http.StatusOK, gin.H{ "success": "ok",