From 2d2a8a01105ee4b82e5753d13545d447699ac06a Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Tue, 4 Feb 2020 13:27:27 +0800 Subject: [PATCH] feat(worker): support graceful shutdown (#459) * feat(worker): support graceful shutdown notifications workers and queue have been sent to APNs/FCM before shutdown a push notification. send buffered channel to signal.Notify to avoid blocking see: golang/lint#175 fixed: https://github.com/appleboy/gorush/issues/441 Signed-off-by: Bo-Yi Wu --- Makefile | 2 +- README.md | 1 + docker-compose.yml | 15 ++++++++++ gorush/notification_fcm_test.go | 7 ++++- gorush/server_lambda.go | 2 +- gorush/server_normal.go | 2 +- gorush/status.go | 3 +- gorush/worker.go | 13 +++++---- main.go | 50 +++++++++++++++++++++++++++------ rpc/server.go | 2 +- 10 files changed, 76 insertions(+), 21 deletions(-) create mode 100644 docker-compose.yml diff --git a/Makefile b/Makefile index 8f97827..c672d13 100644 --- a/Makefile +++ b/Makefile @@ -137,7 +137,7 @@ build_linux_lambda: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -tags 'lambda' -ldflags '$(EXTLDFLAGS)-s -w $(LDFLAGS)' -o release/linux/lambda/$(DEPLOY_IMAGE) docker_image: - docker build -t $(DEPLOY_ACCOUNT)/$(DEPLOY_IMAGE) -f Dockerfile . + docker build -t $(DEPLOY_ACCOUNT)/$(DEPLOY_IMAGE) -f ./docker/Dockerfile.linux.amd64 . docker_release: docker_image diff --git a/README.md b/README.md index b7b0124..a38fad0 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,7 @@ A push notification micro server using [Gin](https://github.com/gin-gonic/gin) f - Support install TLS certificates from [Let's Encrypt](https://letsencrypt.org/) automatically. - Support send notification through [RPC](https://en.wikipedia.org/wiki/Remote_procedure_call) protocol, we use [gRPC](https://grpc.io/) as default framework. - Support running in Docker, [Kubernetes](https://kubernetes.io/) or [AWS Lambda](https://aws.amazon.com/lambda) ([Native Support in Golang](https://aws.amazon.com/blogs/compute/announcing-go-support-for-aws-lambda/)) +- Support graceful shutdown that notifications workers and queue have are sent to APNs/FCM before a push notification service is shutdown. See the default [YAML config example](config/config.yml): diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..4af2909 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,15 @@ +version: '3' + +services: + gorush: + image: appleboy/gorush + restart: always + ports: + - "8080:8080" + - "9000:9000" + logging: + options: + max-size: "100k" + max-file: "3" + environment: + - GORUSH_CORE_QUEUE_NUM=512 diff --git a/gorush/notification_fcm_test.go b/gorush/notification_fcm_test.go index b999625..23494af 100644 --- a/gorush/notification_fcm_test.go +++ b/gorush/notification_fcm_test.go @@ -1,8 +1,10 @@ package gorush import ( + "context" "log" "os" + "sync" "testing" "github.com/appleboy/go-fcm" @@ -16,7 +18,10 @@ func init() { log.Fatal(err) } - InitWorkers(PushConf.Core.WorkerNum, PushConf.Core.QueueNum) + ctx := context.Background() + wg := &sync.WaitGroup{} + wg.Add(int(PushConf.Core.WorkerNum)) + InitWorkers(ctx, wg, PushConf.Core.WorkerNum, PushConf.Core.QueueNum) if err := InitAppStatus(); err != nil { log.Fatal(err) diff --git a/gorush/server_lambda.go b/gorush/server_lambda.go index b079753..852f243 100644 --- a/gorush/server_lambda.go +++ b/gorush/server_lambda.go @@ -13,7 +13,7 @@ func RunHTTPServer() error { return nil } - LogAccess.Debug("HTTPD server is running on " + PushConf.Core.Port + " port.") + LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.") return gateway.ListenAndServe(PushConf.Core.Address+":"+PushConf.Core.Port, routerEngine()) } diff --git a/gorush/server_normal.go b/gorush/server_normal.go index a54a925..a96a61c 100644 --- a/gorush/server_normal.go +++ b/gorush/server_normal.go @@ -21,7 +21,7 @@ func RunHTTPServer() (err error) { Handler: routerEngine(), } - LogAccess.Debug("HTTPD server is running on " + PushConf.Core.Port + " port.") + LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.") if PushConf.Core.AutoTLS.Enabled { return startServer(autoTLSServer()) } else if PushConf.Core.SSL { diff --git a/gorush/status.go b/gorush/status.go index 4ed8b00..7078d52 100644 --- a/gorush/status.go +++ b/gorush/status.go @@ -10,6 +10,7 @@ import ( "github.com/appleboy/gorush/storage/leveldb" "github.com/appleboy/gorush/storage/memory" "github.com/appleboy/gorush/storage/redis" + "github.com/gin-gonic/gin" "github.com/thoas/stats" ) @@ -41,7 +42,7 @@ type IosStatus struct { // InitAppStatus for initialize app status func InitAppStatus() error { - LogAccess.Debug("Init App Status Engine as ", PushConf.Stat.Engine) + LogAccess.Info("Init App Status Engine as ", PushConf.Stat.Engine) switch PushConf.Stat.Engine { case "memory": StatStorage = memory.New() diff --git a/gorush/worker.go b/gorush/worker.go index 85de59f..4ebf4c1 100644 --- a/gorush/worker.go +++ b/gorush/worker.go @@ -7,11 +7,11 @@ import ( ) // InitWorkers for initialize all workers. -func InitWorkers(workerNum int64, queueNum int64) { - LogAccess.Debug("worker number is ", workerNum, ", queue number is ", queueNum) +func InitWorkers(ctx context.Context, wg *sync.WaitGroup, workerNum int64, queueNum int64) { + LogAccess.Info("worker number is ", workerNum, ", queue number is ", queueNum) QueueNotification = make(chan PushNotification, queueNum) for i := int64(0); i < workerNum; i++ { - go startWorker() + go startWorker(ctx, wg, i) } } @@ -33,11 +33,12 @@ func SendNotification(req PushNotification) { } } -func startWorker() { - for { - notification := <-QueueNotification +func startWorker(ctx context.Context, wg *sync.WaitGroup, num int64) { + defer wg.Done() + for notification := range QueueNotification { SendNotification(notification) } + LogAccess.Info("closed the worker num ", num) } // markFailedNotification adds failure logs for all tokens in push notification diff --git a/main.go b/main.go index 1f40772..56371eb 100644 --- a/main.go +++ b/main.go @@ -1,14 +1,18 @@ package main import ( + "context" "flag" "fmt" "log" "net" "net/http" "os" + "os/signal" "path/filepath" "strconv" + "sync" + "syscall" "time" "github.com/appleboy/gorush/config" @@ -18,6 +22,24 @@ import ( "golang.org/x/sync/errgroup" ) +func withContextFunc(ctx context.Context, f func()) context.Context { + ctx, cancel := context.WithCancel(ctx) + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(c) + + select { + case <-ctx.Done(): + case <-c: + cancel() + f() + } + }() + + return ctx +} + func main() { opts := config.ConfYaml{} @@ -223,20 +245,30 @@ func main() { } if err = gorush.InitAppStatus(); err != nil { - return + gorush.LogError.Fatal(err) } - gorush.InitWorkers(gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum) + wg := &sync.WaitGroup{} + wg.Add(int(gorush.PushConf.Core.WorkerNum)) + ctx := withContextFunc(context.Background(), func() { + gorush.LogAccess.Info("close the notification queue channel") + close(gorush.QueueNotification) + wg.Wait() + gorush.LogAccess.Info("the notification queue has been clear") + }) + + gorush.InitWorkers(ctx, wg, gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum) + + if err = gorush.InitAPNSClient(); err != nil { + gorush.LogError.Fatal(err) + } + + if _, err = gorush.InitFCMClient(gorush.PushConf.Android.APIKey); err != nil { + gorush.LogError.Fatal(err) + } var g errgroup.Group - g.Go(gorush.InitAPNSClient) - - g.Go(func() error { - _, err := gorush.InitFCMClient(gorush.PushConf.Android.APIKey) - return err - }) - g.Go(gorush.RunHTTPServer) // Run httpd server g.Go(rpc.RunGRPCServer) // Run gRPC internal server diff --git a/rpc/server.go b/rpc/server.go index 8c3f18e..efc7495 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -100,7 +100,7 @@ func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*prot // RunGRPCServer run gorush grpc server func RunGRPCServer() error { if !gorush.PushConf.GRPC.Enabled { - gorush.LogAccess.Debug("gRPC server is disabled.") + gorush.LogAccess.Info("gRPC server is disabled.") return nil }