Add grpc health check. (#304)

* Add grpc health check.

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>

* fix missing package.

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>

* fix readme..

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>

* fix vet

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
This commit is contained in:
Bo-Yi Wu
2017-11-12 08:44:33 -06:00
committed by GitHub
parent ff4ab7002f
commit 25bfe420b0
22 changed files with 1391 additions and 37 deletions

61
rpc/client_grpc_health.go Normal file
View File

@@ -0,0 +1,61 @@
package rpc
import (
"context"
"time"
"github.com/appleboy/gorush/rpc/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
// generate protobuffs
// protoc --go_out=plugins=grpc,import_path=proto:. *.proto
var backoff = time.Second
type healthClient struct {
client proto.HealthClient
conn *grpc.ClientConn
}
// NewGrpcHealthClient returns a new grpc Client.
func NewGrpcHealthClient(conn *grpc.ClientConn) Health {
client := new(healthClient)
client.client = proto.NewHealthClient(conn)
client.conn = conn
return client
}
func (c *healthClient) Close() error {
return c.conn.Close()
}
func (c *healthClient) Check(ctx context.Context) (bool, error) {
var res *proto.HealthCheckResponse
var err error
req := new(proto.HealthCheckRequest)
for {
res, err = c.client.Check(ctx, req)
if err == nil {
if res.GetStatus() == proto.HealthCheckResponse_SERVING {
return true, nil
}
return false, nil
}
switch grpc.Code(err) {
case
codes.Aborted,
codes.DataLoss,
codes.DeadlineExceeded,
codes.Internal,
codes.Unavailable:
// non-fatal errors
default:
return false, err
}
<-time.After(backoff)
}
}

1
rpc/client_test.go Normal file
View File

@@ -0,0 +1 @@
package rpc

View File

@@ -0,0 +1,30 @@
package main
import (
"log"
"github.com/appleboy/gorush/rpc/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
const (
address = "localhost:9000"
)
func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := proto.NewHealthClient(conn)
r, err := c.Check(context.Background(), &proto.HealthCheckRequest{})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Health: %d\n", r.GetStatus())
}

View File

@@ -3,7 +3,8 @@ package main
import (
"log"
pb "github.com/appleboy/gorush/rpc/proto"
"github.com/appleboy/gorush/rpc/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@@ -19,9 +20,9 @@ func main() {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGorushClient(conn)
c := proto.NewGorushClient(conn)
r, err := c.Send(context.Background(), &pb.NotificationRequest{
r, err := c.Send(context.Background(), &proto.NotificationRequest{
Platform: 2,
Tokens: []string{"1234567890"},
Message: "test message",

11
rpc/health.go Normal file
View File

@@ -0,0 +1,11 @@
package rpc
import (
"context"
)
// Health defines a health-check connection.
type Health interface {
// Check returns if server is healthy or not
Check(c context.Context) (bool, error)
}

View File

@@ -10,6 +10,8 @@ It is generated from these files:
It has these top-level messages:
NotificationRequest
NotificationReply
HealthCheckRequest
HealthCheckResponse
*/
package proto
@@ -33,6 +35,32 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto1.ProtoPackageIsVersion2 // please upgrade the proto package
type HealthCheckResponse_ServingStatus int32
const (
HealthCheckResponse_UNKNOWN HealthCheckResponse_ServingStatus = 0
HealthCheckResponse_SERVING HealthCheckResponse_ServingStatus = 1
HealthCheckResponse_NOT_SERVING HealthCheckResponse_ServingStatus = 2
)
var HealthCheckResponse_ServingStatus_name = map[int32]string{
0: "UNKNOWN",
1: "SERVING",
2: "NOT_SERVING",
}
var HealthCheckResponse_ServingStatus_value = map[string]int32{
"UNKNOWN": 0,
"SERVING": 1,
"NOT_SERVING": 2,
}
func (x HealthCheckResponse_ServingStatus) String() string {
return proto1.EnumName(HealthCheckResponse_ServingStatus_name, int32(x))
}
func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) {
return fileDescriptor0, []int{3, 0}
}
type NotificationRequest struct {
Tokens []string `protobuf:"bytes,1,rep,name=tokens" json:"tokens,omitempty"`
Platform int32 `protobuf:"varint,2,opt,name=platform" json:"platform,omitempty"`
@@ -113,9 +141,44 @@ func (m *NotificationReply) GetCounts() int32 {
return 0
}
type HealthCheckRequest struct {
Service string `protobuf:"bytes,1,opt,name=service" json:"service,omitempty"`
}
func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} }
func (m *HealthCheckRequest) String() string { return proto1.CompactTextString(m) }
func (*HealthCheckRequest) ProtoMessage() {}
func (*HealthCheckRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *HealthCheckRequest) GetService() string {
if m != nil {
return m.Service
}
return ""
}
type HealthCheckResponse struct {
Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,enum=proto.HealthCheckResponse_ServingStatus" json:"status,omitempty"`
}
func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} }
func (m *HealthCheckResponse) String() string { return proto1.CompactTextString(m) }
func (*HealthCheckResponse) ProtoMessage() {}
func (*HealthCheckResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus {
if m != nil {
return m.Status
}
return HealthCheckResponse_UNKNOWN
}
func init() {
proto1.RegisterType((*NotificationRequest)(nil), "proto.NotificationRequest")
proto1.RegisterType((*NotificationReply)(nil), "proto.NotificationReply")
proto1.RegisterType((*HealthCheckRequest)(nil), "proto.HealthCheckRequest")
proto1.RegisterType((*HealthCheckResponse)(nil), "proto.HealthCheckResponse")
proto1.RegisterEnum("proto.HealthCheckResponse_ServingStatus", HealthCheckResponse_ServingStatus_name, HealthCheckResponse_ServingStatus_value)
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -190,23 +253,95 @@ var _Gorush_serviceDesc = grpc.ServiceDesc{
Metadata: "gorush.proto",
}
// Client API for Health service
type HealthClient interface {
Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
}
type healthClient struct {
cc *grpc.ClientConn
}
func NewHealthClient(cc *grpc.ClientConn) HealthClient {
return &healthClient{cc}
}
func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
out := new(HealthCheckResponse)
err := grpc.Invoke(ctx, "/proto.Health/Check", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Health service
type HealthServer interface {
Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
}
func RegisterHealthServer(s *grpc.Server, srv HealthServer) {
s.RegisterService(&_Health_serviceDesc, srv)
}
func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthCheckRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HealthServer).Check(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.Health/Check",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Health_serviceDesc = grpc.ServiceDesc{
ServiceName: "proto.Health",
HandlerType: (*HealthServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Check",
Handler: _Health_Check_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "gorush.proto",
}
func init() { proto1.RegisterFile("gorush.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 231 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x90, 0xb1, 0x4e, 0xc3, 0x30,
0x10, 0x86, 0x31, 0x69, 0x42, 0x7b, 0x62, 0x00, 0x83, 0x90, 0x95, 0x29, 0xca, 0x94, 0xa9, 0x03,
0xec, 0x6c, 0x08, 0x26, 0x06, 0xf3, 0x04, 0xc1, 0x5c, 0x8b, 0xd5, 0x34, 0x67, 0x72, 0x97, 0x21,
0x8f, 0xc3, 0x9b, 0x22, 0xbb, 0x29, 0x52, 0xa5, 0x4e, 0xf6, 0xf7, 0xff, 0xc3, 0x7d, 0xfa, 0xe1,
0x7a, 0x4b, 0xc3, 0xc8, 0xdf, 0xeb, 0x30, 0x90, 0x90, 0xce, 0xd3, 0x53, 0xff, 0x2a, 0xb8, 0x7b,
0x27, 0xf1, 0x1b, 0xef, 0x5a, 0xf1, 0xd4, 0x5b, 0xfc, 0x19, 0x91, 0x45, 0x3f, 0x40, 0x21, 0xb4,
0xc3, 0x9e, 0x8d, 0xaa, 0xb2, 0x66, 0x65, 0x67, 0xd2, 0x25, 0x2c, 0x43, 0xd7, 0xca, 0x86, 0x86,
0xbd, 0xb9, 0xac, 0x54, 0x93, 0xdb, 0x7f, 0xd6, 0x06, 0xae, 0xf6, 0xc8, 0xdc, 0x6e, 0xd1, 0x64,
0x95, 0x6a, 0x56, 0xf6, 0x88, 0xfa, 0x1e, 0x72, 0xf1, 0xd2, 0xa1, 0x59, 0xa4, 0xfc, 0x00, 0x29,
0xa5, 0xe0, 0x9d, 0xc9, 0xe7, 0x34, 0x82, 0xbe, 0x81, 0x6c, 0x87, 0x93, 0x29, 0x52, 0x16, 0xbf,
0xf5, 0x0b, 0xdc, 0x9e, 0x2a, 0x86, 0x6e, 0x8a, 0xc7, 0x78, 0x74, 0x0e, 0x39, 0x1a, 0xaa, 0x66,
0x69, 0x8f, 0x18, 0xd5, 0x1d, 0x8d, 0xbd, 0xf0, 0x2c, 0x38, 0xd3, 0xe3, 0x1b, 0x14, 0xaf, 0x69,
0x01, 0xfd, 0x0c, 0x8b, 0x0f, 0xec, 0xbf, 0x74, 0x79, 0xd8, 0x62, 0x7d, 0x66, 0x80, 0xd2, 0x9c,
0xed, 0x42, 0x37, 0xd5, 0x17, 0x9f, 0x45, 0xaa, 0x9e, 0xfe, 0x02, 0x00, 0x00, 0xff, 0xff, 0x52,
0x8a, 0x53, 0x9a, 0x52, 0x01, 0x00, 0x00,
// 353 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x91, 0xcf, 0x6e, 0x82, 0x40,
0x10, 0xc6, 0x45, 0x05, 0x75, 0xec, 0x1f, 0xbb, 0x36, 0xcd, 0x96, 0x93, 0xe1, 0xc4, 0x89, 0x83,
0xbd, 0xf5, 0x60, 0x9a, 0x34, 0x46, 0x9b, 0x26, 0x98, 0x2c, 0xfd, 0x73, 0x6c, 0x28, 0x1d, 0x95,
0x88, 0x2c, 0x65, 0x97, 0x26, 0x3e, 0x47, 0x9f, 0xa0, 0x6f, 0xda, 0xec, 0x02, 0x4d, 0x4c, 0xe8,
0x09, 0x7e, 0xdf, 0xec, 0x7e, 0xf3, 0xcd, 0x0e, 0x9c, 0x6c, 0x78, 0x5e, 0x88, 0xad, 0x97, 0xe5,
0x5c, 0x72, 0x62, 0xea, 0x8f, 0xf3, 0x63, 0xc0, 0xd8, 0xe7, 0x32, 0x5e, 0xc7, 0x51, 0x28, 0x63,
0x9e, 0x32, 0xfc, 0x2c, 0x50, 0x48, 0x72, 0x05, 0x96, 0xe4, 0x3b, 0x4c, 0x05, 0x35, 0x26, 0x1d,
0x77, 0xc0, 0x2a, 0x22, 0x36, 0xf4, 0xb3, 0x24, 0x94, 0x6b, 0x9e, 0xef, 0x69, 0x7b, 0x62, 0xb8,
0x26, 0xfb, 0x63, 0x42, 0xa1, 0xb7, 0x47, 0x21, 0xc2, 0x0d, 0xd2, 0xce, 0xc4, 0x70, 0x07, 0xac,
0x46, 0x72, 0x09, 0xa6, 0x8c, 0x65, 0x82, 0xb4, 0xab, 0xf5, 0x12, 0xb4, 0xca, 0xb3, 0x38, 0xa2,
0x66, 0xa5, 0x2a, 0x20, 0x23, 0xe8, 0xec, 0xf0, 0x40, 0x2d, 0xad, 0xa9, 0x5f, 0x67, 0x0e, 0x17,
0xc7, 0x11, 0xb3, 0xe4, 0xa0, 0x9a, 0x89, 0x22, 0x8a, 0x50, 0xa8, 0x84, 0x86, 0xdb, 0x67, 0x35,
0xaa, 0xe8, 0x11, 0x2f, 0x52, 0x29, 0xaa, 0x80, 0x15, 0x39, 0x1e, 0x90, 0x25, 0x86, 0x89, 0xdc,
0xde, 0x6f, 0x31, 0xda, 0xd5, 0x83, 0x2a, 0x1f, 0xcc, 0xbf, 0xe2, 0x08, 0xb5, 0xcf, 0x80, 0xd5,
0xe8, 0x7c, 0x1b, 0x30, 0x3e, 0xba, 0x20, 0x32, 0x9e, 0x0a, 0x24, 0x77, 0x60, 0x09, 0x19, 0xca,
0xa2, 0x6c, 0x7c, 0x36, 0x75, 0xcb, 0x17, 0xf5, 0x1a, 0xce, 0x7a, 0x81, 0xf2, 0x4a, 0x37, 0x81,
0x3e, 0xcf, 0xaa, 0x7b, 0xce, 0x2d, 0x9c, 0x1e, 0x15, 0xc8, 0x10, 0x7a, 0xcf, 0xfe, 0xa3, 0xbf,
0x7a, 0xf5, 0x47, 0x2d, 0x05, 0xc1, 0x9c, 0xbd, 0x3c, 0xf8, 0x8b, 0x91, 0x41, 0xce, 0x61, 0xe8,
0xaf, 0x9e, 0xde, 0x6a, 0xa1, 0x3d, 0x5d, 0x82, 0xb5, 0xd0, 0x7b, 0x24, 0x33, 0xe8, 0x06, 0x98,
0x7e, 0x10, 0xbb, 0xea, 0xdf, 0xb0, 0x46, 0x9b, 0x36, 0xd6, 0xb2, 0xe4, 0xe0, 0xb4, 0x94, 0x53,
0x19, 0x99, 0xcc, 0xc0, 0xd4, 0xb1, 0xc9, 0x75, 0xd3, 0x28, 0xa5, 0x93, 0xfd, 0xff, 0x94, 0xef,
0x96, 0x2e, 0xdd, 0xfc, 0x06, 0x00, 0x00, 0xff, 0xff, 0xc6, 0xc3, 0xf2, 0x8d, 0x62, 0x02, 0x00,
0x00,
}

View File

@@ -20,3 +20,19 @@ service Gorush {
rpc Send (NotificationRequest) returns (NotificationReply) {}
}
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}

View File

@@ -2,24 +2,56 @@ package rpc
import (
"net"
"sync"
"github.com/appleboy/gorush/gorush"
pb "github.com/appleboy/gorush/rpc/proto"
"github.com/appleboy/gorush/rpc/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)
const (
port = ":9000"
)
// server is used to implement gorush grpc server.
type server struct{}
// Server is used to implement gorush grpc server.
type Server struct {
mu sync.Mutex
// statusMap stores the serving status of the services this Server monitors.
statusMap map[string]proto.HealthCheckResponse_ServingStatus
}
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{
statusMap: make(map[string]proto.HealthCheckResponse_ServingStatus),
}
}
// Check implements `service Health`.
func (s *Server) Check(ctx context.Context, in *proto.HealthCheckRequest) (*proto.HealthCheckResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if in.Service == "" {
// check the server overall health status.
return &proto.HealthCheckResponse{
Status: proto.HealthCheckResponse_SERVING,
}, nil
}
if status, ok := s.statusMap[in.Service]; ok {
return &proto.HealthCheckResponse{
Status: status,
}, nil
}
return nil, status.Error(codes.NotFound, "unknown service")
}
// Send implements helloworld.GreeterServer
func (s *server) Send(ctx context.Context, in *pb.NotificationRequest) (*pb.NotificationReply, error) {
func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*proto.NotificationReply, error) {
notification := gorush.PushNotification{
Platform: int(in.Platform),
Tokens: in.Tokens,
@@ -31,7 +63,7 @@ func (s *server) Send(ctx context.Context, in *pb.NotificationRequest) (*pb.Noti
go gorush.SendNotification(notification)
return &pb.NotificationReply{
return &proto.NotificationReply{
Success: false,
Counts: int32(len(notification.Tokens)),
}, nil
@@ -50,7 +82,9 @@ func RunGRPCServer() error {
return err
}
s := grpc.NewServer()
pb.RegisterGorushServer(s, &server{})
srv := NewServer()
proto.RegisterGorushServer(s, srv)
proto.RegisterHealthServer(s, srv)
// Register reflection service on gRPC server.
reflection.Register(s)
gorush.LogAccess.Debug("gRPC server is running on " + gorush.PushConf.GRPC.Port + " port.")

1
rpc/server_test.go Normal file
View File

@@ -0,0 +1 @@
package rpc