// Package grpcapi exposes the agenda handlers through a gRPC service.
package grpcapi

import (
	"context"
	"net"
	"time"

	"git.coopgo.io/coopgo-platform/agenda/handlers"
	"git.coopgo.io/coopgo-platform/agenda/storage"
	timestamp "github.com/golang/protobuf/ptypes/timestamp"
	"github.com/rs/zerolog/log"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	"google.golang.org/grpc/reflection"
	status "google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/structpb"
)

// AgendaServerImpl implements the Agenda gRPC service by delegating every
// call to an AgendaHandler.
type AgendaServerImpl struct {
	handler handlers.AgendaHandler
}

// NewAgendaServer returns an AgendaServerImpl that delegates to h.
func NewAgendaServer(h handlers.AgendaHandler) *AgendaServerImpl {
	return &AgendaServerImpl{
		handler: h,
	}
}

// ---------------------------------------------------------------------------
// Events
// ---------------------------------------------------------------------------

// CreateEvent converts the request's event to its storage form, passes it to
// the handler, and returns the resulting event converted back to protobuf.
//
// A handler failure is reported as codes.AlreadyExists; a conversion failure
// as codes.Internal.
func (s AgendaServerImpl) CreateEvent(ctx context.Context, req *CreateEventRequest) (*CreateEventResponse, error) {
	e := req.Event.ToStorageType()

	event, err := s.handler.CreateEvent(e)
	if err != nil {
		log.Error().Err(err).Msg("")
		return nil, status.Errorf(codes.AlreadyExists, "event creation failed : %v", err)
	}

	response, err := EventFromStorageType(event)
	if err != nil {
		log.Error().Err(err).Msg("")
		return nil, status.Errorf(codes.Internal, "issue while retrieving event : %v", err)
	}

	return &CreateEventResponse{Event: response}, nil
}

// GetEvent asks the handler for the event identified by req.Id and returns it
// converted to protobuf.
//
// A conversion failure is reported as codes.Internal.
func (s AgendaServerImpl) GetEvent(ctx context.Context, req *GetEventRequest) (*GetEventResponse, error) {
	event, err := s.handler.GetEvent(req.Id)
	if err != nil {
		log.Error().Err(err).Msg("")
		// NOTE(review): AlreadyExists is an odd code for a failed lookup
		// (NotFound looks intended); kept as-is because clients may already
		// match on it — confirm before changing.
		return nil, status.Errorf(codes.AlreadyExists, "issue while retrieving event : %v", err)
	}

	response, err := EventFromStorageType(event)
	if err != nil {
		log.Error().Err(err).Msg("")
		return nil, status.Errorf(codes.Internal, "issue while retrieving event : %v", err)
	}

	return &GetEventResponse{Event: response}, nil
}

// GetEvents asks the handler for the events of the requested namespaces,
// optionally bounded by req.Mindate and req.Maxdate, and returns them
// converted to protobuf.
//
// A bound that is absent from the request is passed to the handler as nil.
// A handler failure is reported as codes.NotFound; a conversion failure as
// codes.Internal.
func (s AgendaServerImpl) GetEvents(ctx context.Context, req *GetEventsRequest) (*GetEventsResponse, error) {
	var mindate, maxdate *time.Time

	if req.Mindate != nil {
		m := req.Mindate.AsTime()
		mindate = &m
	}
	if req.Maxdate != nil {
		m := req.Maxdate.AsTime()
		// The upper bound must land in maxdate; writing it to mindate would
		// both lose the upper bound and clobber the lower one.
		maxdate = &m
	}

	responses, err := s.handler.GetEvents(req.Namespaces, mindate, maxdate)
	if err != nil {
		log.Error().Err(err).Msg("")
		return nil, status.Errorf(codes.NotFound, "could not get events : %v", err)
	}

	var events []*Event
	for _, e := range responses {
		event, err := EventFromStorageType(&e)
		if err != nil {
			log.Error().Err(err).Msg("")
			return nil, status.Errorf(codes.Internal, "could not get events : %v", err)
		}
		events = append(events, event)
	}

	return &GetEventsResponse{Events: events}, nil
}

// UpdateEvent converts the request's event to its storage form, passes it to
// the handler's UpdateEvent, and returns the resulting event converted back
// to protobuf.
//
// A conversion failure is reported as codes.Internal.
func (s AgendaServerImpl) UpdateEvent(ctx context.Context, req *UpdateEventRequest) (*UpdateEventResponse, error) {
	b := req.Event.ToStorageType()

	event, err := s.handler.UpdateEvent(b)
	if err != nil {
		log.Error().Err(err).Msg("")
		// NOTE(review): AlreadyExists looks copied from CreateEvent; kept
		// as-is because clients may already match on it — confirm.
		return nil, status.Errorf(codes.AlreadyExists, "event update failed : %v", err)
	}

	response, err := EventFromStorageType(event)
	if err != nil {
		log.Error().Err(err).Msg("")
		return nil, status.Errorf(codes.Internal, "issue while retrieving event : %v", err)
	}

	return &UpdateEventResponse{Event: response}, nil
}

// DeleteEvent is not implemented and always returns codes.Unimplemented.
func (s AgendaServerImpl) DeleteEvent(context.Context, *DeleteEventRequest) (*DeleteEventResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method DeleteEvent not implemented")
}

// ---------------------------------------------------------------------------
// Subscriptions
// ---------------------------------------------------------------------------

