Add PostgreSQL
This commit is contained in:
@@ -86,3 +86,7 @@ func (s MongoDBStorage) SubscribeIncentive(incentive_subscription IncentiveSubsc
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (psql MongoDBStorage) Migrate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
158
storage/postgresql.go
Normal file
158
storage/postgresql.go
Normal file
@@ -0,0 +1,158 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"ariga.io/atlas/sql/postgres"
|
||||
"ariga.io/atlas/sql/schema"
|
||||
"github.com/lib/pq"
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
type PostgresqlStorage struct {
|
||||
DbConnection *sql.DB
|
||||
Schema string
|
||||
Tables map[string]string
|
||||
}
|
||||
|
||||
func NewPostgresqlStorage(cfg *viper.Viper) (PostgresqlStorage, error) {
|
||||
var (
|
||||
host = cfg.GetString("storage.db.psql.host")
|
||||
port = cfg.GetString("storage.db.psql.port")
|
||||
user = cfg.GetString("storage.db.psql.user")
|
||||
password = cfg.GetString("storage.db.psql.password")
|
||||
dbname = cfg.GetString("storage.db.psql.dbname")
|
||||
sslmode = cfg.GetString("storage.db.psql.sslmode")
|
||||
pg_schema = cfg.GetString("storage.db.psql.schema")
|
||||
pgtables_incentive_subscriptions = cfg.GetString("storage.db.psql.tables.incentive_subscriptions")
|
||||
)
|
||||
portInt, _ := strconv.Atoi(port)
|
||||
psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", host, portInt, user, password, dbname, sslmode)
|
||||
db, err := sql.Open("postgres", psqlconn)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("opening connection to postgresql failed")
|
||||
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql failed")
|
||||
}
|
||||
err = db.Ping()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("ping to postgresql failed")
|
||||
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql database failed")
|
||||
}
|
||||
return PostgresqlStorage{
|
||||
DbConnection: db,
|
||||
Schema: pg_schema,
|
||||
Tables: map[string]string{
|
||||
"incentive_subscriptions": fmt.Sprintf("%s.%s", pg_schema, pgtables_incentive_subscriptions),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s PostgresqlStorage) GetUserSubscriptions(userid string) ([]IncentiveSubscription, error) {
|
||||
|
||||
req := fmt.Sprintf(`SELECT id, incentive_id, user_id, identity_verification_ids, data, declined, subscription_datetime
|
||||
FROM %s
|
||||
WHERE user_id=$1`, s.Tables["incentive_subscriptions"])
|
||||
rows, err := s.DbConnection.Query(req, userid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
subscriptions := []IncentiveSubscription{}
|
||||
for rows.Next() {
|
||||
is := IncentiveSubscription{}
|
||||
var data []byte
|
||||
if err := rows.Scan(
|
||||
&is.ID,
|
||||
&is.IncentiveID,
|
||||
&is.UserID,
|
||||
pq.Array(&is.IdentityVerificationIDs),
|
||||
&data,
|
||||
&is.Declined,
|
||||
&is.SubscriptionDatetime,
|
||||
); err != nil {
|
||||
log.Error().Err(err).Msg("scan row issue")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = json.Unmarshal(data, &is.Data)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("subscription id", is.ID).Msg("data unmarshal issue")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
subscriptions = append(subscriptions, is)
|
||||
}
|
||||
|
||||
return subscriptions, nil
|
||||
}
|
||||
|
||||
func (s PostgresqlStorage) SubscribeIncentive(is IncentiveSubscription) error {
|
||||
|
||||
datajson, err := json.Marshal(is.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req := fmt.Sprintf(`INSERT INTO %s (id, incentive_id, user_id, identity_verification_ids, data, declined, subscription_datetime)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)`, s.Tables["incentive_subscriptions"])
|
||||
if _, err := s.DbConnection.Exec(
|
||||
req,
|
||||
is.ID,
|
||||
is.IncentiveID,
|
||||
is.UserID,
|
||||
pq.Array(is.IdentityVerificationIDs),
|
||||
datajson,
|
||||
is.Declined,
|
||||
is.SubscriptionDatetime,
|
||||
); err != nil {
|
||||
log.Error().Err(err).Str("incentive", is.IncentiveID).Str("user", is.UserID).Msg("issue subscribing to incentive")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s PostgresqlStorage) Migrate() error {
|
||||
ctx := context.Background()
|
||||
driver, err := postgres.Open(s.DbConnection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
existing, err := driver.InspectRealm(ctx, &schema.InspectRealmOption{Schemas: []string{s.Schema}})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var desired schema.Realm
|
||||
|
||||
hcl, err := os.ReadFile("postgresql/schema.hcl")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = postgres.EvalHCLBytes(hcl, &desired, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
diff, err := driver.RealmDiff(existing, &desired)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = driver.ApplyChanges(ctx, diff)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
36
storage/postgresql/schema.hcl
Normal file
36
storage/postgresql/schema.hcl
Normal file
@@ -0,0 +1,36 @@
|
||||
table "incentive_subscriptions" {
|
||||
schema = schema.carpool_incentives
|
||||
column "id" {
|
||||
null = false
|
||||
type = uuid
|
||||
}
|
||||
column "incentive_id" {
|
||||
null = false
|
||||
type = text
|
||||
}
|
||||
column "user_id" {
|
||||
null = false
|
||||
type = text
|
||||
}
|
||||
column "identity_verification_ids" {
|
||||
null = true
|
||||
type = sql("text[]")
|
||||
}
|
||||
column "data" {
|
||||
null = false
|
||||
type = jsonb
|
||||
}
|
||||
column "declined" {
|
||||
null = true
|
||||
type = boolean
|
||||
}
|
||||
column "subscription_datetime" {
|
||||
null = true
|
||||
type = timestamptz
|
||||
}
|
||||
primary_key {
|
||||
columns = [column.id]
|
||||
}
|
||||
}
|
||||
schema "carpool_incentives" {
|
||||
}
|
||||
81
storage/postgresql_test.go
Normal file
81
storage/postgresql_test.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var cfg *viper.Viper
|
||||
|
||||
func init() {
|
||||
cfg = viper.New()
|
||||
cfg.SetDefault("storage.db.psql.host", "localhost")
|
||||
cfg.SetDefault("storage.db.psql.port", "5432")
|
||||
cfg.SetDefault("storage.db.psql.user", "postgres")
|
||||
cfg.SetDefault("storage.db.psql.password", "postgres")
|
||||
cfg.SetDefault("storage.db.psql.dbname", "mobilityaccounts_tests")
|
||||
cfg.SetDefault("storage.db.psql.sslmode", "disable")
|
||||
cfg.SetDefault("storage.db.psql.schema", "carpool_incentives")
|
||||
cfg.SetDefault("storage.db.psql.tables.incentive_subscriptions", "incentive_subscriptions")
|
||||
cfg.SetConfigName("config") // Override default values in a config.yaml file within this directory
|
||||
cfg.AddConfigPath(".")
|
||||
cfg.ReadInConfig()
|
||||
}
|
||||
|
||||
func TestPostgresqlStorage_Initialize(t *testing.T) {
|
||||
storage, err := NewPostgresqlStorage(cfg)
|
||||
require.NoError(t, err)
|
||||
defer storage.DbConnection.Close()
|
||||
|
||||
err = storage.Migrate()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = storage.DbConnection.Exec(fmt.Sprintf("DELETE FROM %s;", storage.Tables["incentive_subscriptions"]))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestPostgresqlStorage_SubscribeIncentive(t *testing.T) {
|
||||
|
||||
storage, err := NewPostgresqlStorage(cfg)
|
||||
require.NoError(t, err)
|
||||
defer storage.DbConnection.Close()
|
||||
|
||||
err = storage.SubscribeIncentive(is1)
|
||||
require.NoError(t, err)
|
||||
err = storage.SubscribeIncentive(is2)
|
||||
require.NoError(t, err)
|
||||
err = storage.SubscribeIncentive(is3)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := storage.GetUserSubscriptions(testid)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, 2)
|
||||
for _, is := range res {
|
||||
require.Contains(t, []string{is1.ID, is3.ID}, is.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostgresqlStorage_GetSubscriptions(t *testing.T) {
|
||||
|
||||
storage, err := NewPostgresqlStorage(cfg)
|
||||
require.NoError(t, err)
|
||||
defer storage.DbConnection.Close()
|
||||
|
||||
res, err := storage.GetUserSubscriptions(is2.UserID)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, 1)
|
||||
fmt.Println(res[0])
|
||||
fmt.Println(is2)
|
||||
require.Equal(t, is2.ID, res[0].ID)
|
||||
require.Equal(t, is2.IncentiveID, res[0].IncentiveID)
|
||||
require.Equal(t, is2.UserID, res[0].UserID)
|
||||
require.Equal(t, is2.Data, res[0].Data)
|
||||
require.Equal(t, is2.Declined, res[0].Declined)
|
||||
require.Equal(t, is2.IdentityVerificationIDs, res[0].IdentityVerificationIDs)
|
||||
require.WithinDuration(t, is2.SubscriptionDatetime, res[0].SubscriptionDatetime, time.Millisecond)
|
||||
}
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
type Storage interface {
|
||||
GetUserSubscriptions(userid string) ([]IncentiveSubscription, error)
|
||||
SubscribeIncentive(incentive_subscription IncentiveSubscription) error
|
||||
|
||||
Migrate() error
|
||||
}
|
||||
|
||||
func NewStorage(cfg *viper.Viper) (Storage, error) {
|
||||
|
||||
47
storage/storage_test.go
Normal file
47
storage/storage_test.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
var testid = uuid.NewString()
|
||||
|
||||
var is1 = IncentiveSubscription{
|
||||
ID: uuid.NewString(),
|
||||
IncentiveID: "bonuscovoituragecourtedistance",
|
||||
UserID: testid,
|
||||
IdentityVerificationIDs: []string{uuid.NewString(), uuid.NewString()},
|
||||
Declined: false,
|
||||
Data: map[string]any{
|
||||
"key": "value",
|
||||
"secondkey": "secondvalue",
|
||||
},
|
||||
SubscriptionDatetime: time.Now(),
|
||||
}
|
||||
|
||||
var is2 = IncentiveSubscription{
|
||||
ID: uuid.NewString(),
|
||||
IncentiveID: "bonuscovoituragecourtedistance",
|
||||
UserID: uuid.NewString(),
|
||||
IdentityVerificationIDs: []string{uuid.NewString()},
|
||||
Declined: true,
|
||||
Data: map[string]any{
|
||||
"key": "value",
|
||||
"secondkey": "secondvalue",
|
||||
},
|
||||
SubscriptionDatetime: time.Now(),
|
||||
}
|
||||
|
||||
var is3 = IncentiveSubscription{
|
||||
ID: uuid.NewString(),
|
||||
IncentiveID: "fmdchunimes",
|
||||
UserID: testid,
|
||||
Declined: false,
|
||||
Data: map[string]any{
|
||||
"key": "value",
|
||||
"secondkey": "secondvalue",
|
||||
},
|
||||
SubscriptionDatetime: time.Now(),
|
||||
}
|
||||
Reference in New Issue
Block a user