Merge pull request #53 from appleboy/worker

Support worker and queue.
This commit is contained in:
Bo-Yi Wu 2016-04-14 10:17:37 -05:00
commit f7e801e44d
9 changed files with 52 additions and 20 deletions

View File

@ -7,6 +7,7 @@ go:
- 1.4 - 1.4
- 1.5 - 1.5
- 1.6 - 1.6
- 1.6.1
- tip - tip
services: services:

View File

@ -18,12 +18,15 @@ A push notification server using [Gin](https://github.com/gin-gonic/gin) framewo
* Support Web API to send push notification. * Support Web API to send push notification.
* Support zero downtime restarts for go servers using [endless](https://github.com/fvbock/endless). * Support zero downtime restarts for go servers using [endless](https://github.com/fvbock/endless).
* Support [HTTP/2](https://http2.github.io/) or HTTP/1.1 protocol. * Support [HTTP/2](https://http2.github.io/) or HTTP/1.1 protocol.
* Support notification queue and multiple workers.
See the [YAML config example](config/config.yml): See the [YAML config example](config/config.yml):
```yaml ```yaml
core: core:
port: "8088" port: "8088"
worker_num: 8
queue_num: 8192
max_notification: 100 max_notification: 100
mode: "release" mode: "release"
ssl: false ssl: false

View File

@ -1,5 +1,7 @@
core: core:
port: "8088" port: "8088"
worker_num: 8
queue_num: 8192
max_notification: 100 max_notification: 100
mode: "release" mode: "release"
ssl: false ssl: false

View File

@ -114,5 +114,6 @@ func main() {
} }
gorush.InitAPNSClient() gorush.InitAPNSClient()
gorush.InitWorkers(gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum)
gorush.RunHTTPServer() gorush.RunHTTPServer()
} }

View File

