adding psql

This commit is contained in:
sbouaram 2023-05-12 15:12:55 +02:00
parent 8a271fdcf8
commit 011466fb14
4 changed files with 669 additions and 0 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
config.yaml
.vscode
.idea
__debug_bin

577
storage/postgresql.go Normal file
View File

@ -0,0 +1,577 @@
package storage
import (
"ariga.io/atlas/sql/postgres"
"ariga.io/atlas/sql/schema"
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/lib/pq"
_ "github.com/lib/pq"
"github.com/spf13/viper"
"os"
"strconv"
)
type PostgresqlStorage struct {
DbConnection *sql.DB
Schema string
Tables map[string]string
}
func NewPostgresqlStorage(cfg *viper.Viper) (PostgresqlStorage, error) {
var (
host = cfg.GetString("storage.db.psql.host")
port = cfg.GetString("storage.db.psql.port")
user = cfg.GetString("storage.db.psql.user")
password = cfg.GetString("storage.db.psql.password")
dbname = cfg.GetString("storage.db.psql.dbname")
sslmode = cfg.GetString("storage.db.psql.sslmode")
pg_schema = cfg.GetString("storage.db.psql.schema")
pgtables_vehicle = cfg.GetString("storage.db.psql.tables.vehicle")
pgtables_booking = cfg.GetString("storage.db.psql.tables.booking")
timezone = "Europe/Paris"
)
portInt, _ := strconv.Atoi(port)
psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s TimeZone=%s", host, portInt,
user, password, dbname, sslmode, timezone)
db, err := sql.Open("postgres", psqlconn)
if err != nil {
fmt.Println("error", err)
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql failed")
}
err = db.Ping()
if err != nil {
fmt.Println(err)
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql database failed")
}
return PostgresqlStorage{
DbConnection: db,
Schema: pg_schema,
Tables: map[string]string{
"vehicle": fmt.Sprintf("%s.%s", pg_schema, pgtables_vehicle),
"booking": fmt.Sprintf("%s.%s", pg_schema, pgtables_booking),
},
}, nil
}
func (psql PostgresqlStorage) CreateVehicle(vehicle Vehicle) error {
tx, err := psql.DbConnection.Begin()
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
vehicleQuery := fmt.Sprintf(`
INSERT INTO %s (id, type, namespace, administrators, data, metadata)
VALUES ($1, $2, $3, $4, $5, $6)
`, psql.Tables["vehicle"])
data, err := json.Marshal(vehicle.Data)
if err != nil {
return err
}
metadata, err := json.Marshal(vehicle.Metadata)
if err != nil {
return err
}
_, err = tx.Exec(vehicleQuery, vehicle.ID, vehicle.Type, vehicle.Namespace, pq.Array(vehicle.Administrators),
data, metadata)
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to insert vehicle: %w", err)
}
bookingQuery := fmt.Sprintf(`
INSERT INTO %s (id, vehicleid, driver, startdate, enddate, unavailablefrom, unavailableto, data, deleted)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`, psql.Tables["booking"])
for _, booking := range vehicle.Bookings {
dataBooking, err := json.Marshal(booking.Data)
if err != nil {
return err
}
_, err = tx.Exec(bookingQuery, booking.ID, booking.Vehicleid, booking.Driver,
booking.Startdate, booking.Enddate, booking.Unavailablefrom, booking.Unavailableto,
dataBooking, booking.Deleted)
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to insert booking: %w", err)
}
}
err = tx.Commit()
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}
func (psql PostgresqlStorage) GetVehicle(id string) (*Vehicle, error) {
vehicle := &Vehicle{}
var vehicleData []byte
var vehicleMetadata []byte
query := fmt.Sprintf(`
SELECT id, type, namespace, administrators, data, metadata
FROM %s
WHERE id = $1
`, psql.Tables["vehicle"])
err := psql.DbConnection.QueryRow(query, id).Scan(
&vehicle.ID,
&vehicle.Type,
&vehicle.Namespace,
pq.Array(&vehicle.Administrators),
&vehicleData,
&vehicleMetadata,
)
err = json.Unmarshal(vehicleData, &vehicle.Data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal vehicle data")
}
err = json.Unmarshal(vehicleMetadata, &vehicle.Metadata)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal vehicle metadata")
}
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("vehicle not found")
}
return nil, fmt.Errorf("failed to query database: %w", err)
}
return vehicle, nil
}
func (psql PostgresqlStorage) GetVehicles(namespaces []string) ([]Vehicle, error) {
var vehicles []Vehicle
if len(namespaces) == 0 {
return vehicles, nil
}
query := fmt.Sprintf(`
SELECT id, type, namespace, administrators, data, metadata
FROM %s
WHERE namespace = ANY($1)
`, psql.Tables["vehicle"])
rows, err := psql.DbConnection.Query(query, pq.Array(namespaces))
if err != nil {
return nil, fmt.Errorf("failed to query database: %w", err)
}
defer rows.Close()
for rows.Next() {
var vehicle Vehicle
var administrators []string
var vehicleData []byte
var vehicleMetadata []byte
scanArgs := []interface{}{
&vehicle.ID,
&vehicle.Type,
&vehicle.Namespace,
pq.Array(&administrators),
&vehicleData,
&vehicleMetadata,
}
err := rows.Scan(scanArgs...)
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
vehicle.Administrators = administrators
err = json.Unmarshal(vehicleData, &vehicle.Data)
if err != nil {
return nil, err
}
err = json.Unmarshal(vehicleMetadata, &vehicle.Metadata)
if err != nil {
return nil, err
}
vehicles = append(vehicles, vehicle)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("unexpected error after iterating rows: %w", err)
}
return vehicles, nil
}
func (psql PostgresqlStorage) CreateBooking(booking Booking) error {
query := fmt.Sprintf(`
INSERT INTO %s (id, vehicleid, driver, startdate, enddate, unavailablefrom, unavailableto, data, deleted)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`, psql.Tables["booking"])
bookingData, err := json.Marshal(booking.Data)
if err != nil {
return fmt.Errorf("failed to marshal booking data: %w", err)
}
_, err = psql.DbConnection.Exec(query, booking.ID, booking.Vehicleid, booking.Driver, booking.Startdate,
booking.Enddate, booking.Unavailablefrom, booking.Unavailableto, bookingData, booking.Deleted)
if err != nil {
return fmt.Errorf("failed to insert booking record: %w", err)
}
return nil
}
func (psql PostgresqlStorage) UpdateBooking(bookingToUpdate Booking) error {
query := fmt.Sprintf(`
UPDATE %s SET
driver = $1,
startdate = $2,
enddate = $3,
unavailablefrom = $4,
unavailableto = $5,
data = $6,
deleted = $7
WHERE id = $8
`, psql.Tables["booking"])
bookingData, err := json.Marshal(bookingToUpdate.Data)
if err != nil {
return fmt.Errorf("failed to marshal booking data: %w", err)
}
_, err = psql.DbConnection.Exec(
query,
bookingToUpdate.Driver,
bookingToUpdate.Startdate,
bookingToUpdate.Enddate,
bookingToUpdate.Unavailablefrom,
bookingToUpdate.Unavailableto,
bookingData,
bookingToUpdate.Deleted,
bookingToUpdate.ID,
)
if err != nil {
return fmt.Errorf("failed to update booking: %w", err)
}
return nil
}
func (psql PostgresqlStorage) GetBooking(id string) (*Booking, error) {
booking := &Booking{}
query := fmt.Sprintf(`
SELECT b.id, b.vehicleid, b.driver, b.startdate, b.enddate,
b.unavailablefrom, b.unavailableto, b.data, b.deleted,
v.id, v.type, v.namespace, v.administrators, v.data, v.metadata
FROM %s b
LEFT JOIN %s v ON b.vehicleid = v.id
WHERE b.id = $1
`, psql.Tables["booking"], psql.Tables["vehicle"])
row := psql.DbConnection.QueryRow(query, id)
var administrators []string
var bookingData []byte
var vehicleData []byte
var vehicleMetadata []byte
err := row.Scan(
&booking.ID,
&booking.Vehicleid,
&booking.Driver,
&booking.Startdate,
&booking.Enddate,
&booking.Unavailablefrom,
&booking.Unavailableto,
&bookingData,
&booking.Deleted,
&booking.Vehicle.ID,
&booking.Vehicle.Type,
&booking.Vehicle.Namespace,
pq.Array(&administrators),
&vehicleData,
&vehicleMetadata,
)
if err != nil {
return nil, fmt.Errorf("error receiving booking: %w", err)
}
err = json.Unmarshal(bookingData, &booking.Data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal booking data: %w", err)
}
err = json.Unmarshal(vehicleData, &booking.Vehicle.Data)
if err != nil {
return nil, err
}
err = json.Unmarshal(vehicleMetadata, &booking.Vehicle.Metadata)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal vehicle metadata: %w", err)
}
booking.Vehicle.Administrators = administrators
return booking, nil
}
func (psql PostgresqlStorage) GetBookings() ([]Booking, error) {
var bookings []Booking
query := fmt.Sprintf(`
SELECT b.id, b.vehicleid, b.driver, b.startdate, b.enddate,
b.unavailablefrom, b.unavailableto, b.data, b.deleted,
v.id, v.type, v.namespace, v.administrators, v.data, v.metadata
FROM %s b
LEFT JOIN %s v ON b.vehicleid = v.id
`, psql.Tables["booking"], psql.Tables["vehicle"])
rows, err := psql.DbConnection.Query(query)
if err != nil {
return nil, fmt.Errorf("failed to retrieve bookings: %w", err)
}
defer rows.Close()
for rows.Next() {
var booking Booking
var administrators []string
var bookingData []byte
var vehicleData []byte
var vehicleMetadata []byte
err = rows.Scan(
&booking.ID,
&booking.Vehicleid,
&booking.Driver,
&booking.Startdate,
&booking.Enddate,
&booking.Unavailablefrom,
&booking.Unavailableto,
&bookingData,
&booking.Deleted,
&booking.Vehicle.ID,
&booking.Vehicle.Type,
&booking.Vehicle.Namespace,
pq.Array(&administrators),
&vehicleData,
&vehicleMetadata,
)
if err != nil {
return nil, fmt.Errorf("error receiving booking: %w", err)
}
err = json.Unmarshal(bookingData, &booking.Data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal booking data: %w", err)
}
err = json.Unmarshal(vehicleData, &booking.Vehicle.Data)
if err != nil {
return nil, err
}
err = json.Unmarshal(vehicleMetadata, &booking.Vehicle.Metadata)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal vehicle metadata: %w", err)
}
booking.Vehicle.Administrators = administrators
bookings = append(bookings, booking)
}
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("failed to retrieve bookings: %w", err)
}
return bookings, nil
}
func (psql PostgresqlStorage) GetBookingsForVehicle(vehicleid string) ([]Booking, error) {
var bookings []Booking
query := fmt.Sprintf(`
SELECT b.id, b.vehicleid, b.driver, b.startdate, b.enddate,
b.unavailablefrom, b.unavailableto, b.data, b.deleted,
v.id, v.type, v.namespace, v.administrators, v.data, v.metadata
FROM %s b
LEFT JOIN %s v ON b.vehicleid = v.id
WHERE b.vehicleid = $1
`, psql.Tables["booking"], psql.Tables["vehicle"])
rows, err := psql.DbConnection.Query(query, vehicleid)
if err != nil {
return nil, fmt.Errorf("error retrieving bookings for vehicle: %w", err)
}
for rows.Next() {
var administrators []string
var bookingData []byte
var vehicleData []byte
var vehicleMetadata []byte
var booking Booking
err := rows.Scan(
&booking.ID,
&booking.Vehicleid,
&booking.Driver,
&booking.Startdate,
&booking.Enddate,
&booking.Unavailablefrom,
&booking.Unavailableto,
&bookingData,
&booking.Deleted,
&booking.Vehicle.ID,
&booking.Vehicle.Type,
&booking.Vehicle.Namespace,
pq.Array(&administrators),
&vehicleData,
&vehicleMetadata,
)
if err != nil {
return nil, fmt.Errorf("error scanning booking row: %w", err)
}
err = json.Unmarshal(bookingData, &booking.Data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal booking data: %w", err)
}
err = json.Unmarshal(vehicleData, &booking.Vehicle.Data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal vehicle data: %w", err)
}
err = json.Unmarshal(vehicleMetadata, &booking.Vehicle.Metadata)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal vehicle metadata: %w", err)
}
booking.Vehicle.Administrators = administrators
bookings = append(bookings, booking)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating over booking rows: %w", err)
}
return bookings, nil
}
func (psql PostgresqlStorage) GetBookingsForDriver(driver string) ([]Booking, error) {
bookings := []Booking{}
query := fmt.Sprintf(`
SELECT b.id, b.vehicleid, b.driver, b.startdate, b.enddate,
b.unavailablefrom, b.unavailableto, b.data, b.deleted,
v.id, v.type, v.namespace, v.administrators, v.data, v.metadata
FROM %s b
LEFT JOIN %s v ON b.vehicleid = v.id
WHERE b.driver = $1
`, psql.Tables["booking"], psql.Tables["vehicle"])
rows, err := psql.DbConnection.Query(query, driver)
if err != nil {
return nil, fmt.Errorf("error querying bookings: %w", err)
}
defer rows.Close()
for rows.Next() {
booking := Booking{}
var administrators []string
var bookingData []byte
var vehicleData []byte
var vehicleMetadata []byte
err = rows.Scan(
&booking.ID,
&booking.Vehicleid,
&booking.Driver,
&booking.Startdate,
&booking.Enddate,
&booking.Unavailablefrom,
&booking.Unavailableto,
&bookingData,
&booking.Deleted,
&booking.Vehicle.ID,
&booking.Vehicle.Type,
&booking.Vehicle.Namespace,
pq.Array(&administrators),
&vehicleData,
&vehicleMetadata,
)
if err != nil {
return nil, fmt.Errorf("error scanning bookings: %w", err)
}
err = json.Unmarshal(bookingData, &booking.Data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal booking data: %w", err)
}
err = json.Unmarshal(vehicleData, &booking.Vehicle.Data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal vehicle data: %w", err)
}
err = json.Unmarshal(vehicleMetadata, &booking.Vehicle.Metadata)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal vehicle metadata: %w", err)
}
booking.Vehicle.Administrators = administrators
bookings = append(bookings, booking)
}
return bookings, nil
}
func (psql PostgresqlStorage) DeleteBooking(id string) error {
query := fmt.Sprintf(`
DELETE FROM %s
WHERE id = $1
`, psql.Tables["booking"])
_, err := psql.DbConnection.Exec(query, id)
if err != nil {
return fmt.Errorf("error deleting booking: %v", err)
}
return nil
}
func (psql PostgresqlStorage) Migrate() error {
ctx := context.Background()
driver, err := postgres.Open(psql.DbConnection)
if err != nil {
return err
}
existing, err := driver.InspectRealm(ctx, &schema.InspectRealmOption{Schemas: []string{psql.Schema}})
if err != nil {
return err
}
var desired schema.Realm
hcl, err := os.ReadFile("postgresql/schema.hcl")
if err != nil {
return err
}
err = postgres.EvalHCLBytes(hcl, &desired, nil)
if err != nil {
return err
}
diff, err := driver.RealmDiff(existing, &desired)
if err != nil {
return err
}
err = driver.ApplyChanges(ctx, diff)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,88 @@
table "booking" {
schema = schema.fleets
column "id" {
null = false
type = uuid
}
column "vehicleid" {
null = true
type = uuid
}
column "driver" {
null = true
type = text
}
column "startdate" {
null = true
type = timestamptz
}
column "enddate" {
null = true
type = timestamptz
}
column "unavailablefrom" {
null = true
type = timestamptz
}
column "unavailableto" {
null = true
type = timestamptz
}
column "data" {
null = true
type = jsonb
}
column "deleted" {
null = true
type = boolean
}
primary_key {
columns = [column.id]
}
foreign_key "booking_vehicleid_fkey" {
columns = [column.vehicleid]
ref_columns = [table.vehicle.column.id]
on_update = NO_ACTION
on_delete = NO_ACTION
}
foreign_key "vehicle_booking_fk" {
columns = [column.vehicleid]
ref_columns = [table.vehicle.column.id]
on_update = NO_ACTION
on_delete = NO_ACTION
}
}
table "vehicle" {
schema = schema.fleets
column "id" {
null = false
type = uuid
}
column "type" {
null = true
type = text
}
column "namespace" {
null = true
type = text
}
column "administrators" {
null = true
type = sql("text[]")
}
column "data" {
null = true
type = jsonb
}
column "metadata" {
null = true
type = jsonb
}
primary_key {
columns = [column.id]
}
}
schema "fleets" {
}

View File

@ -31,6 +31,9 @@ func NewStorage(cfg *viper.Viper) (Storage, error) {
case "mongodb":
s, err := NewMongoDBStorage(cfg)
return s, err
case "psql":
s, err := NewPostgresqlStorage(cfg)
return s, err
default:
return nil, fmt.Errorf("storage type %v is not supported", storage_type)
}