4 Commits

Author SHA1 Message Date
soukainna
498f5bc170 fix func get Subscriber 2025-02-12 11:11:17 +01:00
soukainna
84e2db1b73 fix db 2025-02-12 11:05:24 +01:00
soukainna
5a67e2c377 fix aganda subscription 2024-10-30 09:16:16 +01:00
soukainna
0e42bd7f79 add id and fix err 2024-09-02 21:40:14 +02:00
9 changed files with 123 additions and 126 deletions

5
go.mod
View File

@@ -25,15 +25,12 @@ require (
github.com/hashicorp/hcl/v2 v2.10.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
@@ -47,7 +44,7 @@ require (
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect

19
go.sum
View File

@@ -61,7 +61,6 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -83,7 +82,6 @@ github.com/go-openapi/inflect v0.19.0 h1:9jCH9scKIbHeV9m12SmPilScz6krDxKRasNNSNP
github.com/go-openapi/inflect v0.19.0/go.mod h1:lHpZVlpIQqLyKwJ4N+YSc9hchQy/i12fJykb83CRBH4=
github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68=
github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -111,6 +109,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
@@ -177,12 +176,6 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 h1:DpOJ2HYzCv8LZP15IdmG+YdwD2luVPHITV96TkirNBM=
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
@@ -202,9 +195,6 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/spf13/afero v1.8.2 h1:xehSyVa0YnHWsJ49JFljMpg1HX19V6NDZ1fkm1Xznbo=
@@ -397,13 +387,8 @@ golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -570,6 +555,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -2,9 +2,9 @@ package grpcapi
import (
"encoding/json"
"fmt"
"git.coopgo.io/coopgo-platform/agenda/storage"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -42,7 +42,7 @@ func (e Event) ToStorageType() storage.Event {
for k, d := range e.Data.GetFields() {
jsondata, err := protojson.Marshal(d)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
break
}
var data any
@@ -73,7 +73,7 @@ func EventFromStorageType(event *storage.Event) (*Event, error) {
data, err := structpb.NewStruct(d)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, err
}
@@ -121,7 +121,7 @@ func SubscriptionFromStorageType(s storage.Subscription) (*Subscription, error)
data, err := structpb.NewStruct(d)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, err
}
subscription := Subscription{

View File

@@ -2,13 +2,14 @@ package grpcapi
import (
context "context"
"fmt"
"log"
"net"
"time"
"git.coopgo.io/coopgo-platform/agenda/handlers"
"git.coopgo.io/coopgo-platform/agenda/storage"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
@@ -31,12 +32,12 @@ func (s AgendaServerImpl) CreateEvent(ctx context.Context, req *CreateEventReque
e := req.Event.ToStorageType()
event, err := s.handler.CreateEvent(e)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.AlreadyExists, "event creation failed : %v", err)
}
response, err := EventFromStorageType(event)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.Internal, "issue while retrieving event : %v", err)
}
return &CreateEventResponse{Event: response}, nil
@@ -44,12 +45,12 @@ func (s AgendaServerImpl) CreateEvent(ctx context.Context, req *CreateEventReque
func (s AgendaServerImpl) GetEvent(ctx context.Context, req *GetEventRequest) (*GetEventResponse, error) {
event, err := s.handler.GetEvent(req.Id)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.AlreadyExists, "issue while retrieving event : %v", err)
}
response, err := EventFromStorageType(event)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.Internal, "issue while retrieving event : %v", err)
}
return &GetEventResponse{Event: response}, nil
@@ -69,7 +70,7 @@ func (s AgendaServerImpl) GetEvents(ctx context.Context, req *GetEventsRequest)
responses, err := s.handler.GetEvents(req.Namespaces, mindate, maxdate)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.NotFound, "could not get events : %v", err)
}
var events []*Event
@@ -77,7 +78,7 @@ func (s AgendaServerImpl) GetEvents(ctx context.Context, req *GetEventsRequest)
for _, e := range responses {
event, err := EventFromStorageType(&e)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.Internal, "could not get events : %v", err)
}
events = append(events, event)
@@ -140,13 +141,13 @@ func ConvertMapToStruct(dataMap map[string]interface{}) (*structpb.Struct, error
func (s AgendaServerImpl) GetSubscriptionByUser(ctx context.Context, req *GetSubscriptionByUserRequest) (*GetSubscriptionByUserResponse, error) {
results, err := s.handler.GetSubscriptionByUser(req.Subscriber)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.NotFound, "could not get subscriptions : %v", err)
}
var subscriptions []*storage.Subscription
for _, result := range results {
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.Internal, "could not get subscriptions : %v", err)
}
subscription := &storage.Subscription{
@@ -161,7 +162,7 @@ func (s AgendaServerImpl) GetSubscriptionByUser(ctx context.Context, req *GetSub
}
sub, err := ConvertStorageToProtoSubscription(subscriptions)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.Internal, "could not get subscriptions : %v", err)
}
return &GetSubscriptionByUserResponse{Subscription: sub}, nil
@@ -182,12 +183,12 @@ func (s AgendaServerImpl) UpdateEvent(ctx context.Context, req *UpdateEventReque
b := req.Event.ToStorageType()
event, err := s.handler.UpdateEvent(b)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.AlreadyExists, "event update failed : %v", err)
}
response, err := EventFromStorageType(event)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, status.Errorf(codes.Internal, "issue while retrieving event : %v", err)
}
return &UpdateEventResponse{Event: response}, nil
@@ -201,13 +202,13 @@ func Run(done chan error, cfg *viper.Viper, handler handlers.AgendaHandler) {
dev_env = cfg.GetBool("dev_env")
address = ":" + cfg.GetString("services.grpc.port")
)
log.Info().Str("address", address).Msg("Running gRPC server")
fmt.Println("-> GRPC server on", address)
server := grpc.NewServer()
RegisterAgendaServer(server, NewAgendaServer(handler))
l, err := net.Listen("tcp", address)
if err != nil {
log.Fatal().Str("address", address).Err(err).Msg("Cannot listen to address")
log.Fatal(err)
}
if dev_env {
@@ -215,7 +216,7 @@ func Run(done chan error, cfg *viper.Viper, handler handlers.AgendaHandler) {
}
if err := server.Serve(l); err != nil {
log.Error().Err(err).Msg("gRPC service ended")
fmt.Println("gRPC service ended")
done <- err
}
}