@ -3,6 +3,7 @@ package gorush
import ( import (
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"io/ioutil" "io/ioutil"
"runtime"
) )
// ConfYaml is config structure. // ConfYaml is config structure.
@ -18,6 +19,8 @@ type ConfYaml struct {
type SectionCore struct { type SectionCore struct {
Port string `yaml:"port"` Port string `yaml:"port"`
MaxNotification int `yaml:"max_notification"` MaxNotification int `yaml:"max_notification"`
WorkerNum int `yaml:"worker_num"`
QueueNum int `yaml:"queue_num"`
Mode string `yaml:"mode"` Mode string `yaml:"mode"`
SSL bool `yaml:"ssl"` SSL bool `yaml:"ssl"`
CertPath string `yaml:"cert_path"` CertPath string `yaml:"cert_path"`
@ -59,6 +62,8 @@ func BuildDefaultPushConf() ConfYaml {
// Core // Core
conf.Core.Port = "8088" conf.Core.Port = "8088"
conf.Core.WorkerNum = runtime.NumCPU()
conf.Core.QueueNum = 8192
conf.Core.Mode = "release" conf.Core.Mode = "release"
conf.Core.SSL = false conf.Core.SSL = false
conf.Core.CertPath = "cert.pem" conf.Core.CertPath = "cert.pem"

View File

@ -9,6 +9,8 @@ import (
var ( var (
// PushConf is gorush config // PushConf is gorush config
PushConf ConfYaml PushConf ConfYaml
// QueueNotification is chan type
QueueNotification chan PushNotification
// CertificatePemIos is ios certificate file // CertificatePemIos is ios certificate file
CertificatePemIos tls.Certificate CertificatePemIos tls.Certificate
// ApnsClient is apns client // ApnsClient is apns client

View File

@ -121,8 +121,29 @@ func InitAPNSClient() error {
return nil return nil
} }
// SendNotification provide send all push request. // InitWorkers for initialize all workers.
func SendNotification(req RequestPush) int { func InitWorkers(workerNum, queueNum int) {
LogAccess.Debug("worker number is ", workerNum, ", queue number is ", queueNum)
QueueNotification = make(chan PushNotification, queueNum)
for i := 0; i < workerNum; i++ {
go startWorker()
}
}
func startWorker() {
for {
notification := <-QueueNotification
switch notification.Platform {
case PlatFormIos:
PushToIOS(notification)
case PlatFormAndroid:
PushToAndroid(notification)
}
}
}
// queueNotification add notification to queue list.
func queueNotification(req RequestPush) int {
var count int var count int
for _, notification := range req.Notifications { for _, notification := range req.Notifications {
switch notification.Platform { switch notification.Platform {
@ -130,17 +151,14 @@ func SendNotification(req RequestPush) int {
if !PushConf.Ios.Enabled { if !PushConf.Ios.Enabled {
continue continue
} }
count++
go PushToIOS(notification)
case PlatFormAndroid: case PlatFormAndroid:
if !PushConf.Android.Enabled { if !PushConf.Android.Enabled {
continue continue
} }
}
QueueNotification <- notification
count++ count++
go PushToAndroid(notification)
}
} }
return count return count
@ -242,7 +260,7 @@ func GetIOSNotification(req PushNotification) *apns.Notification {
} }
// PushToIOS provide send notification to APNs server. // PushToIOS provide send notification to APNs server.
func PushToIOS(req PushNotification) bool { func PushToIOS(req PushNotification) {
notification := GetIOSNotification(req) notification := GetIOSNotification(req)
@ -256,7 +274,7 @@ func PushToIOS(req PushNotification) bool {
// apns server error // apns server error
LogPush(FailedPush, token, req, err) LogPush(FailedPush, token, req, err)
return false continue
} }
if res.StatusCode != 200 { if res.StatusCode != 200 {
@ -264,15 +282,13 @@ func PushToIOS(req PushNotification) bool {
// ref: https://github.com/sideshow/apns2/blob/master/response.go#L14-L65 // ref: https://github.com/sideshow/apns2/blob/master/response.go#L14-L65
LogPush(FailedPush, token, req, errors.New(res.Reason)) LogPush(FailedPush, token, req, errors.New(res.Reason))
return false continue
} }
if res.Sent() { if res.Sent() {
LogPush(SucceededPush, token, req, nil) LogPush(SucceededPush, token, req, nil)
} }
} }
return true
} }
// GetAndroidNotification use for define Android notificaiton. // GetAndroidNotification use for define Android notificaiton.

View File

@ -229,8 +229,8 @@ func TestPushToIOS(t *testing.T) {
Message: "Welcome", Message: "Welcome",
} }
success := PushToIOS(req) PushToIOS(req)
assert.False(t, success) // assert.False(t, success)
} }
func TestPushToAndroidWrongAPIKey(t *testing.T) { func TestPushToAndroidWrongAPIKey(t *testing.T) {
@ -326,6 +326,8 @@ func TestOverwriteAndroidAPIKey(t *testing.T) {
func TestSenMultipleNotifications(t *testing.T) { func TestSenMultipleNotifications(t *testing.T) {
PushConf = BuildDefaultPushConf() PushConf = BuildDefaultPushConf()
InitWorkers(2, 2)
PushConf.Ios.Enabled = true PushConf.Ios.Enabled = true
PushConf.Ios.PemKeyPath = "../certificate/certificate-valid.pem" PushConf.Ios.PemKeyPath = "../certificate/certificate-valid.pem"
InitAPNSClient() InitAPNSClient()
@ -352,7 +354,7 @@ func TestSenMultipleNotifications(t *testing.T) {
}, },
} }
count := SendNotification(req) count := queueNotification(req)
assert.Equal(t, 2, count) assert.Equal(t, 2, count)
} }
@ -385,7 +387,7 @@ func TestDisabledAndroidNotifications(t *testing.T) {
}, },
} }
count := SendNotification(req) count := queueNotification(req)
assert.Equal(t, 1, count) assert.Equal(t, 1, count)
} }
@ -418,7 +420,7 @@ func TestDisabledIosNotifications(t *testing.T) {
}, },
} }
count := SendNotification(req) count := queueNotification(req)
assert.Equal(t, 1, count) assert.Equal(t, 1, count)
} }

View File

@ -47,8 +47,8 @@ func pushHandler(c *gin.Context) {
return return
} }
// process notification. // queue notification.
go SendNotification(form) go queueNotification(form)
c.JSON(http.StatusOK, gin.H{ c.JSON(http.StatusOK, gin.H{
"success": "ok", "success": "ok",