2 Commits

Author SHA1 Message Date
f06ed52547 agenda postgresql 2023-05-09 10:42:52 +02:00
ef3add576f merge 2023-03-10 13:19:01 +01:00
11 changed files with 2321 additions and 4 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
config.yaml
.vscode
.idea
__debug_bin

View File

@@ -3,7 +3,7 @@ FROM golang:alpine as builder
ARG ACCESS_TOKEN_USR="nothing"
ARG ACCESS_TOKEN_PWD="nothing"
RUN apk add --no-cache ca-certificates tzdata
RUN apk add --no-cache ca-certificates tzdata git
WORKDIR /

8
go.mod
View File

@@ -3,17 +3,19 @@ module git.coopgo.io/coopgo-platform/agenda
go 1.18
require (
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
github.com/spf13/viper v1.12.0
go.mongodb.org/mongo-driver v1.10.1
google.golang.org/grpc v1.46.2
google.golang.org/protobuf v1.28.0
google.golang.org/grpc v1.48.0
google.golang.org/protobuf v1.28.1
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
)
require (
git.coopgo.io/coopgo-platform/fleets v0.0.0-20221205162030-cecdcb20e1d5 // indirect
git.coopgo.io/coopgo-platform/groups-management v0.0.0-20221205161801-9705c8d898f0 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect

8
go.sum
View File

@@ -36,6 +36,10 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
git.coopgo.io/coopgo-platform/fleets v0.0.0-20221205162030-cecdcb20e1d5 h1:HL/M681G9R1ZN8XPp4LvG9hcF20//R9yQmr5cdXwQaM=
git.coopgo.io/coopgo-platform/fleets v0.0.0-20221205162030-cecdcb20e1d5/go.mod h1:s9OIFCNcjBAbBzRNHwoCTYV6kAntPG9CpT3GVweGdTY=
git.coopgo.io/coopgo-platform/groups-management v0.0.0-20221205161801-9705c8d898f0 h1:CnLKO1kzoGtaqPhDqfOX3WPRFRcJVJZdGzPdBE4X//w=
git.coopgo.io/coopgo-platform/groups-management v0.0.0-20221205161801-9705c8d898f0/go.mod h1:lozSy6qlIIYhvKKXscZzz28HAtS0qBDUTv5nofLRmYA=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
@@ -502,6 +506,8 @@ google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.46.2 h1:u+MLGgVf7vRdjEYZ8wDFhAVNmhkbJ5hmrA1LMWK1CAQ=
google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -517,6 +523,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

1078
grpcapi/lol/agenda.pb.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,321 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.12.4
// source: agenda.proto
package grpcapi
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// AgendaClient is the client API for Agenda service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type AgendaClient interface {
CreateEvent(ctx context.Context, in *CreateEventRequest, opts ...grpc.CallOption) (*CreateEventResponse, error)
GetEvent(ctx context.Context, in *GetEventRequest, opts ...grpc.CallOption) (*GetEventResponse, error)
GetEvents(ctx context.Context, in *GetEventsRequest, opts ...grpc.CallOption) (*GetEventsResponse, error)
DeleteEvent(ctx context.Context, in *DeleteEventRequest, opts ...grpc.CallOption) (*DeleteEventResponse, error)
SubscribeEvent(ctx context.Context, in *SubscribeEventRequest, opts ...grpc.CallOption) (*SubscribeEventResponse, error)
UnsubscribeEvent(ctx context.Context, in *UnsubscribeEventRequest, opts ...grpc.CallOption) (*UnsubscribeEventResponse, error)
DeleteSubscription(ctx context.Context, in *DeleteSubscriptionRequest, opts ...grpc.CallOption) (*DeleteSubscriptionResponse, error)
}
type agendaClient struct {
cc grpc.ClientConnInterface
}
func NewAgendaClient(cc grpc.ClientConnInterface) AgendaClient {
return &agendaClient{cc}
}
func (c *agendaClient) CreateEvent(ctx context.Context, in *CreateEventRequest, opts ...grpc.CallOption) (*CreateEventResponse, error) {
out := new(CreateEventResponse)
err := c.cc.Invoke(ctx, "/Agenda/CreateEvent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *agendaClient) GetEvent(ctx context.Context, in *GetEventRequest, opts ...grpc.CallOption) (*GetEventResponse, error) {
out := new(GetEventResponse)
err := c.cc.Invoke(ctx, "/Agenda/GetEvent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *agendaClient) GetEvents(ctx context.Context, in *GetEventsRequest, opts ...grpc.CallOption) (*GetEventsResponse, error) {
out := new(GetEventsResponse)
err := c.cc.Invoke(ctx, "/Agenda/GetEvents", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *agendaClient) DeleteEvent(ctx context.Context, in *DeleteEventRequest, opts ...grpc.CallOption) (*DeleteEventResponse, error) {
out := new(DeleteEventResponse)
err := c.cc.Invoke(ctx, "/Agenda/DeleteEvent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *agendaClient) SubscribeEvent(ctx context.Context, in *SubscribeEventRequest, opts ...grpc.CallOption) (*SubscribeEventResponse, error) {
out := new(SubscribeEventResponse)
err := c.cc.Invoke(ctx, "/Agenda/SubscribeEvent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *agendaClient) UnsubscribeEvent(ctx context.Context, in *UnsubscribeEventRequest, opts ...grpc.CallOption) (*UnsubscribeEventResponse, error) {
out := new(UnsubscribeEventResponse)
err := c.cc.Invoke(ctx, "/Agenda/UnsubscribeEvent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *agendaClient) DeleteSubscription(ctx context.Context, in *DeleteSubscriptionRequest, opts ...grpc.CallOption) (*DeleteSubscriptionResponse, error) {
out := new(DeleteSubscriptionResponse)
err := c.cc.Invoke(ctx, "/Agenda/DeleteSubscription", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// AgendaServer is the server API for Agenda service.
// All implementations must embed UnimplementedAgendaServer
// for forward compatibility
type AgendaServer interface {
CreateEvent(context.Context, *CreateEventRequest) (*CreateEventResponse, error)
GetEvent(context.Context, *GetEventRequest) (*GetEventResponse, error)
GetEvents(context.Context, *GetEventsRequest) (*GetEventsResponse, error)
DeleteEvent(context.Context, *DeleteEventRequest) (*DeleteEventResponse, error)
SubscribeEvent(context.Context, *SubscribeEventRequest) (*SubscribeEventResponse, error)
UnsubscribeEvent(context.Context, *UnsubscribeEventRequest) (*UnsubscribeEventResponse, error)
DeleteSubscription(context.Context, *DeleteSubscriptionRequest) (*DeleteSubscriptionResponse, error)
mustEmbedUnimplementedAgendaServer()
}
// UnimplementedAgendaServer must be embedded to have forward compatible implementations.
type UnimplementedAgendaServer struct {
}
func (UnimplementedAgendaServer) CreateEvent(context.Context, *CreateEventRequest) (*CreateEventResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateEvent not implemented")
}
func (UnimplementedAgendaServer) GetEvent(context.Context, *GetEventRequest) (*GetEventResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetEvent not implemented")
}
func (UnimplementedAgendaServer) GetEvents(context.Context, *GetEventsRequest) (*GetEventsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetEvents not implemented")
}
func (UnimplementedAgendaServer) DeleteEvent(context.Context, *DeleteEventRequest) (*DeleteEventResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DeleteEvent not implemented")
}
func (UnimplementedAgendaServer) SubscribeEvent(context.Context, *SubscribeEventRequest) (*SubscribeEventResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SubscribeEvent not implemented")
}
func (UnimplementedAgendaServer) UnsubscribeEvent(context.Context, *UnsubscribeEventRequest) (*UnsubscribeEventResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UnsubscribeEvent not implemented")
}
func (UnimplementedAgendaServer) DeleteSubscription(context.Context, *DeleteSubscriptionRequest) (*DeleteSubscriptionResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DeleteSubscription not implemented")
}
func (UnimplementedAgendaServer) mustEmbedUnimplementedAgendaServer() {}
// UnsafeAgendaServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to AgendaServer will
// result in compilation errors.
type UnsafeAgendaServer interface {
mustEmbedUnimplementedAgendaServer()
}
func RegisterAgendaServer(s grpc.ServiceRegistrar, srv AgendaServer) {
s.RegisterService(&Agenda_ServiceDesc, srv)
}
func _Agenda_CreateEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgendaServer).CreateEvent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Agenda/CreateEvent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AgendaServer).CreateEvent(ctx, req.(*CreateEventRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Agenda_GetEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgendaServer).GetEvent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Agenda/GetEvent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AgendaServer).GetEvent(ctx, req.(*GetEventRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Agenda_GetEvents_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetEventsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgendaServer).GetEvents(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Agenda/GetEvents",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AgendaServer).GetEvents(ctx, req.(*GetEventsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Agenda_DeleteEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgendaServer).DeleteEvent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Agenda/DeleteEvent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AgendaServer).DeleteEvent(ctx, req.(*DeleteEventRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Agenda_SubscribeEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SubscribeEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgendaServer).SubscribeEvent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Agenda/SubscribeEvent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AgendaServer).SubscribeEvent(ctx, req.(*SubscribeEventRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Agenda_UnsubscribeEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UnsubscribeEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgendaServer).UnsubscribeEvent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Agenda/UnsubscribeEvent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AgendaServer).UnsubscribeEvent(ctx, req.(*UnsubscribeEventRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Agenda_DeleteSubscription_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteSubscriptionRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgendaServer).DeleteSubscription(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Agenda/DeleteSubscription",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AgendaServer).DeleteSubscription(ctx, req.(*DeleteSubscriptionRequest))
}
return interceptor(ctx, in, info, handler)
}
// Agenda_ServiceDesc is the grpc.ServiceDesc for Agenda service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Agenda_ServiceDesc = grpc.ServiceDesc{
ServiceName: "Agenda",
HandlerType: (*AgendaServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "CreateEvent",
Handler: _Agenda_CreateEvent_Handler,
},
{
MethodName: "GetEvent",
Handler: _Agenda_GetEvent_Handler,
},
{
MethodName: "GetEvents",
Handler: _Agenda_GetEvents_Handler,
},
{
MethodName: "DeleteEvent",
Handler: _Agenda_DeleteEvent_Handler,
},
{
MethodName: "SubscribeEvent",
Handler: _Agenda_SubscribeEvent_Handler,
},
{
MethodName: "UnsubscribeEvent",
Handler: _Agenda_UnsubscribeEvent_Handler,
},
{
MethodName: "DeleteSubscription",
Handler: _Agenda_DeleteSubscription_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "agenda.proto",
}

View File

@@ -18,6 +18,7 @@ func main() {
var (
service_name = cfg.GetString("name")
grpc_enable = cfg.GetBool("services.grpc.enable")
dev_env = cfg.GetBool("dev_env")
)
storage, err := storage.NewStorage(cfg)
@@ -28,6 +29,9 @@ func main() {
handler := handlers.NewHandler(cfg, storage)
fmt.Println("Running", service_name, ":")
if dev_env {
fmt.Printf("\033]0;%s\007", service_name)
}
failed := make(chan error)

513
storage/postgresql.go Normal file
View File

@@ -0,0 +1,513 @@
package storage
import (
"ariga.io/atlas/sql/postgres"
"ariga.io/atlas/sql/schema"
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/lib/pq"
"github.com/spf13/viper"
"os"
"strconv"
)
type PostgresqlStorage struct {
DbConnection *sql.DB
Schema string
Tables map[string]string
}
func NewPostgresqlStorage(cfg *viper.Viper) (PostgresqlStorage, error) {
var (
host = cfg.GetString("storage.db.psql.host")
port = cfg.GetString("storage.db.psql.port")
user = cfg.GetString("storage.db.psql.user")
password = cfg.GetString("storage.db.psql.password")
dbname = cfg.GetString("storage.db.psql.dbname")
sslmode = cfg.GetString("storage.db.psql.sslmode")
pg_schema = cfg.GetString("storage.db.psql.schema")
pgtables_event = cfg.GetString("storage.db.psql.tables.event")
pgtables_subscription = cfg.GetString("storage.db.psql.tables.subscription")
timezone = "Europe/Paris"
)
portInt, _ := strconv.Atoi(port)
psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s TimeZone=%s", host, portInt,
user, password, dbname, sslmode, timezone)
db, err := sql.Open("postgres", psqlconn)
if err != nil {
fmt.Println("error", err)
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql failed")
}
err = db.Ping()
if err != nil {
fmt.Println(err)
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql database failed")
}
return PostgresqlStorage{
DbConnection: db,
Schema: pg_schema,
Tables: map[string]string{
"event": fmt.Sprintf("%s.%s", pg_schema, pgtables_event),
"subscription": fmt.Sprintf("%s.%s", pg_schema, pgtables_subscription),
},
}, nil
}
func (psql PostgresqlStorage) CreateEvent(e Event) error {
tx, err := psql.DbConnection.Begin()
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
return
}
tx.Commit()
}()
dataEvent, err := json.Marshal(e.Data)
if err != nil {
return err
}
deletedSubscriptionsJSON, err := json.Marshal(e.DeletedSubscription)
if err != nil {
return err
}
eventQuery := fmt.Sprintf(`
INSERT INTO %s (id, namespace, owners, restricted_to, type, name, description, startdate,
enddate, starttime, endtime, allday, maxsubscribers, data, deleted, deletedsubscriptions)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
`, psql.Tables["event"])
ownersArray, err := pq.Array(e.Owners).Value()
if err != nil {
return err
}
restrictedToArray, err := pq.Array(e.RestrictedTo).Value()
if err != nil {
return err
}
_, err = tx.Exec(
eventQuery,
e.ID,
e.Namespace,
ownersArray,
restrictedToArray,
e.Type,
e.Name,
e.Description,
e.Startdate,
e.Enddate,
e.Starttime,
e.Endtime,
e.Allday,
e.MaxSubscribers,
dataEvent,
e.Deleted,
deletedSubscriptionsJSON,
)
if err != nil {
return err
}
for _, subscription := range e.Subscriptions {
subscriptionQuery := fmt.Sprintf(`
INSERT INTO %s (id, event_id, subscriber, tags, created_at, data)
VALUES ($1, $2, $3, $4, $5, $6)
`, psql.Tables["subscription"])
dataSubscription, err := json.Marshal(subscription.Data)
if err != nil {
return err
}
tagsArray, err := pq.Array(subscription.Tags).Value()
if err != nil {
return err
}
_, err = tx.Exec(
subscriptionQuery,
subscription.ID,
e.ID,
subscription.Subscriber,
tagsArray,
subscription.CreatedAt,
dataSubscription,
)
if err != nil {
return err
}
}
return nil
}
func (psql PostgresqlStorage) GetEvent(eventID string) (*Event, error) {
var event Event
eventQuery := fmt.Sprintf(`
SELECT id, namespace, owners, restricted_to, type, name, description, startdate,
enddate, starttime, endtime, allday, maxsubscribers, data, deleted, deletedsubscriptions
FROM %s
WHERE id = $1
`, psql.Tables["event"])
row := psql.DbConnection.QueryRow(eventQuery, eventID)
owners := pq.StringArray{}
restrictedTo := pq.StringArray{}
dataEvent := []byte{}
deletedSubscriptions := []byte{}
err := row.Scan(
&event.ID,
&event.Namespace,
&owners,
&restrictedTo,
&event.Type,
&event.Name,
&event.Description,
&event.Startdate,
&event.Enddate,
&event.Starttime,
&event.Endtime,
&event.Allday,
&event.MaxSubscribers,
&dataEvent,
&event.Deleted,
&deletedSubscriptions,
)
if err != nil {
return nil, err
}
event.Owners = []string(owners)
event.RestrictedTo = []string(restrictedTo)
data := make(map[string]any)
err = json.Unmarshal(dataEvent, &data)
if err != nil {
return nil, err
}
event.Data = data
subscriptions, err := psql.getSubscriptions(eventID)
if err != nil {
return nil, err
}
event.Subscriptions = subscriptions
deletedSubs := []Subscription{}
err = json.Unmarshal(deletedSubscriptions, &deletedSubs)
if err != nil {
return nil, err
}
event.DeletedSubscription = deletedSubs
return &event, nil
}
func (psql PostgresqlStorage) GetEvents(namespaces []string) ([]Event, error) {
var events []Event
eventQuery := fmt.Sprintf(`
SELECT id, namespace, owners, restricted_to, type, name, description, startdate,
enddate, starttime, endtime, allday, maxsubscribers, data, deletedsubscriptions, deleted
FROM %s
WHERE namespace = ANY($1::text[])
`, psql.Tables["event"])
rows, err := psql.DbConnection.Query(eventQuery, pq.Array(namespaces))
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var event Event
var owners pq.StringArray
var restrictedTo pq.StringArray
var dataEvent []byte
var deletedSubscriptions []byte
err := rows.Scan(
&event.ID,
&event.Namespace,
&owners,
&restrictedTo,
&event.Type,
&event.Name,
&event.Description,
&event.Startdate,
&event.Enddate,
&event.Starttime,
&event.Endtime,
&event.Allday,
&event.MaxSubscribers,
&dataEvent,
&deletedSubscriptions,
&event.Deleted,
)
if err != nil {
return nil, err
}
event.Owners = []string(owners)
event.RestrictedTo = []string(restrictedTo)
err = json.Unmarshal(dataEvent, &event.Data)
if err != nil {
return nil, err
}
event.Subscriptions, err = psql.getSubscriptions(event.ID)
if err != nil {
return nil, err
}
err = json.Unmarshal(deletedSubscriptions, &event.DeletedSubscription)
if err != nil {
return nil, err
}
events = append(events, event)
}
return events, nil
}
func (psql PostgresqlStorage) UpdateEvent(e Event) error {
tx, err := psql.DbConnection.Begin()
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
return
}
tx.Commit()
}()
dataEvent, err := json.Marshal(e.Data)
if err != nil {
return err
}
deletedSubscriptions, err := json.Marshal(e.DeletedSubscription)
if err != nil {
return err
}
eventQuery := fmt.Sprintf(`
UPDATE %s
SET namespace = $2,
owners = $3,
restricted_to = $4,
type = $5,
name = $6,
description = $7,
startdate = $8,
enddate = $9,
starttime = $10,
endtime = $11,
allday = $12,
maxsubscribers = $13,
data = $14,
deleted = $15,
deletedsubscriptions = $16
WHERE id = $1
`, psql.Tables["event"])
ownersArray, err := pq.Array(e.Owners).Value()
if err != nil {
return err
}
restrictedToArray, err := pq.Array(e.RestrictedTo).Value()
if err != nil {
return err
}
_, err = tx.Exec(
eventQuery,
e.ID,
e.Namespace,
ownersArray,
restrictedToArray,
e.Type,
e.Name,
e.Description,
e.Startdate,
e.Enddate,
e.Starttime,
e.Endtime,
e.Allday,
e.MaxSubscribers,
dataEvent,
e.Deleted,
deletedSubscriptions,
)
if err != nil {
return err
}
for _, subscription := range e.Subscriptions {
subscriptionQuery := fmt.Sprintf(`
UPDATE %s
SET subscriber = $2,
tags = $3,
data = $4,
created_at= $5
WHERE event_id = $1
`, psql.Tables["subscription"])
dataSubscription, err := json.Marshal(subscription.Data)
if err != nil {
return err
}
tagsArray, err := pq.Array(subscription.Tags).Value()
if err != nil {
return err
}
_, err = tx.Exec(
subscriptionQuery,
e.ID,
subscription.Subscriber,
tagsArray,
dataSubscription,
subscription.CreatedAt,
)
if err != nil {
return err
}
}
return nil
}
func (psql PostgresqlStorage) AddSubscription(eventid string, subscription Subscription) error {
tags := pq.Array(subscription.Tags)
data, err := json.Marshal(subscription.Data)
if err != nil {
return err
}
_, err = psql.DbConnection.Exec(fmt.Sprintf(`
INSERT INTO %s (id, event_id, subscriber, tags, created_at, data)
VALUES ($1, $2, $3, $4, $5, $6)
`, psql.Tables["subscription"]),
subscription.ID,
eventid,
subscription.Subscriber,
tags,
subscription.CreatedAt,
data,
)
return err
}
func (psql PostgresqlStorage) UpdateSubscription(eventid string, subscriber string, deletesubscription Subscription) error {
tx, err := psql.DbConnection.Begin()
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
return
}
tx.Commit()
}()
subscriptionQuery := fmt.Sprintf(`
DELETE FROM %s
WHERE event_id = $1 AND subscriber = $2
`, psql.Tables["subscription"])
_, err = tx.Exec(
subscriptionQuery,
eventid,
subscriber,
)
if err != nil {
fmt.Println(err)
return err
}
eventQuery := fmt.Sprintf(`
UPDATE %s
SET deletedsubscriptions = deletedsubscriptions || $1
WHERE id = $2
`, psql.Tables["event"])
deletedSubscriptions, err := json.Marshal(deletesubscription)
if err != nil {
fmt.Println(err)
return err
}
_, err = tx.Exec(
eventQuery,
deletedSubscriptions,
eventid,
)
if err != nil {
fmt.Println(err)
return err
}
return nil
}
func (psql PostgresqlStorage) getSubscriptions(eventID string) ([]Subscription, error) {
var subscriptions []Subscription
subscriptionQuery := fmt.Sprintf(`
SELECT id, subscriber, tags, created_at, data
FROM %s
WHERE event_id = $1
`, psql.Tables["subscription"])
rows, err := psql.DbConnection.Query(subscriptionQuery, eventID)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var subscription Subscription
var tags pq.StringArray
var dataSubscription []byte
err := rows.Scan(
&subscription.ID,
&subscription.Subscriber,
&tags,
&subscription.CreatedAt,
&dataSubscription,
)
if err != nil {
return nil, err
}
subscription.Tags = []string(tags)
data := make(map[string]any)
err = json.Unmarshal(dataSubscription, &data)
if err != nil {
return nil, err
}
subscription.Data = data
subscriptions = append(subscriptions, subscription)
}
return subscriptions, nil
}
func (psql PostgresqlStorage) Migrate() error {
ctx := context.Background()
driver, err := postgres.Open(psql.DbConnection)
if err != nil {
return err
}
existing, err := driver.InspectRealm(ctx, &schema.InspectRealmOption{Schemas: []string{psql.Schema}})
if err != nil {
return err
}
var desired schema.Realm
hcl, err := os.ReadFile("postgresql/schema.hcl")
if err != nil {
return err
}
err = postgres.EvalHCLBytes(hcl, &desired, nil)
if err != nil {
return err
}
diff, err := driver.RealmDiff(existing, &desired)
if err != nil {
return err
}
err = driver.ApplyChanges(ctx, diff)
if err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,110 @@
table "event" {
schema = schema.agenda
column "id" {
null = false
type = uuid
}
column "namespace" {
null = true
type = text
}
column "owners" {
null = true
type = sql("text[]")
}
column "restricted_to" {
null = true
type = sql("text[]")
}
column "type" {
null = true
type = text
}
column "name" {
null = true
type = text
}
column "description" {
null = true
type = text
}
column "startdate" {
null = true
type = timestamptz
}
column "enddate" {
null = true
type = timestamptz
}
column "starttime" {
null = true
type = text
}
column "endtime" {
null = true
type = text
}
column "allday" {
null = true
type = boolean
}
column "maxsubscribers" {
null = true
type = bigint
}
column "data" {
null = true
type = jsonb
}
column "deleted" {
null = true
type = boolean
}
column "deletedsubscriptions" {
null = true
type = jsonb
}
primary_key {
columns = [column.id]
}
}
table "subscription" {
schema = schema.agenda
column "id" {
null = false
type = uuid
}
column "event_id" {
null = true
type = uuid
}
column "subscriber" {
null = true
type = text
}
column "tags" {
null = true
type = sql("text[]")
}
column "created_at" {
null = true
type = timestamptz
}
column "data" {
null = true
type = jsonb
}
primary_key {
columns = [column.id]
}
foreign_key "subscription_event_id_fkey" {
columns = [column.event_id]
ref_columns = [table.event.column.id]
on_update = NO_ACTION
on_delete = NO_ACTION
}
}
schema "agenda" {
}

277
storage/postgresql_test.go Normal file
View File

@@ -0,0 +1,277 @@
package storage
import (
"context"
"fmt"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/spf13/viper"
"testing"
"time"
)
var cfg *viper.Viper
var date, _ = time.Parse(time.RFC3339Nano, "2023-05-04T16:00:50.165419+02:00")
var event = Event{
ID: uuid.New().String(),
Namespace: "test_namespace",
Owners: []string{"owners", "owners"},
RestrictedTo: []string{"restricted_to", "restricted_to"},
Type: "type",
Name: "test",
Description: "description",
Startdate: date,
Enddate: date,
Starttime: "starttime",
Endtime: "endtime",
Allday: false,
MaxSubscribers: 23,
Subscriptions: []Subscription{
{
ID: uuid.NewString(),
Subscriber: "subscriber1",
Tags: []string{"tag1", "tag2"},
CreatedAt: date,
Data: map[string]any{
"test": "test",
},
},
},
DeletedSubscription: []Subscription{
{
ID: uuid.NewString(),
Subscriber: "subscriber_deleted",
Tags: []string{"tag1", "tag2"},
CreatedAt: date,
Data: map[string]any{
"deleted": "deleted",
},
},
},
Data: map[string]any{
"data": "data",
},
Deleted: false,
}
var event1 = Event{
ID: uuid.New().String(),
Namespace: "test_namespace",
Owners: []string{"owners", "owners"},
RestrictedTo: []string{"restricted_to", "restricted_to"},
Type: "type",
Name: "test",
Description: "description",
Startdate: date,
Enddate: date,
Starttime: "starttime",
Endtime: "endtime",
Allday: false,
MaxSubscribers: 23,
Subscriptions: []Subscription{
{
ID: uuid.NewString(),
Subscriber: "subscriber1",
Tags: []string{"tag1", "tag2"},
CreatedAt: date,
Data: map[string]any{
"test": "test",
},
},
},
DeletedSubscription: []Subscription{
{
ID: uuid.NewString(),
Subscriber: "subscriber_deleted",
Tags: []string{"tag1", "tag2"},
CreatedAt: date,
Data: map[string]any{
"deleted": "deleted",
},
},
},
Data: map[string]any{
"data": "data",
},
Deleted: false,
}
func init() {
cfg = viper.New()
cfg.SetDefault("storage.db.psql.host", "localhost")
cfg.SetDefault("storage.db.psql.port", "5432")
cfg.SetDefault("storage.db.psql.user", "postgres")
cfg.SetDefault("storage.db.psql.password", "postgres")
cfg.SetDefault("storage.db.psql.dbname", "coopgo_platform")
cfg.SetDefault("storage.db.psql.sslmode", "disable")
cfg.SetDefault("storage.db.psql.schema", "agenda")
cfg.SetDefault("storage.db.psql.tables.event", "event")
cfg.SetDefault("storage.db.psql.tables.subscription", "subscription")
cfg.SetConfigName("config") // Override default values in a config.yaml file within this directory
cfg.AddConfigPath(".")
cfg.ReadInConfig()
}
func TestPostgresqlStorage_Initialize(t *testing.T) {
storage, err := NewPostgresqlStorage(cfg)
if err != nil {
t.Errorf("error creating new PostgreSQL storage: %v", err)
}
defer storage.DbConnection.Close()
err = storage.Migrate()
if err != nil {
t.Errorf("database migration issue: %v", err)
return
}
tx, err := storage.DbConnection.BeginTx(context.Background(), nil)
if err != nil {
t.Errorf("transaction issue: %v", err)
return
}
defer tx.Rollback()
_, err = tx.Exec(fmt.Sprintf("DELETE FROM %s;", storage.Tables["subscription"]))
if err != nil {
t.Errorf("delete accounts subscription issue: %v", err)
return
}
_, err = tx.Exec(fmt.Sprintf("DELETE FROM %s;", storage.Tables["event"]))
if err != nil {
t.Errorf("delete accounts event issue: %v", err)
return
}
if err = tx.Commit(); err != nil {
t.Errorf("commit transaction issue: %v", err)
return
}
}
func TestPostgresqlStorage_CreateAndGetEvent(t *testing.T) {
storage, err := NewPostgresqlStorage(cfg)
if err != nil {
t.Errorf("error creating new PostgreSQL storage: %v", err)
}
err = storage.CreateEvent(event)
if err != nil {
t.Error(err)
}
retrieved_event, err := storage.GetEvent(event.ID)
if err != nil {
t.Error(err)
}
diff := cmp.Diff(&event, retrieved_event)
if diff != "" {
fmt.Printf("The retrieved event differs from the original event:\n%s", diff)
t.Fail()
}
}
func TestPostgresqlStorage_GetEvents(t *testing.T) {
storage, err := NewPostgresqlStorage(cfg)
if err != nil {
t.Errorf("error creating new PostgreSQL storage: %v", err)
}
event.Namespace = "test"
err = storage.CreateEvent(event)
if err != nil {
t.Error(err)
}
event1.Namespace = "test1"
err = storage.CreateEvent(event1)
if err != nil {
t.Error(err)
}
events, err := storage.GetEvents([]string{"test", "test1"})
if err != nil {
t.Error(err)
}
diff := cmp.Diff(events, []Event{event, event1})
if diff != "" {
fmt.Printf("The retrieved event differs from the original event:\n%s", diff)
t.Fail()
}
}
func TestPostgresqlStorage_AddSubscription(t *testing.T) {
storage, err := NewPostgresqlStorage(cfg)
if err != nil {
t.Errorf("error creating new PostgreSQL storage: %v", err)
}
err = storage.CreateEvent(event)
if err != nil {
t.Error(err)
}
subscription := Subscription{
ID: uuid.NewString(),
Subscriber: "salim",
Tags: []string{"tag"},
CreatedAt: date,
Data: map[string]any{"data": "data"},
}
err = storage.AddSubscription(event.ID, subscription)
stored_event, err := storage.GetEvent(event.ID)
if err != nil {
t.Error(err)
}
expected_event := Event{
ID: event.ID,
Namespace: event.Namespace,
Owners: event.Owners,
RestrictedTo: event.RestrictedTo,
Type: event.Type,
Name: event.Name,
Description: event.Description,
Startdate: event.Startdate,
Enddate: event.Enddate,
Starttime: event.Starttime,
Endtime: event.Endtime,
Allday: event.Allday,
MaxSubscribers: event.MaxSubscribers,
Subscriptions: []Subscription{event.Subscriptions[0], subscription},
DeletedSubscription: event.DeletedSubscription,
Data: event.Data,
Deleted: event.Deleted,
}
diff := cmp.Diff(stored_event, &expected_event)
if diff != "" {
fmt.Printf("The retrieved event differs from the original event:\n%s", diff)
t.Fail()
}
}
func TestPostgresqlStorage_UpdateEvent(t *testing.T) {
storage, err := NewPostgresqlStorage(cfg)
if err != nil {
t.Errorf("error creating new PostgreSQL storage: %v", err)
}
err = storage.CreateEvent(event)
if err != nil {
t.Error(err)
}
subscription := Subscription{
ID: event.Subscriptions[0].ID,
Subscriber: "updated",
Tags: []string{"tag"},
CreatedAt: date,
Data: map[string]any{"data": "data"},
}
event.Subscriptions[0] = subscription
err = storage.UpdateEvent(event)
if err != nil {
t.Error(err)
}
retrieved_event, err := storage.GetEvent(event.ID)
if err != nil {
t.Error(err)
}
diff := cmp.Diff(retrieved_event, &event)
if diff != "" {
fmt.Printf("The retrieved event differs from the original event:\n%s", diff)
t.Fail()
}
}

View File

@@ -26,6 +26,9 @@ func NewStorage(cfg *viper.Viper) (Storage, error) {
case "mongodb":
s, err := NewMongoDBStorage(cfg)
return s, err
case "psql":
s, err := NewPostgresqlStorage(cfg)
return s, err
default:
return nil, fmt.Errorf("storage type %v is not supported", storage_type)
}