// SubscribeEvent forwards the subscription request to the handler.
//
// The response's Ok field is true exactly when the handler returned no
// error; the handler's error, if any, is returned unchanged.
func (s AgendaServerImpl) SubscribeEvent(ctx context.Context, req *SubscribeEventRequest) (*SubscribeEventResponse, error) {
	err := s.handler.SubscribeEvent(req.Eventid, req.Subscriber, req.Data.AsMap())

	return &SubscribeEventResponse{
		Ok: err == nil,
	}, err
}

// UnsubscribeEvent is not implemented and always returns codes.Unimplemented.
func (s AgendaServerImpl) UnsubscribeEvent(context.Context, *UnsubscribeEventRequest) (*UnsubscribeEventResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Unsubscribe not implemented")
}

// DeleteSubscription forwards the deletion request to the handler.
//
// The response's Ok field is true exactly when the handler returned no
// error; the handler's error, if any, is returned unchanged.
func (s AgendaServerImpl) DeleteSubscription(ctx context.Context, req *DeleteSubscriptionRequest) (*DeleteSubscriptionResponse, error) {
	err := s.handler.DeleteSubscription(req.Eventid, req.Subscriber, req.Data.AsMap())

	return &DeleteSubscriptionResponse{
		Ok: err == nil,
	}, err
}

// GetSubscriptionByUser asks the handler for the subscriptions of
// req.Subscriber and returns them converted to protobuf.
//
// A handler failure is reported as codes.NotFound; a conversion failure as
// codes.Internal.
func (s AgendaServerImpl) GetSubscriptionByUser(ctx context.Context, req *GetSubscriptionByUserRequest) (*GetSubscriptionByUserResponse, error) {
	results, err := s.handler.GetSubscriptionByUser(req.Subscriber)
	if err != nil {
		log.Error().Err(err).Msg("")
		return nil, status.Errorf(codes.NotFound, "could not get subscriptions : %v", err)
	}

	// Copy each result field by field into a fresh storage.Subscription so
	// the converter receives []*storage.Subscription.
	subscriptions := make([]*storage.Subscription, 0, len(results))
	for _, result := range results {
		subscriptions = append(subscriptions, &storage.Subscription{
			ID:         result.ID,
			EventID:    result.EventID,
			Subscriber: result.Subscriber,
			Tags:       result.Tags,
			Data:       result.Data,
			CreatedAt:  result.CreatedAt,
		})
	}

	sub, err := ConvertStorageToProtoSubscription(subscriptions)
	if err != nil {
		log.Error().Err(err).Msg("")
		return nil, status.Errorf(codes.Internal, "could not get subscriptions : %v", err)
	}

	return &GetSubscriptionByUserResponse{Subscription: sub}, nil
}

// ---------------------------------------------------------------------------
// Conversion helpers
// ---------------------------------------------------------------------------

// ConvertStorageToProtoSubscription converts storage subscriptions to their
// protobuf form, preserving order.
//
// It returns the first error raised while converting a subscription's Data.
// CreatedAt is carried over with whole-second precision only.
func ConvertStorageToProtoSubscription(storageSubs []*storage.Subscription) ([]*Subscription, error) {
	var protoSubs []*Subscription

	for _, storageSub := range storageSubs {
		dataStruct, err := ConvertMapToStruct(storageSub.Data)
		if err != nil {
			return nil, err
		}

		protoSub := &Subscription{
			Id:         storageSub.ID,
			Eventid:    storageSub.EventID,
			Subscriber: storageSub.Subscriber,
			Tags:       storageSub.Tags,
			CreatedAt:  &timestamp.Timestamp{Seconds: storageSub.CreatedAt.Unix()},
			Data:       dataStruct,
		}
		protoSubs = append(protoSubs, protoSub)
	}

	return protoSubs, nil
}

// ConvertMapToStruct converts a map[string]interface{} to a
// google.protobuf.Struct.
//
// It returns the first error reported by structpb.NewValue for a value of
// the map. A nil or empty map yields a Struct with an empty Fields map.
func ConvertMapToStruct(dataMap map[string]interface{}) (*structpb.Struct, error) {
	fields := make(map[string]*structpb.Value, len(dataMap))

	for key, value := range dataMap {
		fieldValue, err := structpb.NewValue(value)
		if err != nil {
			return nil, err
		}
		fields[key] = fieldValue
	}

	return &structpb.Struct{
		Fields: fields,
	}, nil
}

// ---------------------------------------------------------------------------
// Server bootstrap
// ---------------------------------------------------------------------------

func (s AgendaServerImpl) mustEmbedUnimplementedAgendaServer() {}

// Run starts the Agenda gRPC server and blocks while it serves.
//
// The listen address is ":" followed by the "services.grpc.port" setting.
// When the "dev_env" setting is true, gRPC reflection is registered on the
// server. If server.Serve returns a non-nil error, that error is logged and
// sent on done.
func Run(done chan error, cfg *viper.Viper, handler handlers.AgendaHandler) {
	var (
		devEnv  = cfg.GetBool("dev_env")
		address = ":" + cfg.GetString("services.grpc.port")
	)

	log.Info().Str("address", address).Msg("Running gRPC server")

	server := grpc.NewServer()
	RegisterAgendaServer(server, NewAgendaServer(handler))

	l, err := net.Listen("tcp", address)
	if err != nil {
		// NOTE(review): a Fatal-level zerolog event exits the process, so
		// done is never signalled on this path — confirm this is intended.
		log.Fatal().Str("address", address).Err(err).Msg("Cannot listen to address")
	}

	if devEnv {
		reflection.Register(server)
	}

	if err := server.Serve(l); err != nil {
		log.Error().Err(err).Msg("gRPC service ended")
		done <- err
	}
}