package storage

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"strconv"

	"ariga.io/atlas/sql/postgres"
	"ariga.io/atlas/sql/schema"
	"git.coopgo.io/coopgo-platform/carpool-service/internal"
	"git.coopgo.io/coopgo-platform/carpool-service/interoperability/ocss"
	"github.com/lib/pq"
	_ "github.com/lib/pq"
	"github.com/paulmach/orb/geojson"
	"github.com/rs/zerolog/log"
	"github.com/spf13/viper"
	"github.com/stretchr/objx"
)

// PostgresqlStorage stores regular routes, their schedules, bookings and
// cached route schedules in a PostgreSQL database.
type PostgresqlStorage struct {
	// DbConnection is the database handle used by every query.
	DbConnection *sql.DB
	// Schema is the PostgreSQL schema name read from configuration.
	Schema string
	// Tables maps a logical table name (e.g. "bookings") to its
	// schema-qualified name ("<schema>.<table>").
	Tables map[string]string
}

// NewPostgresqlStorage opens a PostgreSQL connection from the
// "storage.db.psql.*" configuration keys and verifies it with a ping.
//
// It returns an error when the configured port is not an integer, or when the
// connection cannot be opened or pinged.
func NewPostgresqlStorage(cfg *viper.Viper) (PostgresqlStorage, error) {
	var (
		host     = cfg.GetString("storage.db.psql.host")
		port     = cfg.GetString("storage.db.psql.port")
		user     = cfg.GetString("storage.db.psql.user")
		password = cfg.GetString("storage.db.psql.password")
		dbname   = cfg.GetString("storage.db.psql.dbname")
		sslmode  = cfg.GetString("storage.db.psql.sslmode")
		pgSchema = cfg.GetString("storage.db.psql.schema")

		regularRoutesTable         = cfg.GetString("storage.db.psql.tables.regular_routes")
		regularRouteSchedulesTable = cfg.GetString("storage.db.psql.tables.regular_route_schedules")
		punctualRoutesTable        = cfg.GetString("storage.db.psql.tables.punctual_routes")
		bookingsTable              = cfg.GetString("storage.db.psql.tables.bookings")
		journeysCacheTable         = cfg.GetString("storage.db.psql.tables.journeys_cache")
	)

	// Fail early on a malformed port instead of silently connecting to port 0.
	portInt, err := strconv.Atoi(port)
	if err != nil {
		log.Error().Err(err).Str("port", port).Msg("invalid postgresql port in configuration")
		return PostgresqlStorage{}, fmt.Errorf("invalid postgresql port %q: %w", port, err)
	}

	psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", host, portInt, user, password, dbname, sslmode)

	db, err := sql.Open("postgres", psqlconn)
	if err != nil {
		log.Error().Err(err).Msg("opening connection to postgresql failed")
		return PostgresqlStorage{}, fmt.Errorf("connection to postgresql failed")
	}

	if err := db.Ping(); err != nil {
		log.Error().Err(err).Msg("ping to postgresql failed")
		// Best-effort cleanup: the handle is unusable, the ping error is what matters.
		_ = db.Close()
		return PostgresqlStorage{}, fmt.Errorf("connection to postgresql database failed")
	}

	qualified := func(table string) string {
		return fmt.Sprintf("%s.%s", pgSchema, table)
	}

	return PostgresqlStorage{
		DbConnection: db,
		Schema:       pgSchema,
		Tables: map[string]string{
			"regular_routes":          qualified(regularRoutesTable),
			"regular_route_schedules": qualified(regularRouteSchedulesTable),
			"punctual_routes":         qualified(punctualRoutesTable),
			"bookings":                qualified(bookingsTable),
			"journeys_cache":          qualified(journeysCacheTable),
		},
	}, nil
}

