gorush/rpc/server.go

142 lines
3.5 KiB
Go
Raw Normal View History

package rpc
import (
"context"
"net"
"strings"
"sync"
2021-07-13 08:32:39 +00:00
"github.com/appleboy/gorush/core"
"github.com/appleboy/gorush/gorush"
2021-07-13 08:32:39 +00:00
"github.com/appleboy/gorush/logx"
"github.com/appleboy/gorush/rpc/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)
// Server is used to implement gorush grpc server.
type Server struct {
mu sync.Mutex
// statusMap stores the serving status of the services this Server monitors.
statusMap map[string]proto.HealthCheckResponse_ServingStatus
}
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{
statusMap: make(map[string]proto.HealthCheckResponse_ServingStatus),
}
}
// Check implements `service Health`.
func (s *Server) Check(ctx context.Context, in *proto.HealthCheckRequest) (*proto.HealthCheckResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if in.Service == "" {
// check the server overall health status.
return &proto.HealthCheckResponse{
Status: proto.HealthCheckResponse_SERVING,
}, nil
}
if status, ok := s.statusMap[in.Service]; ok {
return &proto.HealthCheckResponse{
Status: status,
}, nil
}
return nil, status.Error(codes.NotFound, "unknown service")
}
// Send implements helloworld.GreeterServer
func (s *Server) Send(ctx context.Context, in *proto.NotificationRequest) (*proto.NotificationReply, error) {
2021-01-23 01:39:06 +00:00
badge := int(in.Badge)
notification := gorush.PushNotification{
Platform: int(in.Platform),
Tokens: in.Tokens,
Message: in.Message,
Title: in.Title,
Topic: in.Topic,
APIKey: in.Key,
Category: in.Category,
Sound: in.Sound,
ContentAvailable: in.ContentAvailable,
ThreadID: in.ThreadID,
MutableContent: in.MutableContent,
2020-03-13 16:00:03 +00:00
Image: in.Image,
2020-09-27 04:45:58 +00:00
Priority: strings.ToLower(in.GetPriority().String()),
}
if badge > 0 {
notification.Badge = &badge
}
2021-07-13 08:32:39 +00:00
if in.Topic != "" && in.Platform == core.PlatFormAndroid {
notification.To = in.Topic
}
if in.Alert != nil {
notification.Alert = gorush.Alert{
Title: in.Alert.Title,
Body: in.Alert.Body,
Subtitle: in.Alert.Subtitle,
Action: in.Alert.Action,
ActionLocKey: in.Alert.Action,
LaunchImage: in.Alert.LaunchImage,
LocArgs: in.Alert.LocArgs,
LocKey: in.Alert.LocKey,
TitleLocArgs: in.Alert.TitleLocArgs,
TitleLocKey: in.Alert.TitleLocKey,
}
}
if in.Data != nil {
notification.Data = map[string]interface{}{}
for k, v := range in.Data.Fields {
notification.Data[k] = v
}
}
go gorush.SendNotification(ctx, notification)
return &proto.NotificationReply{
Success: true,
Counts: int32(len(notification.Tokens)),
}, nil
}
// RunGRPCServer run gorush grpc server
func RunGRPCServer(ctx context.Context) error {
2017-07-24 10:58:30 +00:00
if !gorush.PushConf.GRPC.Enabled {
2021-07-13 08:32:39 +00:00
logx.LogAccess.Info("gRPC server is disabled.")
2017-07-24 10:58:30 +00:00
return nil
}
s := grpc.NewServer()
rpcSrv := NewServer()
proto.RegisterGorushServer(s, rpcSrv)
proto.RegisterHealthServer(s, rpcSrv)
// Register reflection service on gRPC server.
reflection.Register(s)
lis, err := net.Listen("tcp", ":"+gorush.PushConf.GRPC.Port)
if err != nil {
2021-07-13 08:32:39 +00:00
logx.LogError.Fatalln(err)
return err
}
2021-07-13 08:32:39 +00:00
logx.LogAccess.Info("gRPC server is running on " + gorush.PushConf.GRPC.Port + " port.")
go func() {
select {
case <-ctx.Done():
s.GracefulStop() // graceful shutdown
2021-07-13 08:32:39 +00:00
logx.LogAccess.Info("shutdown the gRPC server")
}
}()
if err = s.Serve(lis); err != nil {
2021-07-13 08:32:39 +00:00
logx.LogError.Fatalln(err)
}
return err
}