feat(worker): support graceful shutdown (#459)
* feat(worker): support graceful shutdown notifications workers and queue have been sent to APNs/FCM before shutdown a push notification. send buffered channel to signal.Notify to avoid blocking see: golang/lint#175 fixed: https://github.com/appleboy/gorush/issues/441 Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
		
							parent
							
								
									bcf1c0cd03
								
							
						
					
					
						commit
						2d2a8a0110
					
				
							
								
								
									
										2
									
								
								Makefile
								
								
								
								
							
							
						
						
									
										2
									
								
								Makefile
								
								
								
								
							| 
						 | 
				
			
			@ -137,7 +137,7 @@ build_linux_lambda:
 | 
			
		|||
	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -tags 'lambda' -ldflags '$(EXTLDFLAGS)-s -w $(LDFLAGS)' -o release/linux/lambda/$(DEPLOY_IMAGE)
 | 
			
		||||
 | 
			
		||||
docker_image:
 | 
			
		||||
	docker build -t $(DEPLOY_ACCOUNT)/$(DEPLOY_IMAGE) -f Dockerfile .
 | 
			
		||||
	docker build -t $(DEPLOY_ACCOUNT)/$(DEPLOY_IMAGE) -f ./docker/Dockerfile.linux.amd64 .
 | 
			
		||||
 | 
			
		||||
docker_release: docker_image
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -83,6 +83,7 @@ A push notification micro server using [Gin](https://github.com/gin-gonic/gin) f
 | 
			
		|||
- Support install TLS certificates from [Let's Encrypt](https://letsencrypt.org/) automatically.
 | 
			
		||||
- Support send notification through [RPC](https://en.wikipedia.org/wiki/Remote_procedure_call) protocol, we use [gRPC](https://grpc.io/) as default framework.
 | 
			
		||||
- Support running in Docker, [Kubernetes](https://kubernetes.io/) or [AWS Lambda](https://aws.amazon.com/lambda) ([Native Support in Golang](https://aws.amazon.com/blogs/compute/announcing-go-support-for-aws-lambda/))
 | 
			
		||||
- Support graceful shutdown that notifications workers and queue have are sent to APNs/FCM before a push notification service is shutdown.
 | 
			
		||||
 | 
			
		||||
See the default [YAML config example](config/config.yml):
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,15 @@
 | 
			
		|||
version: '3'
 | 
			
		||||
 | 
			
		||||
services:
 | 
			
		||||
  gorush:
 | 
			
		||||
    image: appleboy/gorush
 | 
			
		||||
    restart: always
 | 
			
		||||
    ports:
 | 
			
		||||
      - "8080:8080"
 | 
			
		||||
      - "9000:9000"
 | 
			
		||||
    logging:
 | 
			
		||||
      options:
 | 
			
		||||
        max-size: "100k"
 | 
			
		||||
        max-file: "3"
 | 
			
		||||
    environment:
 | 
			
		||||
      - GORUSH_CORE_QUEUE_NUM=512
 | 
			
		||||
| 
						 | 
				
			
			@ -1,8 +1,10 @@
 | 
			
		|||
package gorush
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/go-fcm"
 | 
			
		||||
| 
						 | 
				
			
			@ -16,7 +18,10 @@ func init() {
 | 
			
		|||
		log.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	InitWorkers(PushConf.Core.WorkerNum, PushConf.Core.QueueNum)
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	wg := &sync.WaitGroup{}
 | 
			
		||||
	wg.Add(int(PushConf.Core.WorkerNum))
 | 
			
		||||
	InitWorkers(ctx, wg, PushConf.Core.WorkerNum, PushConf.Core.QueueNum)
 | 
			
		||||
 | 
			
		||||
	if err := InitAppStatus(); err != nil {
 | 
			
		||||
		log.Fatal(err)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,7 +13,7 @@ func RunHTTPServer() error {
 | 
			
		|||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	LogAccess.Debug("HTTPD server is running on " + PushConf.Core.Port + " port.")
 | 
			
		||||
	LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.")
 | 
			
		||||
 | 
			
		||||
	return gateway.ListenAndServe(PushConf.Core.Address+":"+PushConf.Core.Port, routerEngine())
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,7 +21,7 @@ func RunHTTPServer() (err error) {
 | 
			
		|||
		Handler: routerEngine(),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	LogAccess.Debug("HTTPD server is running on " + PushConf.Core.Port + " port.")
 | 
			
		||||
	LogAccess.Info("HTTPD server is running on " + PushConf.Core.Port + " port.")
 | 
			
		||||
	if PushConf.Core.AutoTLS.Enabled {
 | 
			
		||||
		return startServer(autoTLSServer())
 | 
			
		||||
	} else if PushConf.Core.SSL {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,6 +10,7 @@ import (
 | 
			
		|||
	"github.com/appleboy/gorush/storage/leveldb"
 | 
			
		||||
	"github.com/appleboy/gorush/storage/memory"
 | 
			
		||||
	"github.com/appleboy/gorush/storage/redis"
 | 
			
		||||
 | 
			
		||||
	"github.com/gin-gonic/gin"
 | 
			
		||||
	"github.com/thoas/stats"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -41,7 +42,7 @@ type IosStatus struct {
 | 
			
		|||
 | 
			
		||||
// InitAppStatus for initialize app status
 | 
			
		||||
func InitAppStatus() error {
 | 
			
		||||
	LogAccess.Debug("Init App Status Engine as ", PushConf.Stat.Engine)
 | 
			
		||||
	LogAccess.Info("Init App Status Engine as ", PushConf.Stat.Engine)
 | 
			
		||||
	switch PushConf.Stat.Engine {
 | 
			
		||||
	case "memory":
 | 
			
		||||
		StatStorage = memory.New()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -7,11 +7,11 @@ import (
 | 
			
		|||
)
 | 
			
		||||
 | 
			
		||||
// InitWorkers for initialize all workers.
 | 
			
		||||
func InitWorkers(workerNum int64, queueNum int64) {
 | 
			
		||||
	LogAccess.Debug("worker number is ", workerNum, ", queue number is ", queueNum)
 | 
			
		||||
func InitWorkers(ctx context.Context, wg *sync.WaitGroup, workerNum int64, queueNum int64) {
 | 
			
		||||
	LogAccess.Info("worker number is ", workerNum, ", queue number is ", queueNum)
 | 
			
		||||
	QueueNotification = make(chan PushNotification, queueNum)
 | 
			
		||||
	for i := int64(0); i < workerNum; i++ {
 | 
			
		||||
		go startWorker()
 | 
			
		||||
		go startWorker(ctx, wg, i)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -33,11 +33,12 @@ func SendNotification(req PushNotification) {
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func startWorker() {
 | 
			
		||||
	for {
 | 
			
		||||
		notification := <-QueueNotification
 | 
			
		||||
func startWorker(ctx context.Context, wg *sync.WaitGroup, num int64) {
 | 
			
		||||
	defer wg.Done()
 | 
			
		||||
	for notification := range QueueNotification {
 | 
			
		||||
		SendNotification(notification)
 | 
			
		||||
	}
 | 
			
		||||
	LogAccess.Info("closed the worker num ", num)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// markFailedNotification adds failure logs for all tokens in push notification
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										50
									
								
								main.go
								
								
								
								
							
							
						
						
									
										50
									
								
								main.go
								
								
								
								
							| 
						 | 
				
			
			@ -1,14 +1,18 @@
 | 
			
		|||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"flag"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"log"
 | 
			
		||||
	"net"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/signal"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"syscall"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/appleboy/gorush/config"
 | 
			
		||||
| 
						 | 
				
			
			@ -18,6 +22,24 @@ import (
 | 
			
		|||
	"golang.org/x/sync/errgroup"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func withContextFunc(ctx context.Context, f func()) context.Context {
 | 
			
		||||
	ctx, cancel := context.WithCancel(ctx)
 | 
			
		||||
	go func() {
 | 
			
		||||
		c := make(chan os.Signal, 1)
 | 
			
		||||
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
 | 
			
		||||
		defer signal.Stop(c)
 | 
			
		||||
 | 
			
		||||
		select {
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
		case <-c:
 | 
			
		||||
			cancel()
 | 
			
		||||
			f()
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return ctx
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	opts := config.ConfYaml{}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -223,20 +245,30 @@ func main() {
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	if err = gorush.InitAppStatus(); err != nil {
 | 
			
		||||
		return
 | 
			
		||||
		gorush.LogError.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	gorush.InitWorkers(gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum)
 | 
			
		||||
	wg := &sync.WaitGroup{}
 | 
			
		||||
	wg.Add(int(gorush.PushConf.Core.WorkerNum))
 | 
			
		||||
	ctx := withContextFunc(context.Background(), func() {
 | 
			
		||||
		gorush.LogAccess.Info("close the notification queue channel")
 | 
			
		||||
		close(gorush.QueueNotification)
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
		gorush.LogAccess.Info("the notification queue has been clear")
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	gorush.InitWorkers(ctx, wg, gorush.PushConf.Core.WorkerNum, gorush.PushConf.Core.QueueNum)
 | 
			
		||||
 | 
			
		||||
	if err = gorush.InitAPNSClient(); err != nil {
 | 
			
		||||
		gorush.LogError.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if _, err = gorush.InitFCMClient(gorush.PushConf.Android.APIKey); err != nil {
 | 
			
		||||
		gorush.LogError.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var g errgroup.Group
 | 
			
		||||
 | 
			
		||||
	g.Go(gorush.InitAPNSClient)
 | 
			
		||||
 | 
			
		||||
	g.Go(func() error {
 | 
			
		||||
		_, err := gorush.InitFCMClient(gorush.PushConf.Android.APIKey)
 | 
			
		||||
		return err
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	g.Go(gorush.RunHTTPServer) // Run httpd server
 | 
			
		||||
	g.Go(rpc.RunGRPCServer)    // Run gRPC internal server
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -100,7 +100,7 @@ func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*prot
 | 
			
		|||
// RunGRPCServer run gorush grpc server
 | 
			
		||||
func RunGRPCServer() error {
 | 
			
		||||
	if !gorush.PushConf.GRPC.Enabled {
 | 
			
		||||
		gorush.LogAccess.Debug("gRPC server is disabled.")
 | 
			
		||||
		gorush.LogAccess.Info("gRPC server is disabled.")
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue