495 lines
14 KiB
Go
495 lines
14 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"strconv"
|
|
|
|
"ariga.io/atlas/sql/postgres"
|
|
"ariga.io/atlas/sql/schema"
|
|
"git.coopgo.io/coopgo-platform/carpool-service/internal"
|
|
"git.coopgo.io/coopgo-platform/carpool-service/interoperability/ocss"
|
|
"github.com/lib/pq"
|
|
_ "github.com/lib/pq"
|
|
"github.com/paulmach/orb/geojson"
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/spf13/viper"
|
|
"github.com/stretchr/objx"
|
|
)
|
|
|
|
// PostgresqlStorage bundles the database handle and naming information the
// storage methods in this file operate on.
type PostgresqlStorage struct {
	// DbConnection is the handle every query and transaction is issued on.
	DbConnection *sql.DB

	// Schema is the PostgreSQL schema name; Migrate restricts its inspection
	// to this schema.
	Schema string

	// Tables maps a logical table key (for example "bookings") to the
	// schema-qualified table name interpolated into SQL statements.
	Tables map[string]string
}
|
|
|
|
func NewPostgresqlStorage(cfg *viper.Viper) (PostgresqlStorage, error) {
|
|
var (
|
|
host = cfg.GetString("storage.db.psql.host")
|
|
port = cfg.GetString("storage.db.psql.port")
|
|
user = cfg.GetString("storage.db.psql.user")
|
|
password = cfg.GetString("storage.db.psql.password")
|
|
dbname = cfg.GetString("storage.db.psql.dbname")
|
|
sslmode = cfg.GetString("storage.db.psql.sslmode")
|
|
pg_schema = cfg.GetString("storage.db.psql.schema")
|
|
pgtables_regular_routes = cfg.GetString("storage.db.psql.tables.regular_routes")
|
|
pgtables_regular_route_schedules = cfg.GetString("storage.db.psql.tables.regular_route_schedules")
|
|
pgtables_punctual_routes = cfg.GetString("storage.db.psql.tables.punctual_routes")
|
|
pgtables_bookings = cfg.GetString("storage.db.psql.tables.bookings")
|
|
pgtables_journeys_cache = cfg.GetString("storage.db.psql.tables.journeys_cache")
|
|
)
|
|
portInt, _ := strconv.Atoi(port)
|
|
psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", host, portInt, user, password, dbname, sslmode)
|
|
db, err := sql.Open("postgres", psqlconn)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("opening connection to postgresql failed")
|
|
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql failed")
|
|
}
|
|
err = db.Ping()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("ping to postgresql failed")
|
|
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql database failed")
|
|
}
|
|
return PostgresqlStorage{
|
|
DbConnection: db,
|
|
Schema: pg_schema,
|
|
Tables: map[string]string{
|
|
"regular_routes": fmt.Sprintf("%s.%s", pg_schema, pgtables_regular_routes),
|
|
"regular_route_schedules": fmt.Sprintf("%s.%s", pg_schema, pgtables_regular_route_schedules),
|
|
"punctual_routes": fmt.Sprintf("%s.%s", pg_schema, pgtables_punctual_routes),
|
|
"bookings": fmt.Sprintf("%s.%s", pg_schema, pgtables_bookings),
|
|
"journeys_cache": fmt.Sprintf("%s.%s", pg_schema, pgtables_journeys_cache),
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) CreateRegularRoutes(routes []*geojson.FeatureCollection) error {
|
|
|
|
log.Debug().Msg("Postgresql Storage - CreateRegularRoutes")
|
|
|
|
tx, err := s.DbConnection.Begin()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error initializing transaction")
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
req_routes := fmt.Sprintf(`INSERT INTO %s (id, user_id, routes_group_id, grid_ids, is_driver, is_passenger, route)
|
|
VALUES ($1, $2, $3, $4,$5, $6, $7)`, s.Tables["regular_routes"])
|
|
stmt_routes, err := tx.Prepare(req_routes)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error preparing regular routes statement for multiple inserts")
|
|
return err
|
|
}
|
|
defer stmt_routes.Close()
|
|
|
|
req_schedules := fmt.Sprintf(`INSERT INTO %s (route_id, day, time_of_day) VALUES ($1, $2, $3)`, s.Tables["regular_route_schedules"])
|
|
stmt_schedules, err := tx.Prepare(req_schedules)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error preparing schedules statement for multiple inserts")
|
|
return err
|
|
}
|
|
defer stmt_schedules.Close()
|
|
|
|
for _, fc := range routes {
|
|
if fc != nil {
|
|
id := fc.ExtraMembers.MustString("id")
|
|
properties := objx.New(fc.ExtraMembers["properties"])
|
|
userid := properties.Get("user.id").Str()
|
|
gridids := fc.ExtraMembers["grid_ids"]
|
|
groupid := fc.ExtraMembers["routes_group_id"]
|
|
isdriver := properties.Get("is_driver").Bool()
|
|
ispassenger := properties.Get("is_passenger").Bool()
|
|
route, err := fc.MarshalJSON()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error creating regular route")
|
|
return err
|
|
}
|
|
_, err = stmt_routes.Exec(id, userid, groupid, pq.Array(gridids), isdriver, ispassenger, route)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error inserting regular route")
|
|
return err
|
|
}
|
|
|
|
schedules := properties.Get("schedules")
|
|
if schedules == nil {
|
|
err = errors.New("could not get schedules for route")
|
|
log.Error().Err(err).Str("route id", id).Msg("issue in CreateRegularRoute")
|
|
return err
|
|
}
|
|
for _, schedule := range schedules.MustObjxMapSlice() {
|
|
day := schedule.Get("day").Str()
|
|
timeOfDay := schedule.Get("time_of_day").Str()
|
|
_, err = stmt_schedules.Exec(id, day, timeOfDay)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("issue creating route schedule")
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
log.Error().Err(err).Msg("issue committing transaction")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) GetUserRegularRoutes(userid string) ([]*geojson.FeatureCollection, error) {
|
|
req := fmt.Sprintf(`select id, route
|
|
from %s where user_id = $1`, s.Tables["regular_routes"])
|
|
rows, err := s.DbConnection.Query(req, userid)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("request", req).Str("user id", userid).Msg("GetUserRegularRoutes query issue")
|
|
return nil, err
|
|
}
|
|
|
|
results := []*geojson.FeatureCollection{}
|
|
|
|
for rows.Next() {
|
|
|
|
var id string
|
|
var route []byte
|
|
err := rows.Scan(
|
|
&id,
|
|
&route,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fc, err := geojson.UnmarshalFeatureCollection(route)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
results = append(results, fc)
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) GetDriverRegularRoutesForTile(day string, gridId int64) (regular_routes []*geojson.FeatureCollection, err error) {
|
|
req := fmt.Sprintf(`select id, route
|
|
from %s inner join %s on id = route_id
|
|
where is_driver = true and day = $1 and $2 = ANY (grid_ids)`, s.Tables["regular_routes"], s.Tables["regular_route_schedules"])
|
|
rows, err := s.DbConnection.Query(req, day, gridId)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("GetDriverRegularRoutesForTile query error")
|
|
return nil, err
|
|
}
|
|
|
|
results := []*geojson.FeatureCollection{}
|
|
|
|
for rows.Next() {
|
|
|
|
var id string
|
|
var route []byte
|
|
err := rows.Scan(
|
|
&id,
|
|
&route,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fc, err := geojson.UnmarshalFeatureCollection(route)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
results = append(results, fc)
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) GetPassengerRegularRoutesForTile(day string, gridId int64) (regular_routes []*geojson.FeatureCollection, err error) {
|
|
req := fmt.Sprintf(`select id, route
|
|
from %s join %s on id = route_id
|
|
where is_passenger = true and day = $1 and $2 = ANY (grid_ids)`, s.Tables["regular_routes"], s.Tables["regular_route_schedules"])
|
|
rows, err := s.DbConnection.Query(req, day, gridId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
results := []*geojson.FeatureCollection{}
|
|
|
|
for rows.Next() {
|
|
|
|
var id string
|
|
var route []byte
|
|
err := rows.Scan(
|
|
&id,
|
|
&route,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fc, err := geojson.UnmarshalFeatureCollection(route)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
results = append(results, fc)
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) CreateBooking(booking internal.Booking) error {
|
|
req := fmt.Sprintf(`insert into %s (id, booking, status, driver_route, passenger_route)
|
|
values($1, $2, $3, $4, $5)`, s.Tables["bookings"])
|
|
|
|
id := booking.ID
|
|
status := booking.Status.String()
|
|
|
|
jsonbooking, err := json.Marshal(booking)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("issue marshalling booking to json")
|
|
return err
|
|
}
|
|
|
|
var jsondriverroute, jsonpassengerroute *[]byte
|
|
if booking.DriverRoute != nil {
|
|
jdr, err := booking.DriverRoute.MarshalJSON()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error marshalling driver route")
|
|
return err
|
|
}
|
|
jsondriverroute = &jdr
|
|
}
|
|
if booking.PassengerRoute != nil {
|
|
jpr, err := booking.PassengerRoute.MarshalJSON()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error marshalling passenger route")
|
|
return err
|
|
}
|
|
|
|
jsonpassengerroute = &jpr
|
|
}
|
|
|
|
_, err = s.DbConnection.Exec(req, id, jsonbooking, status, jsondriverroute, jsonpassengerroute)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("request", req).Msg("error creating booking")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) GetBooking(id string) (*internal.Booking, error) {
|
|
req := fmt.Sprintf(`select booking, status, driver_route, passenger_route
|
|
from %s where id=$1`, s.Tables["bookings"])
|
|
var booking ocss.Booking
|
|
var status string
|
|
var bookingbytes, driverroute, passengerroute []byte
|
|
err := s.DbConnection.QueryRow(req, id).Scan(
|
|
&bookingbytes,
|
|
&status,
|
|
&driverroute,
|
|
&passengerroute,
|
|
)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("booking id", id).Msg("not able to get and scan booking")
|
|
return nil, err
|
|
}
|
|
|
|
err = json.Unmarshal(bookingbytes, &booking)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("issue unmarshalling booking")
|
|
return nil, err
|
|
}
|
|
|
|
// Override booking status
|
|
booking.Status = ocss.BookingStatusFromString(status)
|
|
|
|
var dr, pr *geojson.FeatureCollection
|
|
if driverroute != nil {
|
|
dr, err = geojson.UnmarshalFeatureCollection(driverroute)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("could not unmarshal driver route feature collection")
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if passengerroute != nil {
|
|
pr, err = geojson.UnmarshalFeatureCollection(passengerroute)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("could not unmarshal passenger route feature collection")
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &internal.Booking{
|
|
Booking: booking,
|
|
DriverRoute: dr,
|
|
PassengerRoute: pr,
|
|
}, nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) UpdateBookingStatus(bookingid string, status string) error {
|
|
req := fmt.Sprintf(`update %s set status = $1 where id=$2`, s.Tables["bookings"])
|
|
_, err := s.DbConnection.Exec(req, status, bookingid)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("request", req).Str("booking id", bookingid).Str("status", status).Msg("error while updating booking status")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) GetUserBookings(userid string) ([]internal.Booking, error) {
|
|
req := fmt.Sprintf(`select booking, status, driver_route, passenger_route from %s
|
|
where booking->'driver'->>'id' = $1 or booking->'passenger'->>'id' = $2`, s.Tables["bookings"])
|
|
rows, err := s.DbConnection.Query(req, userid, userid)
|
|
if err != nil {
|
|
log.Error().Err(err).Str("user id", userid).Msg("GetUserBookings query issue")
|
|
}
|
|
|
|
results := []internal.Booking{}
|
|
for rows.Next() {
|
|
var booking ocss.Booking
|
|
var status string
|
|
var bookingbytes, driverroute, passengerroute []byte
|
|
err := rows.Scan(
|
|
&bookingbytes,
|
|
&status,
|
|
&driverroute,
|
|
&passengerroute,
|
|
)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("not able to get and scan booking in GetUsersBooking")
|
|
return nil, err
|
|
}
|
|
|
|
err = json.Unmarshal(bookingbytes, &booking)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("issue unmarshalling booking in GetUsersBooking")
|
|
return nil, err
|
|
}
|
|
|
|
// Override booking status
|
|
booking.Status = ocss.BookingStatusFromString(status)
|
|
|
|
var dr, pr *geojson.FeatureCollection
|
|
if driverroute != nil {
|
|
dr, err = geojson.UnmarshalFeatureCollection(driverroute)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("could not unmarshal driver route feature collection")
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if passengerroute != nil {
|
|
pr, err = geojson.UnmarshalFeatureCollection(passengerroute)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("could not unmarshal passenger route feature collection")
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
results = append(results, internal.Booking{
|
|
Booking: booking,
|
|
DriverRoute: dr,
|
|
PassengerRoute: pr,
|
|
})
|
|
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) StoreRouteSchedules(journeys []internal.PlannedRouteSchedule) error {
|
|
tx, err := s.DbConnection.Begin()
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("issue starting pg transaction")
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
req := fmt.Sprintf("insert into %s (id, data) values ($1, $2)", s.Tables["journeys_cache"])
|
|
stmt, err := tx.Prepare(req)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("issue creating prepared statement in StoreRouteSchedules")
|
|
return err
|
|
}
|
|
|
|
for _, j := range journeys {
|
|
jsonjourney, err := json.Marshal(j)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error unmarshalling Route Schedule")
|
|
return err
|
|
}
|
|
stmt.Exec(j.ID, jsonjourney)
|
|
}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
log.Error().Err(err).Msg("issue while commiting transaction in StoreRouteSchedules")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
func (s PostgresqlStorage) GetRouteSchedule(id string) (*internal.PlannedRouteSchedule, error) {
|
|
req := fmt.Sprintf("select data from %s where id = $1", s.Tables["journeys_cache"])
|
|
var jsonjourney []byte
|
|
err := s.DbConnection.QueryRow(req, id).Scan(
|
|
&jsonjourney,
|
|
)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("issue scanning result in GetRouteSchedule")
|
|
return nil, err
|
|
}
|
|
|
|
var result internal.PlannedRouteSchedule
|
|
err = json.Unmarshal(jsonjourney, &result)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("error unmarshalling returned route schedule")
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
func (s PostgresqlStorage) Migrate() error {
|
|
ctx := context.Background()
|
|
driver, err := postgres.Open(s.DbConnection)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
existing, err := driver.InspectRealm(ctx, &schema.InspectRealmOption{Schemas: []string{s.Schema}})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var desired schema.Realm
|
|
|
|
hcl, err := os.ReadFile("postgresql/schema.hcl")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = postgres.EvalHCLBytes(hcl, &desired, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
diff, err := driver.RealmDiff(existing, &desired)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = driver.ApplyChanges(ctx, diff)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|