// CreateRegularRoutes inserts every non-nil route of routes, together with its
// schedules, inside a single transaction. Any failure rolls back the whole
// batch.
//
// Each feature collection is expected to carry "id", "grid_ids",
// "routes_group_id" and "properties" extra members; "properties" must hold a
// "schedules" list. A missing "id" makes MustString panic.
func (s PostgresqlStorage) CreateRegularRoutes(routes []*geojson.FeatureCollection) error {
	log.Debug().Msg("Postgresql Storage - CreateRegularRoutes")

	tx, err := s.DbConnection.Begin()
	if err != nil {
		log.Error().Err(err).Msg("error initializing transaction")
		return err
	}
	// After a successful Commit this only returns sql.ErrTxDone.
	defer tx.Rollback()

	reqRoutes := fmt.Sprintf(`INSERT INTO %s (id, user_id, routes_group_id, grid_ids, is_driver, is_passenger, route) VALUES ($1, $2, $3, $4,$5, $6, $7)`, s.Tables["regular_routes"])
	stmtRoutes, err := tx.Prepare(reqRoutes)
	if err != nil {
		log.Error().Err(err).Msg("error preparing regular routes statement for multiple inserts")
		return err
	}
	defer stmtRoutes.Close()

	reqSchedules := fmt.Sprintf(`INSERT INTO %s (route_id, day, time_of_day) VALUES ($1, $2, $3)`, s.Tables["regular_route_schedules"])
	stmtSchedules, err := tx.Prepare(reqSchedules)
	if err != nil {
		log.Error().Err(err).Msg("error preparing schedules statement for multiple inserts")
		return err
	}
	defer stmtSchedules.Close()

	for _, fc := range routes {
		if fc == nil {
			continue
		}

		id := fc.ExtraMembers.MustString("id")
		properties := objx.New(fc.ExtraMembers["properties"])
		userid := properties.Get("user.id").Str()
		gridids := fc.ExtraMembers["grid_ids"]
		groupid := fc.ExtraMembers["routes_group_id"]
		isdriver := properties.Get("is_driver").Bool()
		ispassenger := properties.Get("is_passenger").Bool()

		route, err := fc.MarshalJSON()
		if err != nil {
			log.Error().Err(err).Msg("error creating regular route")
			return err
		}

		if _, err := stmtRoutes.Exec(id, userid, groupid, pq.Array(gridids), isdriver, ispassenger, route); err != nil {
			log.Error().Err(err).Msg("error inserting regular route")
			return err
		}

		// Get never returns a nil *Value, so the presence test has to look at
		// the wrapped data rather than at the pointer.
		schedules := properties.Get("schedules")
		if schedules.IsNil() {
			err := errors.New("could not get schedules for route")
			log.Error().Err(err).Str("route id", id).Msg("issue in CreateRegularRoute")
			return err
		}

		for _, schedule := range schedules.MustObjxMapSlice() {
			day := schedule.Get("day").Str()
			timeOfDay := schedule.Get("time_of_day").Str()
			if _, err := stmtSchedules.Exec(id, day, timeOfDay); err != nil {
				log.Error().Err(err).Msg("issue creating route schedule")
				return err
			}
		}
	}

	if err := tx.Commit(); err != nil {
		log.Error().Err(err).Msg("issue committing transaction")
		return err
	}

	return nil
}

