lot of new functionalities
This commit is contained in:
401
core/application/journeys.go
Executable file
401
core/application/journeys.go
Executable file
@@ -0,0 +1,401 @@
|
||||
package application
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.coopgo.io/coopgo-apps/parcoursmob/core/utils/sorting"
|
||||
carpoolproto "git.coopgo.io/coopgo-platform/carpool-service/servers/grpc/proto"
|
||||
fleets "git.coopgo.io/coopgo-platform/fleets/grpcapi"
|
||||
fleetsstorage "git.coopgo.io/coopgo-platform/fleets/storage"
|
||||
mobilityaccountsstorage "git.coopgo.io/coopgo-platform/mobility-accounts/storage"
|
||||
"git.coopgo.io/coopgo-platform/multimodal-routing/libs/transit/transitous"
|
||||
savedsearchtypes "git.coopgo.io/coopgo-platform/saved-search/data/types"
|
||||
savedsearchproto "git.coopgo.io/coopgo-platform/saved-search/servers/grpc/proto/gen"
|
||||
savedsearchtransformers "git.coopgo.io/coopgo-platform/saved-search/servers/grpc/transformers"
|
||||
"git.coopgo.io/coopgo-platform/solidarity-transport/servers/grpc/proto/gen"
|
||||
"git.coopgo.io/coopgo-platform/solidarity-transport/servers/grpc/transformers"
|
||||
"github.com/paulmach/orb/geojson"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
type SearchJourneysResult struct {
|
||||
CarpoolResults []*geojson.FeatureCollection
|
||||
TransitResults []*transitous.Itinerary
|
||||
VehicleResults []fleetsstorage.Vehicle
|
||||
Searched bool
|
||||
DriverJourneys []*gen.SolidarityTransportDriverJourney
|
||||
Drivers map[string]mobilityaccountsstorage.Account
|
||||
OrganizedCarpools []*carpoolproto.CarpoolServiceDriverJourney
|
||||
KnowledgeBaseResults []any
|
||||
}
|
||||
|
||||
// SearchJourneys performs the business logic for journey search
|
||||
func (h *ApplicationHandler) SearchJourneys(
|
||||
ctx context.Context,
|
||||
departureDateTime time.Time,
|
||||
departureGeo *geojson.Feature,
|
||||
destinationGeo *geojson.Feature,
|
||||
passengerID string,
|
||||
solidarityTransportExcludeDriver string,
|
||||
) (*SearchJourneysResult, error) {
|
||||
var (
|
||||
// Results
|
||||
transitResults []*transitous.Itinerary
|
||||
carpoolResults []*geojson.FeatureCollection
|
||||
vehicleResults []fleetsstorage.Vehicle
|
||||
solidarityTransportResults []*gen.SolidarityTransportDriverJourney
|
||||
organizedCarpoolResults []*carpoolproto.CarpoolServiceDriverJourney
|
||||
knowledgeBaseResults []any
|
||||
|
||||
drivers = map[string]mobilityaccountsstorage.Account{}
|
||||
searched = false
|
||||
)
|
||||
|
||||
// Only search if we have complete departure and destination info
|
||||
if departureGeo != nil && destinationGeo != nil && !departureDateTime.IsZero() {
|
||||
searched = true
|
||||
|
||||
// SOLIDARITY TRANSPORT
|
||||
var err error
|
||||
drivers, err = h.services.GetAccountsInNamespacesMap([]string{"solidarity_drivers", "organized_carpool_drivers"})
|
||||
if err != nil {
|
||||
drivers = map[string]mobilityaccountsstorage.Account{}
|
||||
}
|
||||
|
||||
protodep, _ := transformers.GeoJsonToProto(departureGeo)
|
||||
protodest, _ := transformers.GeoJsonToProto(destinationGeo)
|
||||
|
||||
log.Debug().Time("departure time", departureDateTime).Msg("calling driver journeys with ...")
|
||||
|
||||
res, err := h.services.GRPC.SolidarityTransport.GetDriverJourneys(ctx, &gen.GetDriverJourneysRequest{
|
||||
Departure: protodep,
|
||||
Arrival: protodest,
|
||||
DepartureDate: timestamppb.New(departureDateTime),
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error in grpc call to GetDriverJourneys")
|
||||
} else {
|
||||
solidarityTransportResults = slices.Collect(func(yield func(*gen.SolidarityTransportDriverJourney) bool) {
|
||||
for _, dj := range res.DriverJourneys {
|
||||
if a, ok := drivers[dj.DriverId].Data["archived"]; ok {
|
||||
if archived, ok := a.(bool); ok {
|
||||
if archived {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
if dj.DriverId == solidarityTransportExcludeDriver {
|
||||
continue
|
||||
}
|
||||
if !yield(dj) {
|
||||
return
|
||||
}
|
||||
}
|
||||
})
|
||||
sort.Slice(solidarityTransportResults, func(i, j int) bool {
|
||||
return solidarityTransportResults[i].DriverDistance < solidarityTransportResults[j].DriverDistance
|
||||
})
|
||||
}
|
||||
|
||||
// Get departure and destination addresses from properties
|
||||
var departureAddress, destinationAddress string
|
||||
if departureGeo.Properties != nil {
|
||||
if label, ok := departureGeo.Properties["label"].(string); ok {
|
||||
departureAddress = label
|
||||
}
|
||||
}
|
||||
if destinationGeo.Properties != nil {
|
||||
if label, ok := destinationGeo.Properties["label"].(string); ok {
|
||||
destinationAddress = label
|
||||
}
|
||||
}
|
||||
|
||||
radius := float64(5)
|
||||
// ORGANIZED CARPOOL
|
||||
organizedCarpoolResultsRes, err := h.services.GRPC.CarpoolService.DriverJourneys(ctx, &carpoolproto.DriverJourneysRequest{
|
||||
DepartureLat: departureGeo.Point().Lat(),
|
||||
DepartureLng: departureGeo.Point().Lon(),
|
||||
ArrivalLat: destinationGeo.Point().Lat(),
|
||||
ArrivalLng: destinationGeo.Point().Lon(),
|
||||
DepartureDate: timestamppb.New(departureDateTime),
|
||||
DepartureAddress: &departureAddress,
|
||||
ArrivalAddress: &destinationAddress,
|
||||
DepartureRadius: &radius,
|
||||
ArrivalRadius: &radius,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error retrieving organized carpools")
|
||||
} else {
|
||||
organizedCarpoolResults = organizedCarpoolResultsRes.DriverJourneys
|
||||
sort.Slice(organizedCarpoolResults, func(i, j int) bool {
|
||||
return *organizedCarpoolResults[i].Distance < *organizedCarpoolResults[j].Distance
|
||||
})
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
// CARPOOL OPERATORS
|
||||
carpools := make(chan *geojson.FeatureCollection)
|
||||
go h.services.InteropCarpool.Search(carpools, *departureGeo, *destinationGeo, departureDateTime)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for c := range carpools {
|
||||
carpoolResults = append(carpoolResults, c)
|
||||
}
|
||||
}()
|
||||
|
||||
// TRANSIT
|
||||
transitch := make(chan *transitous.Itinerary)
|
||||
go func(transitch chan *transitous.Itinerary, departure *geojson.Feature, destination *geojson.Feature, datetime *time.Time) {
|
||||
defer close(transitch)
|
||||
response, err := h.services.TransitRouting.PlanWithResponse(ctx, &transitous.PlanParams{
|
||||
FromPlace: fmt.Sprintf("%f,%f", departure.Point().Lat(), departure.Point().Lon()),
|
||||
ToPlace: fmt.Sprintf("%f,%f", destination.Point().Lat(), destination.Point().Lon()),
|
||||
Time: datetime,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error retrieving transit data from Transitous server")
|
||||
return
|
||||
}
|
||||
for _, i := range response.Itineraries {
|
||||
transitch <- &i
|
||||
}
|
||||
}(transitch, departureGeo, destinationGeo, &departureDateTime)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
paris, _ := time.LoadLocation("Europe/Paris")
|
||||
requestedDay := departureDateTime.In(paris).Truncate(24 * time.Hour)
|
||||
|
||||
for itinerary := range transitch {
|
||||
// Only include journeys that start on the requested day (in Paris timezone)
|
||||
if !itinerary.StartTime.IsZero() && !itinerary.EndTime.IsZero() {
|
||||
log.Info().
|
||||
Time("startTime", itinerary.StartTime).
|
||||
Time("endTime", itinerary.EndTime).
|
||||
Str("startTimezone", itinerary.StartTime.Location().String()).
|
||||
Str("endTimezone", itinerary.EndTime.Location().String()).
|
||||
Str("startTimeRFC3339", itinerary.StartTime.Format(time.RFC3339)).
|
||||
Str("endTimeRFC3339", itinerary.EndTime.Format(time.RFC3339)).
|
||||
Msg("Journey search - received transit itinerary from Transitous")
|
||||
|
||||
startInParis := itinerary.StartTime.In(paris)
|
||||
startDay := startInParis.Truncate(24 * time.Hour)
|
||||
|
||||
// Check if journey starts on the requested day
|
||||
if startDay.Equal(requestedDay) {
|
||||
transitResults = append(transitResults, itinerary)
|
||||
} else {
|
||||
log.Info().
|
||||
Str("requestedDay", requestedDay.Format("2006-01-02")).
|
||||
Str("startDay", startDay.Format("2006-01-02")).
|
||||
Msg("Journey search - filtered out transit journey (not on requested day)")
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// VEHICLES
|
||||
vehiclech := make(chan fleetsstorage.Vehicle)
|
||||
go h.vehicleRequest(vehiclech, departureDateTime.Add(-24*time.Hour), departureDateTime.Add(168*time.Hour))
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for vehicle := range vehiclech {
|
||||
vehicleResults = append(vehicleResults, vehicle)
|
||||
}
|
||||
slices.SortFunc(vehicleResults, sorting.VehiclesByDistanceFrom(*departureGeo))
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
// KNOWLEDGE BASE
|
||||
departureGeoSearch, _ := h.services.Geography.GeoSearch(departureGeo)
|
||||
kbData := h.config.Get("knowledge_base")
|
||||
if kb, ok := kbData.([]any); ok {
|
||||
for _, sol := range kb {
|
||||
if solution, ok := sol.(map[string]any); ok {
|
||||
if g, ok := solution["geography"]; ok {
|
||||
if geography, ok := g.([]any); ok {
|
||||
for _, gg := range geography {
|
||||
if geog, ok := gg.(map[string]any); ok {
|
||||
if layer, ok := geog["layer"].(string); ok {
|
||||
code := geog["code"]
|
||||
geo, err := h.services.Geography.Find(layer, fmt.Sprintf("%v", code))
|
||||
if err == nil {
|
||||
geog["geography"] = geo
|
||||
geog["name"] = geo.Properties.MustString("nom")
|
||||
}
|
||||
if strings.Compare(fmt.Sprintf("%v", code), departureGeoSearch[layer].Properties.MustString("code")) == 0 {
|
||||
knowledgeBaseResults = append(knowledgeBaseResults, solution)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &SearchJourneysResult{
|
||||
CarpoolResults: carpoolResults,
|
||||
TransitResults: transitResults,
|
||||
VehicleResults: vehicleResults,
|
||||
Searched: searched,
|
||||
DriverJourneys: solidarityTransportResults,
|
||||
Drivers: drivers,
|
||||
OrganizedCarpools: organizedCarpoolResults,
|
||||
KnowledgeBaseResults: knowledgeBaseResults,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *ApplicationHandler) vehicleRequest(vehiclech chan fleetsstorage.Vehicle, start time.Time, end time.Time) {
|
||||
defer close(vehiclech)
|
||||
vehiclerequest := &fleets.GetVehiclesRequest{
|
||||
Namespaces: []string{"parcoursmob"},
|
||||
}
|
||||
vehicleresp, err := h.services.GRPC.Fleets.GetVehicles(context.TODO(), vehiclerequest)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("")
|
||||
return
|
||||
}
|
||||
for _, vehicle := range vehicleresp.Vehicles {
|
||||
v := vehicle.ToStorageType()
|
||||
if v.Free(start, end) {
|
||||
vehiclech <- v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SaveSearch saves a group's search to the saved-search microservice
|
||||
func (h *ApplicationHandler) SaveSearch(
|
||||
ctx context.Context,
|
||||
groupID string,
|
||||
departureDateTime time.Time,
|
||||
departureGeo *geojson.Feature,
|
||||
destinationGeo *geojson.Feature,
|
||||
additionalData map[string]interface{},
|
||||
) error {
|
||||
// Convert geojson.Feature to proto format
|
||||
var protoDepart, protoDest *savedsearchproto.SavedSearchGeoJsonFeature
|
||||
|
||||
log.Debug().
|
||||
Bool("departure_nil", departureGeo == nil).
|
||||
Bool("destination_nil", destinationGeo == nil).
|
||||
Msg("SaveSearch: checking geo features")
|
||||
|
||||
if departureGeo != nil {
|
||||
departureBytes, err := departureGeo.MarshalJSON()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error marshaling departure: %w", err)
|
||||
}
|
||||
protoDepart = &savedsearchproto.SavedSearchGeoJsonFeature{
|
||||
Serialized: string(departureBytes),
|
||||
}
|
||||
log.Debug().Str("departure_json", string(departureBytes)).Msg("SaveSearch: departure converted")
|
||||
}
|
||||
|
||||
if destinationGeo != nil {
|
||||
destinationBytes, err := destinationGeo.MarshalJSON()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error marshaling destination: %w", err)
|
||||
}
|
||||
protoDest = &savedsearchproto.SavedSearchGeoJsonFeature{
|
||||
Serialized: string(destinationBytes),
|
||||
}
|
||||
log.Debug().Str("destination_json", string(destinationBytes)).Msg("SaveSearch: destination converted")
|
||||
}
|
||||
|
||||
// Convert additional data to protobuf Struct
|
||||
var protoData *structpb.Struct
|
||||
if additionalData != nil && len(additionalData) > 0 {
|
||||
var err error
|
||||
protoData, err = structpb.NewStruct(additionalData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error converting additional data: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle zero time value
|
||||
var protoDateTime *timestamppb.Timestamp
|
||||
if !departureDateTime.IsZero() {
|
||||
protoDateTime = timestamppb.New(departureDateTime)
|
||||
}
|
||||
|
||||
// Call the saved-search service
|
||||
_, err := h.services.GRPC.SavedSearch.CreateSavedSearch(ctx, &savedsearchproto.CreateSavedSearchRequest{
|
||||
OwnerId: groupID,
|
||||
Departure: protoDepart,
|
||||
Destination: protoDest,
|
||||
Datetime: protoDateTime,
|
||||
Data: protoData,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error calling saved-search service: %w", err)
|
||||
}
|
||||
|
||||
log.Info().Str("group_id", groupID).Msg("search saved successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSavedSearchesByOwner retrieves saved searches for a group
|
||||
func (h *ApplicationHandler) GetSavedSearchesByOwner(
|
||||
ctx context.Context,
|
||||
groupID string,
|
||||
) ([]*savedsearchtypes.SavedSearch, error) {
|
||||
// Call the saved-search service to get searches by owner
|
||||
response, err := h.services.GRPC.SavedSearch.GetSavedSearchesByOwner(ctx, &savedsearchproto.GetSavedSearchesByOwnerRequest{
|
||||
OwnerId: groupID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error calling saved-search service: %w", err)
|
||||
}
|
||||
|
||||
// Convert protobuf searches to domain types
|
||||
var searches []*savedsearchtypes.SavedSearch
|
||||
for _, protoSearch := range response.SavedSearches {
|
||||
search, err := savedsearchtransformers.SavedSearchProtoToType(protoSearch)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("search_id", protoSearch.Id).Msg("failed to convert saved search")
|
||||
continue
|
||||
}
|
||||
searches = append(searches, search)
|
||||
}
|
||||
|
||||
// Sort searches by datetime (earliest first)
|
||||
sort.Slice(searches, func(i, j int) bool {
|
||||
return searches[i].DateTime.Before(searches[j].DateTime)
|
||||
})
|
||||
|
||||
return searches, nil
|
||||
}
|
||||
|
||||
// DeleteSavedSearch deletes a saved search by ID for the specified owner
|
||||
func (h *ApplicationHandler) DeleteSavedSearch(
|
||||
ctx context.Context,
|
||||
searchID string,
|
||||
ownerID string,
|
||||
) error {
|
||||
// Call the saved-search service to delete the search
|
||||
_, err := h.services.GRPC.SavedSearch.DeleteSavedSearch(ctx, &savedsearchproto.DeleteSavedSearchRequest{
|
||||
Id: searchID,
|
||||
OwnerId: ownerID, // For authorization - ensure only the owner can delete
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error calling saved-search service: %w", err)
|
||||
}
|
||||
|
||||
log.Info().Str("search_id", searchID).Str("owner_id", ownerID).Msg("saved search deleted successfully")
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user