* solve https://github.com/appleboy/gorush/issues/476

* add logging to start gRPC server (tcp listener)
change grpc.Server.Stop -> grpc.Server.GracefulStop
add grpc server_test
remove async functions from rpc/server.go

* add logging to err in rpc/server.go
This commit is contained in:
Slava Romanov 2020-03-10 16:09:03 +03:00 committed by GitHub
parent eb136d137a
commit d06e4a2337
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 19 deletions

View File

@ -2,13 +2,11 @@ package rpc
import ( import (
"context" "context"
"net/http" "net"
"sync" "sync"
"time"
"github.com/appleboy/gorush/gorush" "github.com/appleboy/gorush/gorush"
"github.com/appleboy/gorush/rpc/proto" "github.com/appleboy/gorush/rpc/proto"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -110,27 +108,24 @@ func RunGRPCServer(ctx context.Context) error {
rpcSrv := NewServer() rpcSrv := NewServer()
proto.RegisterGorushServer(s, rpcSrv) proto.RegisterGorushServer(s, rpcSrv)
proto.RegisterHealthServer(s, rpcSrv) proto.RegisterHealthServer(s, rpcSrv)
// Register reflection service on gRPC server. // Register reflection service on gRPC server.
reflection.Register(s) reflection.Register(s)
gorush.LogAccess.Info("gRPC server is running on " + gorush.PushConf.GRPC.Port + " port.")
srv := &http.Server{ lis, err := net.Listen("tcp", ":"+gorush.PushConf.GRPC.Port)
Addr: ":" + gorush.PushConf.GRPC.Port, if err != nil {
Handler: s, gorush.LogError.Fatalln(err)
return err
} }
gorush.LogAccess.Info("gRPC server is running on " + gorush.PushConf.GRPC.Port + " port.")
var g errgroup.Group go func() {
g.Go(func() error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
timeout := time.Duration(gorush.PushConf.Core.ShutdownTimeout) * time.Second s.GracefulStop() // graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return srv.Shutdown(ctx)
} }
}) }()
g.Go(func() error { if err = s.Serve(lis); err != nil {
return srv.ListenAndServe() gorush.LogError.Fatalln(err)
}) }
return g.Wait() return err
} }

View File

@ -1 +1,47 @@
package rpc package rpc
import (
"context"
"testing"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"github.com/appleboy/gorush/gorush"
)
const gRPCAddr = "localhost:9000"
func TestGracefulShutDownGRPCServer(t *testing.T) {
// server configs
gorush.InitLog()
gorush.PushConf.GRPC.Enabled = true
gorush.PushConf.GRPC.Port = "9000"
gorush.PushConf.Log.Format = "json"
// Run gRPC server
ctx, gRPCContextCancel := context.WithCancel(context.Background())
go func() {
if err := RunGRPCServer(ctx); err != nil {
panic(err)
}
}()
// gRPC client conn
conn, err := grpc.Dial(
gRPCAddr,
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
) // wait for server ready
if err != nil {
t.Error(err)
}
// Stop gRPC server
go gRPCContextCancel()
// wait for client connection would be closed
for conn.GetState() != connectivity.TransientFailure {
}
conn.Close()
}