solidarity-service/storage/postgresql.go

944 lines
30 KiB
Go

package storage
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/google/uuid"
_ "github.com/lib/pq"
"github.com/paulmach/orb"
"github.com/paulmach/orb/geojson"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
"solidarity-service/internal"
"solidarity-service/utils"
"strconv"
"strings"
"sync"
"time"
)
type PostgresqlStorage struct {
DbConnection *sql.DB
Schema string
Tables map[string]string
}
func NewPostgresqlStorage(cfg *viper.Viper) (PostgresqlStorage, error) {
var (
host = cfg.GetString("storage.db.psql.host")
port = cfg.GetString("storage.db.psql.port")
user = cfg.GetString("storage.db.psql.user")
password = cfg.GetString("storage.db.psql.password")
dbname = cfg.GetString("storage.db.psql.dbname")
sslmode = cfg.GetString("storage.db.psql.sslmode")
pg_schema = cfg.GetString("storage.db.psql.schema")
pgtables_drivers = cfg.GetString("storage.db.psql.tables.drivers")
pgtables_passengers = cfg.GetString("storage.db.psql.tables.passengers")
pgtables_bookings = cfg.GetString("storage.db.psql.tables.bookings")
)
portInt, _ := strconv.Atoi(port)
psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", host, portInt, user, password, dbname, sslmode)
db, err := sql.Open("postgres", psqlconn)
if err != nil {
log.Error().Err(err).Msg("opening connection to postgresql failed")
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql failed")
}
err = db.Ping()
if err != nil {
log.Error().Err(err).Msg("ping to postgresql failed")
return PostgresqlStorage{}, fmt.Errorf("connection to postgresql database failed")
}
return PostgresqlStorage{
DbConnection: db,
Schema: pg_schema,
Tables: map[string]string{
"drivers": fmt.Sprintf("%s.%s", pg_schema, pgtables_drivers),
"bookings": fmt.Sprintf("%s.%s", pg_schema, pgtables_bookings),
"passengers": fmt.Sprintf("%s.%s", pg_schema, pgtables_passengers),
},
}, nil
}
func (s PostgresqlStorage) CreateFirebaseToken(user_id string, fcm_token string, device_platform string) (err error) {
_, err = uuid.Parse(user_id)
if err != nil {
log.Error().Err(err).Msg("Postgresql Storage CreateFirebaseToken invalid User ID")
return err
}
_, err = s.DbConnection.Exec(fmt.Sprintf("INSERT INTO %s (user_id , fcm_token , device_platform) VALUES($1,$2,$3)", s.Tables["users_firebase"]),
user_id,
fcm_token,
device_platform)
if err != nil {
if strings.Contains(err.Error(), "duplicate key") {
_ = s.UpdateFirebaseToken(user_id, device_platform, fcm_token)
}
}
return nil
}
func (s PostgresqlStorage) GetFirebaseToken(user_id string) (fcm string, device_platform string, err error) {
err = s.DbConnection.QueryRow(fmt.Sprintf("SELECT fcm_token , device_platform FROM %s WHERE user_id = $1", s.Tables["users_firebase"]), user_id).
Scan(
&fcm,
&device_platform,
)
if err != nil {
return "", "", err
}
return fcm, device_platform, nil
}
func (s PostgresqlStorage) UpdateFirebaseToken(user_id string, fcm_token string, device_platform string) error {
query := fmt.Sprintf("UPDATE %s SET fcm_token = $1 device_platform = $2 WHERE user_id = $3", s.Tables["users_firebase"])
_, err := s.DbConnection.Exec(query, fcm_token, device_platform, user_id)
if err != nil {
return err
}
return nil
}
func (s PostgresqlStorage) CreatePassenger(passenger internal.Passenger) (err error) {
_, err = uuid.Parse(passenger.Passenger.ID)
if err != nil {
log.Error().Err(err).Msg("Postgresql Storage CreatePassenger invalid ID")
return err
}
if passenger.Passenger.Alias == "" || passenger.Passenger.Operator == "" {
errMsg := "Postgresql Storage CreatePassenger empty alias or operator FQDN."
log.Error().Msg(errMsg)
return errors.New(errMsg)
}
preferencesJSON, err := json.Marshal(passenger.Preferences)
if err != nil {
errMsg := "Postgresql Storage CreatePassenger Error encoding Preferences to JSON"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
_, err = s.DbConnection.Exec(fmt.Sprintf("INSERT INTO %s (passenger_id, alias, last_name, first_name, grade, picture, verified_identity, operator, preferences) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)", s.Tables["passengers"]),
passenger.Passenger.ID,
passenger.Passenger.Alias,
passenger.Passenger.LastName,
passenger.Passenger.FirstName,
passenger.Passenger.Grade,
passenger.Passenger.Picture,
passenger.Passenger.VerifiedIdentity,
passenger.Passenger.Operator,
preferencesJSON)
if err != nil {
if strings.Contains(err.Error(), utils.SQL_DUPLICATE) {
err = s.UpdatePassenger(passenger)
if err != nil {
return err
}
return nil
}
errMsg := "Postgresql Storage CreatePassenger Error inserting data into the database"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
return nil
}
func (s PostgresqlStorage) UpdatePassenger(passenger internal.Passenger) (err error) {
_, err = uuid.Parse(passenger.Passenger.ID)
if err != nil {
log.Error().Err(err).Msg("Postgresql Storage UpdatePassenger invalid ID")
return err
}
if passenger.Passenger.Alias == "" || passenger.Passenger.Operator == "" {
errMsg := "Postgresql Storage UpdatePassenger empty alias or operator FQDN."
log.Error().Msg(errMsg)
return errors.New(errMsg)
}
preferencesJSON, err := json.Marshal(passenger.Preferences)
if err != nil {
errMsg := "Postgresql Storage UpdatePassenger Error encoding Preferences to JSON"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
_, err = s.DbConnection.Exec(fmt.Sprintf(`
UPDATE %s
SET
alias = $2,
last_name = $3,
first_name = $4,
grade = $5,
picture = $6,
verified_identity = $7,
operator = $8,
preferences = $9
WHERE passenger_id = $1
`, s.Tables["passengers"]),
passenger.Passenger.ID,
passenger.Passenger.Alias,
passenger.Passenger.LastName,
passenger.Passenger.FirstName,
passenger.Passenger.Grade,
passenger.Passenger.Picture,
passenger.Passenger.VerifiedIdentity,
passenger.Passenger.Operator,
preferencesJSON)
if err != nil {
errMsg := "Postgresql Storage UpdatePassenger Error updating data in the database"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
return nil
}
func (s PostgresqlStorage) GetPassenger(passengerID string) (passenger internal.Passenger, err error) {
var preferencesJSON []byte
err = s.DbConnection.QueryRow(fmt.Sprintf("SELECT alias, last_name, first_name, grade, picture, verified_identity, operator, preferences, passenger_id FROM %s WHERE passenger_id = $1", s.Tables["passengers"]), passengerID).
Scan(
&passenger.Passenger.Alias,
&passenger.Passenger.LastName,
&passenger.Passenger.FirstName,
&passenger.Passenger.Grade,
&passenger.Passenger.Picture,
&passenger.Passenger.VerifiedIdentity,
&passenger.Passenger.Operator,
&preferencesJSON,
&passenger.Passenger.ID,
)
if err != nil {
errMsg := "Postgresql Storage GetPassenger Error querying data from the database"
log.Error().Err(err).Msg(errMsg)
return passenger, errors.New(errMsg + err.Error())
}
err = json.Unmarshal(preferencesJSON, &passenger.Preferences)
if err != nil {
errMsg := "Postgresql Storage GetPassenger Error decoding Preferences from JSON"
log.Error().Err(err).Msg(errMsg)
return passenger, errors.New(errMsg + err.Error())
}
return passenger, nil
}
func (s PostgresqlStorage) GetAllPassengers() (passengers []internal.Passenger, err error) {
var preferencesJSON []byte
rows, err := s.DbConnection.Query(fmt.Sprintf("SELECT alias, last_name, first_name, grade, picture, verified_identity, operator, preferences, passenger_id FROM %s ", s.Tables["passengers"]))
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var passenger internal.Passenger
if err := rows.Scan(
&passenger.Passenger.Alias,
&passenger.Passenger.LastName,
&passenger.Passenger.FirstName,
&passenger.Passenger.Grade,
&passenger.Passenger.Picture,
&passenger.Passenger.VerifiedIdentity,
&passenger.Passenger.Operator,
&preferencesJSON,
&passenger.Passenger.ID,
); err != nil {
return nil, err
}
passengers = append(passengers, passenger)
}
if err != nil {
errMsg := "Postgresql Storage GetPassenger Error querying data from the database"
log.Error().Err(err).Msg(errMsg)
return passengers, errors.New(errMsg + err.Error())
}
for i := range passengers {
err = json.Unmarshal(preferencesJSON, &passengers[i].Preferences)
if err != nil {
errMsg := "Postgresql Storage GetPassenger Error decoding Preferences from JSON"
log.Error().Err(err).Msg(errMsg)
return passengers, errors.New(errMsg + err.Error())
}
}
return passengers, nil
}
func (s PostgresqlStorage) CreateDriver(driver internal.Driver) (err error) {
var availabilities []byte
_, err = uuid.Parse(driver.Driver.ID)
if err != nil {
log.Error().Err(err).Msg("Postgresql Storage CreateDriver invalid ID")
return err
}
departureJSON, err := json.Marshal(driver.Driver_departure_address)
if err != nil {
errMsg := "Postgresql Storage CreateDriver Error encoding departure Feature to JSON"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
preferencesJSON, err := json.Marshal(driver.Preferences)
if err != nil {
errMsg := "Postgresql Storage CreateDriver Error encoding Preferences to JSON"
log.Error().Err(err).Msg(errMsg + err.Error())
return errors.New(errMsg)
}
carJSON, err := json.Marshal(driver.Car)
if err != nil {
errMsg := "Postgresql Storage CreateDriver Error encoding Car to JSON"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
if driver.Driver.Alias == "" || driver.Driver.Operator == "" {
errMsg := "Postgresql Storage CreateDriver empty alias or operator FQDN."
log.Error().Msg(errMsg)
return errors.New(errMsg)
}
if driver.AvailabilitiesType != internal.Punctual && driver.AvailabilitiesType != internal.Regular {
errMsg := "Postgresql Storage CreateDriver invalid Availabilities Type"
log.Error().Msg(errMsg)
return errors.New(errMsg)
}
if driver.Radius == 0 {
errMsg := "Postgresql Storage CreateDriver Radius has to be defined"
log.Error().Msg(errMsg)
return errors.New(errMsg)
}
switch driver.AvailabilitiesType {
case internal.Punctual:
availabilities, err = json.Marshal(driver.PunctualAvailabilities)
if err != nil {
errMsg := "Postgresql Storage CreateDriver error converting Punctual availabilities"
log.Error().Msg(errMsg)
return errors.New(errMsg + err.Error())
}
case internal.Regular:
availabilities, err = json.Marshal(driver.RegularAvailabilities)
if err != nil {
errMsg := "Postgresql Storage CreateDriver error converting Regular availabilities"
log.Error().Msg(errMsg)
return errors.New(errMsg + err.Error())
}
}
_, err = s.DbConnection.Exec(fmt.Sprintf("INSERT INTO %s (driver_id,driver_departure_route,driver_radius,last_name,first_name,grade,alias,picture,verified_identity,preferences,availabilities_type,availabilities,operator , car) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)", s.Tables["drivers"]),
driver.Driver.ID,
departureJSON,
driver.Radius,
driver.Driver.LastName,
driver.Driver.FirstName,
driver.Driver.Grade,
driver.Driver.Alias,
driver.Driver.Picture,
driver.Driver.VerifiedIdentity,
preferencesJSON,
driver.AvailabilitiesType,
string(availabilities),
driver.Driver.Operator,
carJSON,
)
if err != nil {
if strings.Contains(err.Error(), utils.SQL_DUPLICATE) {
err = s.UpdateDriver(driver)
if err != nil {
return err
}
return nil
}
errMsg := "Postgresql Storage CreateDriver Error inserting data into the database"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
return nil
}
func (s PostgresqlStorage) UpdateDriver(driver internal.Driver) (err error) {
var availabilities []byte
_, err = uuid.Parse(driver.Driver.ID)
if err != nil {
log.Error().Err(err).Msg("Postgresql Storage UpdateDriver invalid ID")
return err
}
departureJSON, err := json.Marshal(driver.Driver_departure_address)
if err != nil {
errMsg := "Postgresql Storage UpdateDriver Error encoding departure Feature to JSON"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
preferencesJSON, err := json.Marshal(driver.Preferences)
if err != nil {
errMsg := "Postgresql Storage UpdateDriver Error encoding Preferences to JSON"
log.Error().Err(err).Msg(errMsg + err.Error())
return errors.New(errMsg)
}
carJSON, err := json.Marshal(driver.Car)
if err != nil {
errMsg := "Postgresql Storage UpdateDriver Error encoding Car to JSON"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
if driver.Driver.Alias == "" || driver.Driver.Operator == "" {
errMsg := "Postgresql Storage UpdateDriver empty alias or operator FQDN."
log.Error().Msg(errMsg)
return errors.New(errMsg)
}
if driver.AvailabilitiesType != internal.Punctual && driver.AvailabilitiesType != internal.Regular {
errMsg := "Postgresql Storage UpdateDriver invalid Availabilities Type"
log.Error().Msg(errMsg)
return errors.New(errMsg)
}
if driver.Radius == 0 {
errMsg := "Postgresql Storage UpdateDriver Radius has to be defined"
log.Error().Msg(errMsg)
return errors.New(errMsg)
}
switch driver.AvailabilitiesType {
case internal.Punctual:
availabilities, err = json.Marshal(driver.PunctualAvailabilities)
if err != nil {
errMsg := "Postgresql Storage UpdateDriver error converting Punctual availabilities"
log.Error().Msg(errMsg)
return errors.New(errMsg + err.Error())
}
case internal.Regular:
availabilities, err = json.Marshal(driver.RegularAvailabilities)
if err != nil {
errMsg := "Postgresql Storage UpdateDriver error converting Regular availabilities"
log.Error().Msg(errMsg)
return errors.New(errMsg + err.Error())
}
}
_, err = s.DbConnection.Exec(fmt.Sprintf(`
UPDATE %s
SET
driver_departure_route = $2,
driver_radius = $3,
last_name = $4,
first_name = $5,
grade = $6,
alias = $7,
picture = $8,
verified_identity = $9,
preferences = $10,
availabilities_type = $11,
availabilities = $12,
operator = $13,
car = $14
WHERE driver_id = $1
`, s.Tables["drivers"]),
driver.Driver.ID,
departureJSON,
driver.Radius,
driver.Driver.LastName,
driver.Driver.FirstName,
driver.Driver.Grade,
driver.Driver.Alias,
driver.Driver.Picture,
driver.Driver.VerifiedIdentity,
preferencesJSON,
driver.AvailabilitiesType,
string(availabilities),
driver.Driver.Operator,
carJSON,
)
if err != nil {
errMsg := "Postgresql Storage UpdateDriver Error updating data in the database"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
return nil
}
func (s PostgresqlStorage) GetDriver(driverID string) (driver internal.Driver, err error) {
var preferencesJSON []byte
var departureAddress []byte
var availabilitiesJSON []byte
var carJSON []byte
err = s.DbConnection.QueryRow(fmt.Sprintf("SELECT driver_departure_route, driver_radius, last_name, first_name, grade, alias, picture, verified_identity, preferences, availabilities_type, availabilities, operator , driver_id , car FROM %s WHERE driver_id = $1", s.Tables["drivers"]), driverID).
Scan(
&departureAddress,
&driver.Radius,
&driver.Driver.LastName,
&driver.Driver.FirstName,
&driver.Driver.Grade,
&driver.Driver.Alias,
&driver.Driver.Picture,
&driver.Driver.VerifiedIdentity,
&preferencesJSON,
&driver.AvailabilitiesType,
&availabilitiesJSON,
&driver.Driver.Operator,
&driver.Driver.ID,
&carJSON,
)
if err != nil {
errMsg := "Postgresql Storage GetDriver Error querying data from the database"
log.Error().Err(err).Msg(errMsg)
return driver, errors.New(errMsg + err.Error())
}
err = json.Unmarshal(preferencesJSON, &driver.Preferences)
if err != nil {
errMsg := "Postgresql Storage GetDriver Error decoding Preferences from JSON"
log.Error().Err(err).Msg(errMsg)
return driver, errors.New(errMsg + err.Error())
}
err = json.Unmarshal(carJSON, &driver.Car)
if err != nil {
errMsg := "Postgresql Storage GetDriver Error decoding Car from JSON"
log.Error().Err(err).Msg(errMsg)
return driver, errors.New(errMsg + err.Error())
}
driver.Driver_departure_address, err = geojson.UnmarshalFeature(departureAddress)
if err != nil {
errMsg := "Postgresql Storage GetDriver Error decoding Driver departure route into GeoJSON"
log.Error().Err(err).Msg(errMsg)
return driver, errors.New(errMsg + err.Error())
}
switch driver.AvailabilitiesType {
case internal.Regular:
err = json.Unmarshal(availabilitiesJSON, &driver.RegularAvailabilities)
if err != nil {
errMsg := "Postgresql Storage GetDriver Error decoding Regular Availabilities from JSON"
log.Error().Err(err).Msg(errMsg)
return driver, errors.New(errMsg + err.Error())
}
case internal.Punctual:
err = json.Unmarshal(availabilitiesJSON, &driver.PunctualAvailabilities)
if err != nil {
errMsg := "Postgresql Storage GetDriver Error decoding Punctual Availabilities from JSON"
log.Error().Err(err).Msg(errMsg)
return driver, errors.New(errMsg + err.Error())
}
default:
errMsg := "Postgresql Storage GetDriver Invalid Availabilities Type"
log.Error().Msg(errMsg)
return driver, errors.New(errMsg + err.Error())
}
return driver, nil
}
func (s PostgresqlStorage) CreateBooking(booking internal.BookingRequest) (err error) {
_, err = uuid.Parse(booking.Driver_id)
if err != nil {
log.Error().Err(err).Msg("Postgresql Storage CreateBooking invalid Driver ID")
return err
}
_, err = uuid.Parse(booking.Passenger_id)
if err != nil {
log.Error().Err(err).Msg("Postgresql Storage CreateBooking invalid Passenger ID")
return err
}
_, err = uuid.Parse(booking.ID)
if err != nil {
log.Error().Err(err).Msg("Postgresql Storage CreateBooking invalid Booking ID")
return err
}
if booking.Operator == "" {
log.Error().Err(err).Msg("Postgresql Storage CreateBooking empty operator")
return err
}
departureJSON, err := json.Marshal(booking.Departure_address)
if err != nil {
errMsg := "Postgresql Storage CreateBooking Error encoding departure Feature to JSON"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
destinationJSON, err := json.Marshal(booking.Destination_address)
if err != nil {
errMsg := "Postgresql Storage CreateBooking Error converting destination Feature to JSON"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
if booking.Pickup_date == 0 {
errMsg := "Postgresql Storage CreateBooking empty UNIX pickup Date timestamp"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg)
}
booking.Distance = utils.Haversine(booking.Departure_address.Point().Lat(), booking.Departure_address.Point().Lon(),
booking.Destination_address.Point().Lat(), booking.Destination_address.Point().Lon())
booking.Duration, _ = utils.CalculateDurationBetweenFeatures(booking.Departure_address, booking.Destination_address)
_, err = s.DbConnection.Exec(fmt.Sprintf("INSERT INTO %s (booking_id , passenger_id , driver_id , operator, booking_status,departure_address,destination_address,pickup_date,duration,distance) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)", s.Tables["bookings"]),
booking.ID,
booking.Passenger_id,
booking.Driver_id,
booking.Operator,
booking.Status,
departureJSON,
destinationJSON,
booking.Pickup_date,
booking.Duration,
booking.Distance,
)
if err != nil {
errMsg := "Postgresql Storage CreateBooking Error inserting data into the database"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
return nil
}
func (s PostgresqlStorage) GetBooking(id string) (booking internal.Booking, err error) {
var departureAddress []byte
var destinationAddress []byte
err = s.DbConnection.QueryRow(fmt.Sprintf("SELECT booking_id , passenger_id , driver_id , booking_status , departure_address,destination_address,pickup_date,duration,distance FROM %s WHERE booking_id = $1", s.Tables["bookings"]), id).
Scan(
&booking.ID,
&booking.Passenger.ID,
&booking.Driver.ID,
&booking.Status,
&departureAddress,
&destinationAddress,
&booking.Pickup_date,
&booking.Duration,
&booking.Distance,
)
if err != nil {
errMsg := "Postgresql Storage GetBooking Error getting booking"
log.Error().Err(err).Msg(errMsg)
return internal.Booking{}, errors.New(errMsg + err.Error())
}
booking.PassengerPickupAddress, err = geojson.UnmarshalFeature(departureAddress)
if err != nil {
errMsg := "Postgresql Storage GetBooking Error decoding Driver departure route into GeoJSON"
log.Error().Err(err).Msg(errMsg)
return internal.Booking{}, errors.New(errMsg + err.Error())
}
booking.PassengerDropAddress, err = geojson.UnmarshalFeature(destinationAddress)
if err != nil {
errMsg := "Postgresql Storage GetBooking Error decoding Driver destination route into GeoJSON"
log.Error().Err(err).Msg(errMsg)
return internal.Booking{}, errors.New(errMsg + err.Error())
}
passenger, err := s.GetPassenger(booking.Passenger.ID)
if err != nil {
errMsg := "Postgresql Storage GetBooking Error getting passenger"
log.Error().Err(err).Msg(errMsg)
return booking, errors.New(errMsg + err.Error())
}
booking.Passenger = passenger.Passenger
driver, err := s.GetDriver(booking.Driver.ID)
if err != nil {
errMsg := "Postgresql Storage GetBooking Error getting driver"
log.Error().Err(err).Msg(errMsg)
return booking, errors.New(errMsg + err.Error())
}
booking.Driver = driver.Driver
return booking, nil
}
func (s PostgresqlStorage) UpdateBookingStatus(id string, status internal.BookingStatus) (err error) {
_, err = uuid.Parse(id)
if err != nil {
log.Error().Err(err).Msg("Postgresql Storage UpdateBookingStatus invalid Booking ID")
return err
}
_, err = s.GetBooking(id)
if err != nil {
return errors.New(err.Error())
}
query := fmt.Sprintf("UPDATE %s SET booking_status = $1 WHERE booking_id = $2", s.Tables["bookings"])
_, err = s.DbConnection.Exec(query, status, id)
if err != nil {
errMsg := "Postgresql Storage UpdateBookingStatus Error updating booking status"
log.Error().Err(err).Msg(errMsg)
return errors.New(errMsg + err.Error())
}
return nil
}
func (s PostgresqlStorage) FilterUserBookingsByStatus(userType string, status internal.BookingStatus, userID string) (bookings []internal.Booking, err error) {
if err != nil {
return nil, errors.New("invalid UUID")
}
if userType != "driver" && userType != "passenger" {
return nil, errors.New("invalid user type")
}
queryTemplate := fmt.Sprintf("SELECT booking_id, passenger_id, driver_id, booking_status, departure_address, destination_address, pickup_date,duration,distance FROM %s WHERE %s_id = $1 AND booking_status = $2", s.Tables["bookings"], userType)
rows, err := s.DbConnection.Query(queryTemplate, userID, status)
if err != nil {
return nil, err
}
defer rows.Close()
var wg sync.WaitGroup
var mu sync.Mutex
for rows.Next() {
var departureAddress []byte
var destinationAddress []byte
var booking internal.Booking
if err := rows.Scan(&booking.ID, &booking.Passenger.ID, &booking.Driver.ID, &booking.Status, &departureAddress, &destinationAddress, &booking.Pickup_date, &booking.Duration, &booking.Distance); err != nil {
return nil, err
}
wg.Add(1)
go func(booking internal.Booking, departureAddress, destinationAddress []byte) {
defer wg.Done()
// Common logic for both "driver" and "passenger" cases
booking, err := s.populateBookingDetails(booking, departureAddress, destinationAddress)
if err != nil {
mu.Lock()
err = fmt.Errorf("Error populating booking details: %w", err)
mu.Unlock()
return
}
mu.Lock()
bookings = append(bookings, booking)
mu.Unlock()
}(booking, departureAddress, destinationAddress)
}
// Wait for all goroutines to finish
wg.Wait()
if err := rows.Err(); err != nil {
return nil, err
}
return bookings, nil
}
func (s PostgresqlStorage) populateBookingDetails(booking internal.Booking, departureAddress, destinationAddress []byte) (internal.Booking, error) {
var wg sync.WaitGroup
var mu sync.Mutex
errCh := make(chan error, 4) // Buffered channel to handle potential errors
// Concurrently fetch passenger information
wg.Add(1)
go func() {
defer wg.Done()
passenger, err := s.GetPassenger(booking.Passenger.ID)
if err != nil {
errCh <- fmt.Errorf("Postgresql Storage GetBooking Error getting passenger: %w", err)
return
}
mu.Lock()
booking.Passenger = passenger.Passenger
mu.Unlock()
}()
// Concurrently fetch driver information
wg.Add(1)
go func() {
defer wg.Done()
driver, err := s.GetDriver(booking.Driver.ID)
if err != nil {
errCh <- fmt.Errorf("Postgresql Storage GetBooking Error getting driver: %w", err)
return
}
mu.Lock()
booking.Driver = driver.Driver
mu.Unlock()
}()
// Concurrently decode departureAddress into GeoJSON
wg.Add(1)
go func() {
defer wg.Done()
decodedDepartureAddress, err := geojson.UnmarshalFeature(departureAddress)
if err != nil {
errCh <- fmt.Errorf("Postgresql Storage FilterBookingsByStatus Error decoding Driver departure route into GeoJSON: %w", err)
return
}
mu.Lock()
booking.PassengerPickupAddress = decodedDepartureAddress
mu.Unlock()
}()
// Concurrently decode destinationAddress into GeoJSON
wg.Add(1)
go func() {
defer wg.Done()
decodedDestinationAddress, err := geojson.UnmarshalFeature(destinationAddress)
if err != nil {
errCh <- fmt.Errorf("Postgresql Storage FilterBookingsByStatus Error decoding Driver destination route into GeoJSON: %w", err)
return
}
mu.Lock()
booking.PassengerDropAddress = decodedDestinationAddress
mu.Unlock()
}()
// Close the error channel after all goroutines are done
go func() {
wg.Wait()
close(errCh)
}()
// Check for errors using a select statement
for err := range errCh {
return internal.Booking{}, err // Return the first error encountered
}
return booking, nil
}
func (s PostgresqlStorage) GetAllDrivers(date int64) (drivers []internal.Driver, err error) {
rows, err := s.DbConnection.Query(fmt.Sprintf("SELECT driver_id, driver_departure_route, driver_radius, last_name, first_name, grade, alias, picture, verified_identity, preferences, availabilities_type, availabilities, operator, car FROM %s", s.Tables["drivers"]))
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var driver internal.Driver
var departureRoute []byte
var preferencesJSON []byte
var availabilitiesJSON []byte
var carJSON []byte
if err := rows.Scan(
&driver.Driver.ID,
&departureRoute,
&driver.Radius,
&driver.Driver.LastName,
&driver.Driver.FirstName,
&driver.Driver.Grade,
&driver.Driver.Alias,
&driver.Driver.Picture,
&driver.Driver.VerifiedIdentity,
&preferencesJSON,
&driver.AvailabilitiesType,
&availabilitiesJSON,
&driver.Driver.Operator,
&carJSON,
); err != nil {
return nil, err
}
if err := json.Unmarshal(departureRoute, &driver.Driver_departure_address); err != nil {
return nil, err
}
if err := json.Unmarshal(preferencesJSON, &driver.Preferences); err != nil {
return nil, err
}
if err := json.Unmarshal(carJSON, &driver.Car); err != nil {
return nil, err
}
// Filter drivers based on date and time matching punctual or regular availabilities.
if driver.AvailabilitiesType == internal.Punctual {
if err := json.Unmarshal(availabilitiesJSON, &driver.PunctualAvailabilities); err != nil {
return nil, err
}
year, month, day := time.Unix(date, 0).Date()
dateHour := time.Unix(date, 0).UTC().Local()
dateHour_string := dateHour.Format("3:04 PM")
for _, avail := range driver.PunctualAvailabilities {
availyear, availmonth, availday := time.Unix(avail.Date, 0).Date()
if availyear == year && availmonth == month && availday == day {
startTime, err := time.Parse("3:04 PM", avail.StartTime)
if err != nil {
return nil, err
}
endTime, err := time.Parse("3:04 PM", avail.EndTime)
if err != nil {
return nil, err
}
// Convert DateHour_string to time.Time
dateHourTime, err := time.Parse("3:04 PM", dateHour_string)
if err != nil {
return nil, err
}
if dateHourTime.After(startTime) || dateHourTime.Equal(startTime) {
if dateHourTime.Before(endTime) || dateHourTime.Equal(endTime) {
drivers = append(drivers, driver)
}
}
}
// Check if the date and time match the punctual driver's availability.
}
} else if driver.AvailabilitiesType == internal.Regular {
if err := json.Unmarshal(availabilitiesJSON, &driver.RegularAvailabilities); err != nil {
return nil, err
}
dayOfWeek := strings.ToLower(time.Unix(date, 0).Local().Format("Mon"))
dateHour := time.Unix(date, 0).UTC().Local()
dateHour_string := dateHour.Format("3:04 PM")
for _, avail := range driver.RegularAvailabilities {
// Check if the day and time match the regular driver's availability.
if strings.ToLower(avail.DayOfWeek) == dayOfWeek {
startTime, err := time.Parse("3:04 PM", avail.StartTime)
if err != nil {
return nil, err
}
endTime, err := time.Parse("3:04 PM", avail.EndTime)
if err != nil {
return nil, err
}
// Convert DateHour_string to time.Time
dateHourTime, err := time.Parse("3:04 PM", dateHour_string)
if err != nil {
return nil, err
}
if dateHourTime.After(startTime) || dateHourTime.Equal(startTime) {
if dateHourTime.Before(endTime) || dateHourTime.Equal(endTime) {
drivers = append(drivers, driver)
}
}
}
}
}
}
if err := rows.Err(); err != nil {
return nil, err
}
return drivers, nil
}
func (s PostgresqlStorage) DriverJourneys(departure_route *geojson.Feature, departure_date int64) (drivers []internal.Driver, err error) {
allDrivers, err := s.GetAllDrivers(departure_date)
if err != nil {
return nil, err
}
for _, driver := range allDrivers {
// Check if the departure route is the same as the driver's departure route.
if departure_route.Point().Y() == driver.Driver_departure_address.Point().Y() && departure_route.Point().X() == driver.Driver_departure_address.Point().X() {
drivers = append(drivers, driver)
} else {
// Calculate the distance between the departure point and the driver's departure route.
coords1 := departure_route.Geometry.(orb.Point)
coords2 := driver.Driver_departure_address.Geometry.(orb.Point)
distance := utils.Haversine(coords1[1], coords1[0], coords2[1], coords2[0])
if int64(distance) <= int64(driver.Radius) {
drivers = append(drivers, driver)
}
}
}
return drivers, nil
}