package storage import ( "context" "database/sql" "encoding/json" "fmt" "os" "strconv" "ariga.io/atlas/sql/postgres" "ariga.io/atlas/sql/schema" "github.com/lib/pq" _ "github.com/lib/pq" "github.com/rs/zerolog/log" "github.com/spf13/viper" ) type PostgresqlStorage struct { DbConnection *sql.DB Schema string Tables map[string]string } func NewPostgresqlStorage(cfg *viper.Viper) (PostgresqlStorage, error) { var ( host = cfg.GetString("storage.db.psql.host") port = cfg.GetString("storage.db.psql.port") user = cfg.GetString("storage.db.psql.user") password = cfg.GetString("storage.db.psql.password") dbname = cfg.GetString("storage.db.psql.dbname") sslmode = cfg.GetString("storage.db.psql.sslmode") pg_schema = cfg.GetString("storage.db.psql.schema") pgtables_incentive_subscriptions = cfg.GetString("storage.db.psql.tables.incentive_subscriptions") ) portInt, _ := strconv.Atoi(port) psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", host, portInt, user, password, dbname, sslmode) db, err := sql.Open("postgres", psqlconn) if err != nil { log.Error().Err(err).Msg("opening connection to postgresql failed") return PostgresqlStorage{}, fmt.Errorf("connection to postgresql failed") } err = db.Ping() if err != nil { log.Error().Err(err).Msg("ping to postgresql failed") return PostgresqlStorage{}, fmt.Errorf("connection to postgresql database failed") } return PostgresqlStorage{ DbConnection: db, Schema: pg_schema, Tables: map[string]string{ "incentive_subscriptions": fmt.Sprintf("%s.%s", pg_schema, pgtables_incentive_subscriptions), }, }, nil } func (s PostgresqlStorage) GetUserSubscriptions(userid string) ([]IncentiveSubscription, error) { req := fmt.Sprintf(`SELECT id, incentive_id, user_id, identity_verification_ids, data, declined, subscription_datetime FROM %s WHERE user_id = $1`, s.Tables["incentive_subscriptions"]) rows, err := s.DbConnection.Query(req, userid) if err != nil { log.Error().Str("request", req).Err(err).Msg("GetUserSubscriptions query error") return nil, err } defer rows.Close() subscriptions := []IncentiveSubscription{} for rows.Next() { is := IncentiveSubscription{} var data []byte if err := rows.Scan( &is.ID, &is.IncentiveID, &is.UserID, pq.Array(&is.IdentityVerificationIDs), &data, &is.Declined, &is.SubscriptionDatetime, ); err != nil { log.Error().Err(err).Msg("scan row issue") return nil, err } err = json.Unmarshal(data, &is.Data) if err != nil { log.Error().Err(err).Str("subscription id", is.ID).Msg("data unmarshal issue") return nil, err } subscriptions = append(subscriptions, is) } return subscriptions, nil } func (s PostgresqlStorage) SubscribeIncentive(is IncentiveSubscription) error { datajson, err := json.Marshal(is.Data) if err != nil { return err } req := fmt.Sprintf(`INSERT INTO %s (id, incentive_id, user_id, identity_verification_ids, data, declined, subscription_datetime) VALUES ($1, $2, $3, $4, $5, $6, $7)`, s.Tables["incentive_subscriptions"]) if _, err := s.DbConnection.Exec( req, is.ID, is.IncentiveID, is.UserID, pq.Array(is.IdentityVerificationIDs), datajson, is.Declined, is.SubscriptionDatetime, ); err != nil { log.Error().Err(err).Str("incentive", is.IncentiveID).Str("user", is.UserID).Msg("issue subscribing to incentive") return err } return nil } func (s PostgresqlStorage) Migrate() error { ctx := context.Background() driver, err := postgres.Open(s.DbConnection) if err != nil { return err } existing, err := driver.InspectRealm(ctx, &schema.InspectRealmOption{Schemas: []string{s.Schema}}) if err != nil { return err } var desired schema.Realm hcl, err := os.ReadFile("postgresql/schema.hcl") if err != nil { return err } err = postgres.EvalHCLBytes(hcl, &desired, nil) if err != nil { return err } diff, err := driver.RealmDiff(existing, &desired) if err != nil { return err } err = driver.ApplyChanges(ctx, diff) if err != nil { return err } return nil }