From f06ed52547c4c8c90e9531fdeb47b560fa1fdda4 Mon Sep 17 00:00:00 2001 From: sbouaram Date: Tue, 9 May 2023 10:42:52 +0200 Subject: [PATCH] agenda postgresql --- .gitignore | 1 + storage/postgresql.go | 513 ++++++++++++++++++++++++++++++++++ storage/postgresql/schema.hcl | 110 ++++++++ storage/postgresql_test.go | 277 ++++++++++++++++++ storage/storage.go | 3 + 5 files changed, 904 insertions(+) create mode 100644 storage/postgresql.go create mode 100644 storage/postgresql/schema.hcl create mode 100644 storage/postgresql_test.go diff --git a/.gitignore b/.gitignore index f702e7b..f77345f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ config.yaml .vscode +.idea __debug_bin \ No newline at end of file diff --git a/storage/postgresql.go b/storage/postgresql.go new file mode 100644 index 0000000..e0c536d --- /dev/null +++ b/storage/postgresql.go @@ -0,0 +1,513 @@ +package storage + +import ( + "ariga.io/atlas/sql/postgres" + "ariga.io/atlas/sql/schema" + "context" + "database/sql" + "encoding/json" + "fmt" + "github.com/lib/pq" + "github.com/spf13/viper" + "os" + "strconv" +) + +type PostgresqlStorage struct { + DbConnection *sql.DB + Schema string + Tables map[string]string +} + +func NewPostgresqlStorage(cfg *viper.Viper) (PostgresqlStorage, error) { + var ( + host = cfg.GetString("storage.db.psql.host") + port = cfg.GetString("storage.db.psql.port") + user = cfg.GetString("storage.db.psql.user") + password = cfg.GetString("storage.db.psql.password") + dbname = cfg.GetString("storage.db.psql.dbname") + sslmode = cfg.GetString("storage.db.psql.sslmode") + pg_schema = cfg.GetString("storage.db.psql.schema") + pgtables_event = cfg.GetString("storage.db.psql.tables.event") + pgtables_subscription = cfg.GetString("storage.db.psql.tables.subscription") + timezone = "Europe/Paris" + ) + portInt, _ := strconv.Atoi(port) + psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s TimeZone=%s", host, portInt, + user, password, dbname, sslmode, timezone) + db, err := sql.Open("postgres", psqlconn) + if err != nil { + fmt.Println("error", err) + return PostgresqlStorage{}, fmt.Errorf("connection to postgresql failed") + } + err = db.Ping() + if err != nil { + fmt.Println(err) + return PostgresqlStorage{}, fmt.Errorf("connection to postgresql database failed") + } + return PostgresqlStorage{ + DbConnection: db, + Schema: pg_schema, + Tables: map[string]string{ + "event": fmt.Sprintf("%s.%s", pg_schema, pgtables_event), + "subscription": fmt.Sprintf("%s.%s", pg_schema, pgtables_subscription), + }, + }, nil +} + +func (psql PostgresqlStorage) CreateEvent(e Event) error { + tx, err := psql.DbConnection.Begin() + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback() + return + } + tx.Commit() + }() + dataEvent, err := json.Marshal(e.Data) + if err != nil { + return err + } + + deletedSubscriptionsJSON, err := json.Marshal(e.DeletedSubscription) + if err != nil { + return err + } + + eventQuery := fmt.Sprintf(` + INSERT INTO %s (id, namespace, owners, restricted_to, type, name, description, startdate, + enddate, starttime, endtime, allday, maxsubscribers, data, deleted, deletedsubscriptions) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) + `, psql.Tables["event"]) + + ownersArray, err := pq.Array(e.Owners).Value() + if err != nil { + return err + } + + restrictedToArray, err := pq.Array(e.RestrictedTo).Value() + if err != nil { + return err + } + _, err = tx.Exec( + eventQuery, + e.ID, + e.Namespace, + ownersArray, + restrictedToArray, + e.Type, + e.Name, + e.Description, + e.Startdate, + e.Enddate, + e.Starttime, + e.Endtime, + e.Allday, + e.MaxSubscribers, + dataEvent, + e.Deleted, + deletedSubscriptionsJSON, + ) + + if err != nil { + return err + } + for _, subscription := range e.Subscriptions { + subscriptionQuery := fmt.Sprintf(` + INSERT INTO %s (id, event_id, subscriber, tags, created_at, data) + VALUES ($1, $2, $3, $4, $5, $6) + `, psql.Tables["subscription"]) + dataSubscription, err := json.Marshal(subscription.Data) + if err != nil { + return err + } + tagsArray, err := pq.Array(subscription.Tags).Value() + if err != nil { + return err + } + _, err = tx.Exec( + subscriptionQuery, + subscription.ID, + e.ID, + subscription.Subscriber, + tagsArray, + subscription.CreatedAt, + dataSubscription, + ) + + if err != nil { + return err + } + } + return nil +} + +func (psql PostgresqlStorage) GetEvent(eventID string) (*Event, error) { + var event Event + eventQuery := fmt.Sprintf(` + SELECT id, namespace, owners, restricted_to, type, name, description, startdate, + enddate, starttime, endtime, allday, maxsubscribers, data, deleted, deletedsubscriptions + FROM %s + WHERE id = $1 + `, psql.Tables["event"]) + row := psql.DbConnection.QueryRow(eventQuery, eventID) + owners := pq.StringArray{} + restrictedTo := pq.StringArray{} + dataEvent := []byte{} + deletedSubscriptions := []byte{} + err := row.Scan( + &event.ID, + &event.Namespace, + &owners, + &restrictedTo, + &event.Type, + &event.Name, + &event.Description, + &event.Startdate, + &event.Enddate, + &event.Starttime, + &event.Endtime, + &event.Allday, + &event.MaxSubscribers, + &dataEvent, + &event.Deleted, + &deletedSubscriptions, + ) + if err != nil { + return nil, err + } + event.Owners = []string(owners) + event.RestrictedTo = []string(restrictedTo) + data := make(map[string]any) + err = json.Unmarshal(dataEvent, &data) + if err != nil { + return nil, err + } + event.Data = data + subscriptions, err := psql.getSubscriptions(eventID) + if err != nil { + return nil, err + } + event.Subscriptions = subscriptions + deletedSubs := []Subscription{} + err = json.Unmarshal(deletedSubscriptions, &deletedSubs) + if err != nil { + return nil, err + } + event.DeletedSubscription = deletedSubs + return &event, nil +} + +func (psql PostgresqlStorage) GetEvents(namespaces []string) ([]Event, error) { + var events []Event + eventQuery := fmt.Sprintf(` + SELECT id, namespace, owners, restricted_to, type, name, description, startdate, + enddate, starttime, endtime, allday, maxsubscribers, data, deletedsubscriptions, deleted + FROM %s + WHERE namespace = ANY($1::text[]) + `, psql.Tables["event"]) + rows, err := psql.DbConnection.Query(eventQuery, pq.Array(namespaces)) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var event Event + var owners pq.StringArray + var restrictedTo pq.StringArray + var dataEvent []byte + var deletedSubscriptions []byte + err := rows.Scan( + &event.ID, + &event.Namespace, + &owners, + &restrictedTo, + &event.Type, + &event.Name, + &event.Description, + &event.Startdate, + &event.Enddate, + &event.Starttime, + &event.Endtime, + &event.Allday, + &event.MaxSubscribers, + &dataEvent, + &deletedSubscriptions, + &event.Deleted, + ) + if err != nil { + return nil, err + } + event.Owners = []string(owners) + event.RestrictedTo = []string(restrictedTo) + err = json.Unmarshal(dataEvent, &event.Data) + if err != nil { + return nil, err + } + event.Subscriptions, err = psql.getSubscriptions(event.ID) + if err != nil { + return nil, err + } + err = json.Unmarshal(deletedSubscriptions, &event.DeletedSubscription) + if err != nil { + return nil, err + } + events = append(events, event) + } + return events, nil +} + +func (psql PostgresqlStorage) UpdateEvent(e Event) error { + tx, err := psql.DbConnection.Begin() + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback() + return + } + tx.Commit() + }() + dataEvent, err := json.Marshal(e.Data) + if err != nil { + return err + } + deletedSubscriptions, err := json.Marshal(e.DeletedSubscription) + if err != nil { + return err + } + eventQuery := fmt.Sprintf(` + UPDATE %s + SET namespace = $2, + owners = $3, + restricted_to = $4, + type = $5, + name = $6, + description = $7, + startdate = $8, + enddate = $9, + starttime = $10, + endtime = $11, + allday = $12, + maxsubscribers = $13, + data = $14, + deleted = $15, + deletedsubscriptions = $16 + WHERE id = $1 + `, psql.Tables["event"]) + + ownersArray, err := pq.Array(e.Owners).Value() + if err != nil { + return err + } + + restrictedToArray, err := pq.Array(e.RestrictedTo).Value() + if err != nil { + return err + } + _, err = tx.Exec( + eventQuery, + e.ID, + e.Namespace, + ownersArray, + restrictedToArray, + e.Type, + e.Name, + e.Description, + e.Startdate, + e.Enddate, + e.Starttime, + e.Endtime, + e.Allday, + e.MaxSubscribers, + dataEvent, + e.Deleted, + deletedSubscriptions, + ) + + if err != nil { + return err + } + + for _, subscription := range e.Subscriptions { + subscriptionQuery := fmt.Sprintf(` + UPDATE %s + SET subscriber = $2, + tags = $3, + data = $4, + created_at= $5 + WHERE event_id = $1 + `, psql.Tables["subscription"]) + + dataSubscription, err := json.Marshal(subscription.Data) + if err != nil { + return err + } + tagsArray, err := pq.Array(subscription.Tags).Value() + if err != nil { + return err + } + _, err = tx.Exec( + subscriptionQuery, + e.ID, + subscription.Subscriber, + tagsArray, + dataSubscription, + subscription.CreatedAt, + ) + + if err != nil { + return err + } + } + + return nil + +} + +func (psql PostgresqlStorage) AddSubscription(eventid string, subscription Subscription) error { + tags := pq.Array(subscription.Tags) + data, err := json.Marshal(subscription.Data) + if err != nil { + return err + } + _, err = psql.DbConnection.Exec(fmt.Sprintf(` + INSERT INTO %s (id, event_id, subscriber, tags, created_at, data) + VALUES ($1, $2, $3, $4, $5, $6) + `, psql.Tables["subscription"]), + subscription.ID, + eventid, + subscription.Subscriber, + tags, + subscription.CreatedAt, + data, + ) + return err +} + +func (psql PostgresqlStorage) UpdateSubscription(eventid string, subscriber string, deletesubscription Subscription) error { + tx, err := psql.DbConnection.Begin() + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback() + return + } + tx.Commit() + }() + + subscriptionQuery := fmt.Sprintf(` + DELETE FROM %s + WHERE event_id = $1 AND subscriber = $2 + `, psql.Tables["subscription"]) + _, err = tx.Exec( + subscriptionQuery, + eventid, + subscriber, + ) + if err != nil { + fmt.Println(err) + return err + } + eventQuery := fmt.Sprintf(` + UPDATE %s + SET deletedsubscriptions = deletedsubscriptions || $1 + WHERE id = $2 +`, psql.Tables["event"]) + deletedSubscriptions, err := json.Marshal(deletesubscription) + if err != nil { + fmt.Println(err) + return err + } + _, err = tx.Exec( + eventQuery, + deletedSubscriptions, + eventid, + ) + if err != nil { + fmt.Println(err) + return err + } + + return nil +} + +func (psql PostgresqlStorage) getSubscriptions(eventID string) ([]Subscription, error) { + var subscriptions []Subscription + subscriptionQuery := fmt.Sprintf(` + SELECT id, subscriber, tags, created_at, data + FROM %s + WHERE event_id = $1 + `, psql.Tables["subscription"]) + rows, err := psql.DbConnection.Query(subscriptionQuery, eventID) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var subscription Subscription + var tags pq.StringArray + var dataSubscription []byte + err := rows.Scan( + &subscription.ID, + &subscription.Subscriber, + &tags, + &subscription.CreatedAt, + &dataSubscription, + ) + if err != nil { + return nil, err + } + subscription.Tags = []string(tags) + data := make(map[string]any) + err = json.Unmarshal(dataSubscription, &data) + if err != nil { + return nil, err + } + subscription.Data = data + subscriptions = append(subscriptions, subscription) + } + return subscriptions, nil +} + +func (psql PostgresqlStorage) Migrate() error { + ctx := context.Background() + driver, err := postgres.Open(psql.DbConnection) + if err != nil { + return err + } + existing, err := driver.InspectRealm(ctx, &schema.InspectRealmOption{Schemas: []string{psql.Schema}}) + if err != nil { + return err + } + + var desired schema.Realm + + hcl, err := os.ReadFile("postgresql/schema.hcl") + if err != nil { + return err + } + + err = postgres.EvalHCLBytes(hcl, &desired, nil) + if err != nil { + return err + } + + diff, err := driver.RealmDiff(existing, &desired) + if err != nil { + return err + } + + err = driver.ApplyChanges(ctx, diff) + if err != nil { + return err + } + + return nil +} diff --git a/storage/postgresql/schema.hcl b/storage/postgresql/schema.hcl new file mode 100644 index 0000000..86d5eb9 --- /dev/null +++ b/storage/postgresql/schema.hcl @@ -0,0 +1,110 @@ +table "event" { + schema = schema.agenda + column "id" { + null = false + type = uuid + } + column "namespace" { + null = true + type = text + } + column "owners" { + null = true + type = sql("text[]") + } + column "restricted_to" { + null = true + type = sql("text[]") + } + column "type" { + null = true + type = text + } + column "name" { + null = true + type = text + } + column "description" { + null = true + type = text + } + column "startdate" { + null = true + type = timestamptz + } + column "enddate" { + null = true + type = timestamptz + } + column "starttime" { + null = true + type = text + } + column "endtime" { + null = true + type = text + } + column "allday" { + null = true + type = boolean + } + column "maxsubscribers" { + null = true + type = bigint + } + column "data" { + null = true + type = jsonb + } + column "deleted" { + null = true + type = boolean + } + + column "deletedsubscriptions" { + null = true + type = jsonb + } + primary_key { + columns = [column.id] + } +} +table "subscription" { + schema = schema.agenda + column "id" { + null = false + type = uuid + } + column "event_id" { + null = true + type = uuid + } + column "subscriber" { + null = true + type = text + } + column "tags" { + null = true + type = sql("text[]") + } + column "created_at" { + null = true + type = timestamptz + } + column "data" { + null = true + type = jsonb + } + primary_key { + columns = [column.id] + } + foreign_key "subscription_event_id_fkey" { + columns = [column.event_id] + ref_columns = [table.event.column.id] + on_update = NO_ACTION + on_delete = NO_ACTION + } +} + +schema "agenda" { +} \ No newline at end of file diff --git a/storage/postgresql_test.go b/storage/postgresql_test.go new file mode 100644 index 0000000..9e72776 --- /dev/null +++ b/storage/postgresql_test.go @@ -0,0 +1,277 @@ +package storage + +import ( + "context" + "fmt" + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "github.com/spf13/viper" + "testing" + "time" +) + +var cfg *viper.Viper + +var date, _ = time.Parse(time.RFC3339Nano, "2023-05-04T16:00:50.165419+02:00") +var event = Event{ + ID: uuid.New().String(), + Namespace: "test_namespace", + Owners: []string{"owners", "owners"}, + RestrictedTo: []string{"restricted_to", "restricted_to"}, + Type: "type", + Name: "test", + Description: "description", + Startdate: date, + Enddate: date, + Starttime: "starttime", + Endtime: "endtime", + Allday: false, + MaxSubscribers: 23, + Subscriptions: []Subscription{ + { + ID: uuid.NewString(), + Subscriber: "subscriber1", + Tags: []string{"tag1", "tag2"}, + CreatedAt: date, + Data: map[string]any{ + "test": "test", + }, + }, + }, + DeletedSubscription: []Subscription{ + { + ID: uuid.NewString(), + Subscriber: "subscriber_deleted", + Tags: []string{"tag1", "tag2"}, + CreatedAt: date, + Data: map[string]any{ + "deleted": "deleted", + }, + }, + }, + Data: map[string]any{ + "data": "data", + }, + Deleted: false, +} +var event1 = Event{ + ID: uuid.New().String(), + Namespace: "test_namespace", + Owners: []string{"owners", "owners"}, + RestrictedTo: []string{"restricted_to", "restricted_to"}, + Type: "type", + Name: "test", + Description: "description", + Startdate: date, + Enddate: date, + Starttime: "starttime", + Endtime: "endtime", + Allday: false, + MaxSubscribers: 23, + Subscriptions: []Subscription{ + { + ID: uuid.NewString(), + Subscriber: "subscriber1", + Tags: []string{"tag1", "tag2"}, + CreatedAt: date, + Data: map[string]any{ + "test": "test", + }, + }, + }, + DeletedSubscription: []Subscription{ + { + ID: uuid.NewString(), + Subscriber: "subscriber_deleted", + Tags: []string{"tag1", "tag2"}, + CreatedAt: date, + Data: map[string]any{ + "deleted": "deleted", + }, + }, + }, + Data: map[string]any{ + "data": "data", + }, + Deleted: false, +} + +func init() { + cfg = viper.New() + cfg.SetDefault("storage.db.psql.host", "localhost") + cfg.SetDefault("storage.db.psql.port", "5432") + cfg.SetDefault("storage.db.psql.user", "postgres") + cfg.SetDefault("storage.db.psql.password", "postgres") + cfg.SetDefault("storage.db.psql.dbname", "coopgo_platform") + cfg.SetDefault("storage.db.psql.sslmode", "disable") + cfg.SetDefault("storage.db.psql.schema", "agenda") + cfg.SetDefault("storage.db.psql.tables.event", "event") + cfg.SetDefault("storage.db.psql.tables.subscription", "subscription") + cfg.SetConfigName("config") // Override default values in a config.yaml file within this directory + cfg.AddConfigPath(".") + cfg.ReadInConfig() +} + +func TestPostgresqlStorage_Initialize(t *testing.T) { + storage, err := NewPostgresqlStorage(cfg) + if err != nil { + t.Errorf("error creating new PostgreSQL storage: %v", err) + } + defer storage.DbConnection.Close() + + err = storage.Migrate() + if err != nil { + t.Errorf("database migration issue: %v", err) + return + } + + tx, err := storage.DbConnection.BeginTx(context.Background(), nil) + if err != nil { + t.Errorf("transaction issue: %v", err) + return + } + defer tx.Rollback() + _, err = tx.Exec(fmt.Sprintf("DELETE FROM %s;", storage.Tables["subscription"])) + if err != nil { + t.Errorf("delete accounts subscription issue: %v", err) + return + } + _, err = tx.Exec(fmt.Sprintf("DELETE FROM %s;", storage.Tables["event"])) + if err != nil { + t.Errorf("delete accounts event issue: %v", err) + return + } + if err = tx.Commit(); err != nil { + t.Errorf("commit transaction issue: %v", err) + return + } +} + +func TestPostgresqlStorage_CreateAndGetEvent(t *testing.T) { + storage, err := NewPostgresqlStorage(cfg) + if err != nil { + t.Errorf("error creating new PostgreSQL storage: %v", err) + } + err = storage.CreateEvent(event) + if err != nil { + t.Error(err) + } + retrieved_event, err := storage.GetEvent(event.ID) + if err != nil { + t.Error(err) + } + diff := cmp.Diff(&event, retrieved_event) + if diff != "" { + fmt.Printf("The retrieved event differs from the original event:\n%s", diff) + t.Fail() + } +} + +func TestPostgresqlStorage_GetEvents(t *testing.T) { + storage, err := NewPostgresqlStorage(cfg) + if err != nil { + t.Errorf("error creating new PostgreSQL storage: %v", err) + } + event.Namespace = "test" + err = storage.CreateEvent(event) + if err != nil { + t.Error(err) + } + event1.Namespace = "test1" + err = storage.CreateEvent(event1) + if err != nil { + t.Error(err) + } + events, err := storage.GetEvents([]string{"test", "test1"}) + if err != nil { + t.Error(err) + } + diff := cmp.Diff(events, []Event{event, event1}) + if diff != "" { + fmt.Printf("The retrieved event differs from the original event:\n%s", diff) + t.Fail() + } + +} + +func TestPostgresqlStorage_AddSubscription(t *testing.T) { + storage, err := NewPostgresqlStorage(cfg) + if err != nil { + t.Errorf("error creating new PostgreSQL storage: %v", err) + } + err = storage.CreateEvent(event) + if err != nil { + t.Error(err) + } + subscription := Subscription{ + ID: uuid.NewString(), + Subscriber: "salim", + Tags: []string{"tag"}, + CreatedAt: date, + Data: map[string]any{"data": "data"}, + } + err = storage.AddSubscription(event.ID, subscription) + stored_event, err := storage.GetEvent(event.ID) + if err != nil { + t.Error(err) + } + expected_event := Event{ + ID: event.ID, + Namespace: event.Namespace, + Owners: event.Owners, + RestrictedTo: event.RestrictedTo, + Type: event.Type, + Name: event.Name, + Description: event.Description, + Startdate: event.Startdate, + Enddate: event.Enddate, + Starttime: event.Starttime, + Endtime: event.Endtime, + Allday: event.Allday, + MaxSubscribers: event.MaxSubscribers, + Subscriptions: []Subscription{event.Subscriptions[0], subscription}, + DeletedSubscription: event.DeletedSubscription, + Data: event.Data, + Deleted: event.Deleted, + } + diff := cmp.Diff(stored_event, &expected_event) + if diff != "" { + fmt.Printf("The retrieved event differs from the original event:\n%s", diff) + t.Fail() + } + +} + +func TestPostgresqlStorage_UpdateEvent(t *testing.T) { + storage, err := NewPostgresqlStorage(cfg) + if err != nil { + t.Errorf("error creating new PostgreSQL storage: %v", err) + } + err = storage.CreateEvent(event) + if err != nil { + t.Error(err) + } + subscription := Subscription{ + ID: event.Subscriptions[0].ID, + Subscriber: "updated", + Tags: []string{"tag"}, + CreatedAt: date, + Data: map[string]any{"data": "data"}, + } + event.Subscriptions[0] = subscription + + err = storage.UpdateEvent(event) + if err != nil { + t.Error(err) + } + retrieved_event, err := storage.GetEvent(event.ID) + if err != nil { + t.Error(err) + } + diff := cmp.Diff(retrieved_event, &event) + if diff != "" { + fmt.Printf("The retrieved event differs from the original event:\n%s", diff) + t.Fail() + } + +} diff --git a/storage/storage.go b/storage/storage.go index d63710d..aa91585 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -26,6 +26,9 @@ func NewStorage(cfg *viper.Viper) (Storage, error) { case "mongodb": s, err := NewMongoDBStorage(cfg) return s, err + case "psql": + s, err := NewPostgresqlStorage(cfg) + return s, err default: return nil, fmt.Errorf("storage type %v is not supported", storage_type) }