// scanFeatureCollections reads every (id, route) row of rows and decodes the
// route column as a GeoJSON feature collection. It reports iteration errors
// through rows.Err but does not close rows; that is the caller's job.
// The returned slice is non-nil even when no row matched.
func scanFeatureCollections(rows *sql.Rows) ([]*geojson.FeatureCollection, error) {
	results := []*geojson.FeatureCollection{}
	for rows.Next() {
		var (
			id    string
			route []byte
		)
		if err := rows.Scan(&id, &route); err != nil {
			return nil, err
		}
		fc, err := geojson.UnmarshalFeatureCollection(route)
		if err != nil {
			return nil, err
		}
		results = append(results, fc)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return results, nil
}

// GetUserRegularRoutes returns the regular routes whose user_id column equals
// userid.
func (s PostgresqlStorage) GetUserRegularRoutes(userid string) ([]*geojson.FeatureCollection, error) {
	req := fmt.Sprintf(`select id, route from %s where user_id = $1`, s.Tables["regular_routes"])

	rows, err := s.DbConnection.Query(req, userid)
	if err != nil {
		log.Error().Err(err).Str("request", req).Str("user id", userid).Msg("GetUserRegularRoutes query issue")
		return nil, err
	}
	defer rows.Close()

	return scanFeatureCollections(rows)
}

// GetDriverRegularRoutesForTile returns the driver routes scheduled on day
// whose grid_ids array contains gridId.
func (s PostgresqlStorage) GetDriverRegularRoutesForTile(day string, gridId int64) (regular_routes []*geojson.FeatureCollection, err error) {
	req := fmt.Sprintf(`select id, route from %s inner join %s on id = route_id where is_driver = true and day = $1 and $2 = ANY (grid_ids)`, s.Tables["regular_routes"], s.Tables["regular_route_schedules"])

	rows, err := s.DbConnection.Query(req, day, gridId)
	if err != nil {
		log.Error().Err(err).Msg("GetDriverRegularRoutesForTile query error")
		return nil, err
	}
	defer rows.Close()

	return scanFeatureCollections(rows)
}

// GetPassengerRegularRoutesForTile returns the passenger routes scheduled on
// day whose grid_ids array contains gridId.
func (s PostgresqlStorage) GetPassengerRegularRoutesForTile(day string, gridId int64) (regular_routes []*geojson.FeatureCollection, err error) {
	req := fmt.Sprintf(`select id, route from %s join %s on id = route_id where is_passenger = true and day = $1 and $2 = ANY (grid_ids)`, s.Tables["regular_routes"], s.Tables["regular_route_schedules"])

	rows, err := s.DbConnection.Query(req, day, gridId)
	if err != nil {
		log.Error().Err(err).Msg("GetPassengerRegularRoutesForTile query error")
		return nil, err
	}
	defer rows.Close()

	return scanFeatureCollections(rows)
}

// CreateBooking inserts booking as JSON along with its status and, when set,
// its driver and passenger routes. A nil route is stored as SQL NULL.
func (s PostgresqlStorage) CreateBooking(booking internal.Booking) error {
	req := fmt.Sprintf(`insert into %s (id, booking, status, driver_route, passenger_route) values($1, $2, $3, $4, $5)`, s.Tables["bookings"])

	id := booking.ID
	status := booking.Status.String()

	jsonbooking, err := json.Marshal(booking)
	if err != nil {
		log.Error().Err(err).Msg("issue marshalling booking to json")
		return err
	}

	// Pointers stay nil for absent routes so the driver writes NULL.
	var jsondriverroute, jsonpassengerroute *[]byte
	if booking.DriverRoute != nil {
		jdr, err := booking.DriverRoute.MarshalJSON()
		if err != nil {
			log.Error().Err(err).Msg("error marshalling driver route")
			return err
		}
		jsondriverroute = &jdr
	}
	if booking.PassengerRoute != nil {
		jpr, err := booking.PassengerRoute.MarshalJSON()
		if err != nil {
			log.Error().Err(err).Msg("error marshalling passenger route")
			return err
		}
		jsonpassengerroute = &jpr
	}

	if _, err := s.DbConnection.Exec(req, id, jsonbooking, status, jsondriverroute, jsonpassengerroute); err != nil {
		log.Error().Err(err).Str("request", req).Msg("error creating booking")
		return err
	}

	return nil
}

// decodeBookingRoutes decodes the optional driver and passenger route columns
// of a booking row. A nil column yields a nil feature collection.
func decodeBookingRoutes(driverroute, passengerroute []byte) (dr, pr *geojson.FeatureCollection, err error) {
	if driverroute != nil {
		dr, err = geojson.UnmarshalFeatureCollection(driverroute)
		if err != nil {
			log.Error().Err(err).Msg("could not unmarshal driver route feature collection")
			return nil, nil, err
		}
	}
	if passengerroute != nil {
		pr, err = geojson.UnmarshalFeatureCollection(passengerroute)
		if err != nil {
			log.Error().Err(err).Msg("could not unmarshal passenger route feature collection")
			return nil, nil, err
		}
	}
	return dr, pr, nil
}

// GetBooking loads the booking identified by id. The status column overrides
// whatever status is embedded in the stored booking JSON.
func (s PostgresqlStorage) GetBooking(id string) (*internal.Booking, error) {
	req := fmt.Sprintf(`select booking, status, driver_route, passenger_route from %s where id=$1`, s.Tables["bookings"])

	var (
		booking                                   ocss.Booking
		status                                    string
		bookingbytes, driverroute, passengerroute []byte
	)
	err := s.DbConnection.QueryRow(req, id).Scan(&bookingbytes, &status, &driverroute, &passengerroute)
	if err != nil {
		log.Error().Err(err).Str("booking id", id).Msg("not able to get and scan booking")
		return nil, err
	}

	if err := json.Unmarshal(bookingbytes, &booking); err != nil {
		log.Error().Err(err).Msg("issue unmarshalling booking")
		return nil, err
	}

	// Override booking status
	booking.Status = ocss.BookingStatusFromString(status)

	dr, pr, err := decodeBookingRoutes(driverroute, passengerroute)
	if err != nil {
		return nil, err
	}

	return &internal.Booking{
		Booking:        booking,
		DriverRoute:    dr,
		PassengerRoute: pr,
	}, nil
}

// UpdateBookingStatus sets the status column of the booking identified by
// bookingid and returns the database error, if any.
func (s PostgresqlStorage) UpdateBookingStatus(bookingid string, status string) error {
	req := fmt.Sprintf(`update %s set status = $1 where id=$2`, s.Tables["bookings"])

	if _, err := s.DbConnection.Exec(req, status, bookingid); err != nil {
		log.Error().Err(err).Str("request", req).Str("booking id", bookingid).Str("status", status).Msg("error while updating booking status")
		return err
	}

	return nil
}

// GetUserBookings returns every booking in which userid is the driver or the
// passenger, matched on the "driver.id" / "passenger.id" fields of the stored
// booking JSON. The returned slice is non-nil even when nothing matched.
func (s PostgresqlStorage) GetUserBookings(userid string) ([]internal.Booking, error) {
	req := fmt.Sprintf(`select booking, status, driver_route, passenger_route from %s where booking->'driver'->>'id' = $1 or booking->'passenger'->>'id' = $2`, s.Tables["bookings"])

	rows, err := s.DbConnection.Query(req, userid, userid)
	if err != nil {
		log.Error().Err(err).Str("user id", userid).Msg("GetUserBookings query issue")
		// rows is nil here; iterating over it would panic.
		return nil, err
	}
	defer rows.Close()

	results := []internal.Booking{}
	for rows.Next() {
		var (
			booking                                   ocss.Booking
			status                                    string
			bookingbytes, driverroute, passengerroute []byte
		)
		if err := rows.Scan(&bookingbytes, &status, &driverroute, &passengerroute); err != nil {
			log.Error().Err(err).Msg("not able to get and scan booking in GetUsersBooking")
			return nil, err
		}

		if err := json.Unmarshal(bookingbytes, &booking); err != nil {
			log.Error().Err(err).Msg("issue unmarshalling booking in GetUsersBooking")
			return nil, err
		}

		// Override booking status
		booking.Status = ocss.BookingStatusFromString(status)

		dr, pr, err := decodeBookingRoutes(driverroute, passengerroute)
		if err != nil {
			return nil, err
		}

		results = append(results, internal.Booking{
			Booking:        booking,
			DriverRoute:    dr,
			PassengerRoute: pr,
		})
	}
	if err := rows.Err(); err != nil {
		log.Error().Err(err).Msg("issue iterating over bookings in GetUsersBooking")
		return nil, err
	}

	return results, nil
}

// StoreRouteSchedules writes every journey to the journeys cache table as JSON,
// keyed by its ID, inside a single transaction. Any failure rolls back the
// whole batch.
func (s PostgresqlStorage) StoreRouteSchedules(journeys []internal.PlannedRouteSchedule) error {
	tx, err := s.DbConnection.Begin()
	if err != nil {
		log.Error().Err(err).Msg("issue starting pg transaction")
		return err
	}
	// After a successful Commit this only returns sql.ErrTxDone.
	defer tx.Rollback()

	req := fmt.Sprintf("insert into %s (id, data) values ($1, $2)", s.Tables["journeys_cache"])
	stmt, err := tx.Prepare(req)
	if err != nil {
		log.Error().Err(err).Msg("issue creating prepared statement in StoreRouteSchedules")
		return err
	}
	defer stmt.Close()

	for _, j := range journeys {
		jsonjourney, err := json.Marshal(j)
		if err != nil {
			log.Error().Err(err).Msg("error marshalling Route Schedule")
			return err
		}
		if _, err := stmt.Exec(j.ID, jsonjourney); err != nil {
			log.Error().Err(err).Str("id", j.ID).Msg("issue inserting route schedule in StoreRouteSchedules")
			return err
		}
	}

	if err := tx.Commit(); err != nil {
		log.Error().Err(err).Msg("issue while commiting transaction in StoreRouteSchedules")
		return err
	}

	return nil
}

// GetRouteSchedule loads and decodes the cached route schedule stored under id.
// It returns an error when no row matches or when the stored JSON is invalid.
func (s PostgresqlStorage) GetRouteSchedule(id string) (*internal.PlannedRouteSchedule, error) {
	req := fmt.Sprintf("select data from %s where id = $1", s.Tables["journeys_cache"])

	var jsonjourney []byte
	if err := s.DbConnection.QueryRow(req, id).Scan(&jsonjourney); err != nil {
		log.Error().Err(err).Msg("issue scanning result in GetRouteSchedule")
		return nil, err
	}

	var result internal.PlannedRouteSchedule
	if err := json.Unmarshal(jsonjourney, &result); err != nil {
		log.Error().Err(err).Msg("error unmarshalling returned route schedule")
		return nil, err
	}

	return &result, nil
}

// Migrate brings the configured schema in line with the HCL definition read
// from "postgresql/schema.hcl" (a path relative to the working directory): it
// inspects the existing realm, diffs it against the desired one and applies
// the resulting changes.
func (s PostgresqlStorage) Migrate() error {
	ctx := context.Background()

	driver, err := postgres.Open(s.DbConnection)
	if err != nil {
		return err
	}

	existing, err := driver.InspectRealm(ctx, &schema.InspectRealmOption{Schemas: []string{s.Schema}})
	if err != nil {
		return err
	}

	hcl, err := os.ReadFile("postgresql/schema.hcl")
	if err != nil {
		return err
	}

	var desired schema.Realm
	if err := postgres.EvalHCLBytes(hcl, &desired, nil); err != nil {
		return err
	}

	diff, err := driver.RealmDiff(existing, &desired)
	if err != nil {
		return err
	}

	return driver.ApplyChanges(ctx, diff)
}