132 lines
3.8 KiB
Go
132 lines
3.8 KiB
Go
|
package grpcapi
|
||
|
|
||
|
import (
|
||
|
context "context"
|
||
|
"net"
|
||
|
"time"
|
||
|
|
||
|
"git.coopgo.io/coopgo-platform/diags/handlers"
|
||
|
// "git.coopgo.io/coopgo-platform/diags/storage"
|
||
|
// timestamp "github.com/golang/protobuf/ptypes/timestamp"
|
||
|
"github.com/rs/zerolog/log"
|
||
|
"github.com/spf13/viper"
|
||
|
"google.golang.org/grpc"
|
||
|
codes "google.golang.org/grpc/codes"
|
||
|
"google.golang.org/grpc/reflection"
|
||
|
status "google.golang.org/grpc/status"
|
||
|
// "google.golang.org/protobuf/types/known/structpb"
|
||
|
)
|
||
|
|
||
|
type DiagsServerImpl struct {
|
||
|
handler handlers.DiagsHandler
|
||
|
}
|
||
|
|
||
|
func NewDiagsServer(h handlers.DiagsHandler) *DiagsServerImpl {
|
||
|
return &DiagsServerImpl{
|
||
|
handler: h,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s DiagsServerImpl) CreateDiag(ctx context.Context, req *CreateDiagRequest) (*CreateDiagResponse, error) {
|
||
|
e := req.Diag.ToStorageType()
|
||
|
diag, err := s.handler.CreateDiag(e)
|
||
|
if err != nil {
|
||
|
log.Error().Err(err).Msg("")
|
||
|
return nil, status.Errorf(codes.AlreadyExists, "diag creation failed : %v", err)
|
||
|
}
|
||
|
response, err := DiagFromStorageType(diag)
|
||
|
if err != nil {
|
||
|
log.Error().Err(err).Msg("")
|
||
|
return nil, status.Errorf(codes.Internal, "issue while retrieving diag : %v", err)
|
||
|
}
|
||
|
return &CreateDiagResponse{Diag: response}, nil
|
||
|
}
|
||
|
func (s DiagsServerImpl) GetDiag(ctx context.Context, req *GetDiagRequest) (*GetDiagResponse, error) {
|
||
|
diag, err := s.handler.GetDiag(req.Id)
|
||
|
if err != nil {
|
||
|
log.Error().Err(err).Msg("")
|
||
|
return nil, status.Errorf(codes.AlreadyExists, "issue while retrieving diag : %v", err)
|
||
|
}
|
||
|
response, err := DiagFromStorageType(diag)
|
||
|
if err != nil {
|
||
|
log.Error().Err(err).Msg("")
|
||
|
return nil, status.Errorf(codes.Internal, "issue while retrieving diag : %v", err)
|
||
|
}
|
||
|
return &GetDiagResponse{Diag: response}, nil
|
||
|
}
|
||
|
func (s DiagsServerImpl) GetDiags(ctx context.Context, req *GetDiagsRequest) (*GetDiagsResponse, error) {
|
||
|
var mindate, maxdate *time.Time
|
||
|
|
||
|
if req.Mindate != nil {
|
||
|
m := req.Mindate.AsTime()
|
||
|
mindate = &m
|
||
|
}
|
||
|
|
||
|
if req.Maxdate != nil {
|
||
|
m := req.Maxdate.AsTime()
|
||
|
mindate = &m
|
||
|
}
|
||
|
|
||
|
responses, err := s.handler.GetDiags(req.Namespaces, mindate, maxdate)
|
||
|
if err != nil {
|
||
|
log.Error().Err(err).Msg("")
|
||
|
return nil, status.Errorf(codes.NotFound, "could not get diags : %v", err)
|
||
|
}
|
||
|
var diags []*Diag
|
||
|
|
||
|
for _, e := range responses {
|
||
|
diag, err := DiagFromStorageType(&e)
|
||
|
if err != nil {
|
||
|
log.Error().Err(err).Msg("")
|
||
|
return nil, status.Errorf(codes.Internal, "could not get diags : %v", err)
|
||
|
}
|
||
|
diags = append(diags, diag)
|
||
|
}
|
||
|
return &GetDiagsResponse{Diags: diags}, nil
|
||
|
}
|
||
|
func (s DiagsServerImpl) DeleteDiag(context.Context, *DeleteDiagRequest) (*DeleteDiagResponse, error) {
|
||
|
return nil, status.Errorf(codes.Unimplemented, "method DeleteDiag not implemented")
|
||
|
}
|
||
|
|
||
|
|
||
|
func (s DiagsServerImpl) UpdateDiag(ctx context.Context, req *UpdateDiagRequest) (*UpdateDiagResponse, error) {
|
||
|
b := req.Diag.ToStorageType()
|
||
|
diag, err := s.handler.UpdateDiag(b)
|
||
|
if err != nil {
|
||
|
log.Error().Err(err).Msg("")
|
||
|
return nil, status.Errorf(codes.AlreadyExists, "diag update failed : %v", err)
|
||
|
}
|
||
|
response, err := DiagFromStorageType(diag)
|
||
|
if err != nil {
|
||
|
log.Error().Err(err).Msg("")
|
||
|
return nil, status.Errorf(codes.Internal, "issue while retrieving diag : %v", err)
|
||
|
}
|
||
|
return &UpdateDiagResponse{Diag: response}, nil
|
||
|
}
|
||
|
|
||
|
// mustEmbedUnimplementedDiagsServer is an intentionally empty marker method.
// NOTE(review): presumably it exists to satisfy the generated DiagsServer
// interface without embedding UnimplementedDiagsServer — confirm against the
// generated gRPC code.
func (s DiagsServerImpl) mustEmbedUnimplementedDiagsServer() {}
|
||
|
|
||
|
func Run(done chan error, cfg *viper.Viper, handler handlers.DiagsHandler) {
|
||
|
var (
|
||
|
dev_env = cfg.GetBool("dev_env")
|
||
|
address = ":" + cfg.GetString("services.grpc.port")
|
||
|
)
|
||
|
log.Info().Str("address", address).Msg("Running gRPC server")
|
||
|
|
||
|
server := grpc.NewServer()
|
||
|
RegisterDiagsServer(server, NewDiagsServer(handler))
|
||
|
l, err := net.Listen("tcp", address)
|
||
|
if err != nil {
|
||
|
log.Fatal().Str("address", address).Err(err).Msg("Cannot listen to address")
|
||
|
}
|
||
|
|
||
|
if dev_env {
|
||
|
reflection.Register(server)
|
||
|
}
|
||
|
|
||
|
if err := server.Serve(l); err != nil {
|
||
|
log.Error().Err(err).Msg("gRPC service ended")
|
||
|
done <- err
|
||
|
}
|
||
|
}
|