View File

@@ -59,6 +59,7 @@ func (h AgendaHandler) SubscribeEvent(eventid string, subscriber string, data ma
subscription := storage.Subscription{
ID: id,
Subscriber: subscriber,
EventID: eventid,
Tags: []string{},
CreatedAt: now,
Data: map[string]any{},
@@ -80,6 +81,7 @@ func (h AgendaHandler) DeleteSubscription(eventid string, subscriber string, dat
deletesubscription := storage.Subscription{
ID: id,
Subscriber: subscriber,
EventID: eventid,
Tags: []string{},
CreatedAt: now,
Data: map[string]any{},
@@ -101,6 +103,17 @@ func (h AgendaHandler) UpdateEvent(event storage.Event) (*storage.Event, error)
return &event, nil
}
func (h AgendaHandler) GetSubscriber(subscriber string) (results []storage.Subscription, err error) {
if subscriber == "" {
return nil, errors.New("missing subscriber")
}
results, err = h.storage.GetSubscriber(subscriber)
if err != nil {
return nil, err
}
return results, nil
}
func (h AgendaHandler) GetSubscriptionByUser(subscriber string) (results []storage.Subscription, err error) {
if subscriber == "" {
return nil, errors.New("missing subscriber")

19
main.go
View File

@@ -1,10 +1,7 @@
package main
import (
"os"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"fmt"
"git.coopgo.io/coopgo-platform/agenda/grpcapi"
"git.coopgo.io/coopgo-platform/agenda/handlers"
@@ -12,7 +9,6 @@ import (
)
func main() {
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
cfg, err := ReadConfig()
if err != nil {
@@ -25,10 +21,6 @@ func main() {
dev_env = cfg.GetBool("dev_env")
)
if dev_env {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
}
storage, err := storage.NewStorage(cfg)
if err != nil {
panic(err)
@@ -36,9 +28,12 @@ func main() {
handler := handlers.NewHandler(cfg, storage)
failed := make(chan error)
fmt.Println("Running", service_name, ":")
if dev_env {
fmt.Printf("\033]0;%s\007", service_name)
}
log.Info().Str("service_name", service_name).Msg("Running service")
failed := make(chan error)
if grpc_enable {
go grpcapi.Run(failed, cfg, handler)
@@ -48,5 +43,5 @@ func main() {
err = <-failed
log.Fatal().Err(err).Msg("Terminating server")
fmt.Println("Terminating :", err)
}

View File

@@ -2,8 +2,8 @@ package storage
import (
"context"
"fmt"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -77,20 +77,21 @@ func (s MongoDBStorage) GetEvents(namespaces []string) (events []Event, err erro
if len(namespaces) == 0 {
cur, err = collection.Find(context.TODO(), bson.D{}, findOptions)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println("...")
fmt.Println(err)
return events, err
}
} else {
cur, err = collection.Find(context.TODO(), bson.M{"namespace": bson.M{"$in": namespaces}}, findOptions)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println("Error here")
fmt.Println(err)
return events, err
}
}
for cur.Next(context.TODO()) {
var event Event
//var elem bson.M
if err := cur.Decode(&event); err != nil {
return events, err
@@ -139,7 +140,7 @@ func (s MongoDBStorage) UpdateSubscription(eventid string, subscriber string, de
func (s MongoDBStorage) UpdateEvent(event Event) error {
collection := s.Client.Database(s.DbName).Collection(s.Collections["events"])
if _, err := collection.ReplaceOne(context.TODO(), bson.M{"_id": event.ID}, event); err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return err
}
@@ -147,33 +148,36 @@ func (s MongoDBStorage) UpdateEvent(event Event) error {
}
func (s MongoDBStorage) GetSubscriber(subscriber string) ([]Subscription, error) {
// collection := s.Client.Database(s.DbName).Collection(s.Collections["events"])
// subscriptions := []Subscription{}
// cur, err := collection.Find(context.TODO(), bson.M{"subscriptions": bson.M{"$elemMatch": bson.M{"subscriber": subscriber}}})
// if err != nil {
// return subscriptions, err
// }
// for cur.Next(context.Background()) {
// var event Event
// if err := cur.Decode(&event); err != nil {
// return subscriptions, err
// }
// for i := range event.Subscriptions {
// if event.Subscriptions[i].Subscriber == subscriber {
// subscriptions = append(subscriptions, event.Subscriptions[i])
// }
// }
// }
// return subscriptions, nil
return nil, nil
collection := s.Client.Database(s.DbName).Collection(s.Collections["events"])
subscriptions := []Subscription{}
var cur *mongo.Cursor
cur, err := collection.Find(context.TODO(), bson.M{"subscriptions": bson.M{"$elemMatch": bson.M{"subscriber": subscriber}}})
if err != nil {
return subscriptions, err
}
for cur.Next(context.Background()) {
var event Event
if err := cur.Decode(&event); err != nil {
return subscriptions, err
}
for i := range event.Subscriptions {
if event.Subscriptions[i].Subscriber == subscriber {
subscriptions = append(subscriptions, event.Subscriptions[i])
}
}
}
fmt.Println(subscriptions)
return subscriptions, nil
}
func (s MongoDBStorage) GetSubscriptionByUser(subscriber string) ([]Subscription, error) {
// events, err := s.GetSubscriber(subscriber)
// if err != nil {
// panic(err)
// }
// fmt.Println(events)
// return events, nil
return nil, nil
events, err := s.GetSubscriber(subscriber)
if err != nil {
panic(err)
}
fmt.Println(events)
return events, nil
//return nil, nil
}

View File

@@ -11,7 +11,6 @@ import (
"ariga.io/atlas/sql/postgres"
"ariga.io/atlas/sql/schema"
"github.com/lib/pq"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
)
@@ -39,12 +38,12 @@ func NewPostgresqlStorage(cfg *viper.Viper) (PostgresqlStorage, error) {
user, password, dbname, sslmode, timezone)
db, err := sql.Open("postgres", psqlconn)
if err != nil {
log.Error().Err(err).Msg("erroropening postgresql connection")
fmt.Println("error", err)
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql failed")
}
err = db.Ping()
if err != nil {
log.Error().Err(err).Msg("error trying to reach postgresql server")
fmt.Println(err)
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql database failed")
}
return PostgresqlStorage{
@@ -414,7 +413,7 @@ func (psql PostgresqlStorage) UpdateSubscription(eventid string, subscriber stri
subscriber,
)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return err
}
eventQuery := fmt.Sprintf(`
@@ -424,7 +423,7 @@ func (psql PostgresqlStorage) UpdateSubscription(eventid string, subscriber stri
`, psql.Tables["event"])
deletedSubscriptions, err := json.Marshal(deletesubscription)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return err
}
_, err = tx.Exec(
@@ -433,7 +432,7 @@ func (psql PostgresqlStorage) UpdateSubscription(eventid string, subscriber stri
eventid,
)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return err
}
@@ -483,7 +482,7 @@ func (psql PostgresqlStorage) GetSubscriber(subscriber string) ([]Subscription,
subscriptionQuery := fmt.Sprintf("SELECT id, event_id, subscriber, tags, created_at, data FROM %s WHERE subscriber = $1", psql.Tables["subscription"])
rows, err := psql.DbConnection.Query(subscriptionQuery, subscriber)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, err
}
defer rows.Close()
@@ -500,14 +499,14 @@ func (psql PostgresqlStorage) GetSubscriber(subscriber string) ([]Subscription,
&dataSubscription,
)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, err
}
subscription.Tags = []string(tags)
data := make(map[string]any)
err = json.Unmarshal(dataSubscription, &data)
if err != nil {
log.Error().Err(err).Msg("")
fmt.Println(err)
return nil, err
}
subscription.Data = data

View File

@@ -13,6 +13,7 @@ type Storage interface {
AddSubscription(eventid string, subscription Subscription) error
UpdateSubscription(eventid string, subscriber string, deletesubscription Subscription) error
UpdateEvent(Event) error
GetSubscriber(subscriber string) ([]Subscription, error)
GetSubscriptionByUser(subscriber string) ([]Subscription, error)
}
type StorageImpl struct {