diff --git a/src/app.constants.ts b/src/app.constants.ts index ff407a4..c4fa1ac 100644 --- a/src/app.constants.ts +++ b/src/app.constants.ts @@ -14,13 +14,9 @@ export const AD_UPDATED_QUEUE = 'matcher-ad-updated'; export const AD_DELETED_MESSAGE_HANDLER = 'adDeleted'; export const AD_DELETED_ROUTING_KEY = 'ad.deleted'; export const AD_DELETED_QUEUE = 'matcher-ad-deleted'; - -// configuration -export const SERVICE_CONFIGURATION_SET_QUEUE = 'matcher-configuration-set'; -export const SERVICE_CONFIGURATION_DELETE_QUEUE = - 'matcher-configuration-delete'; -export const SERVICE_CONFIGURATION_PROPAGATE_QUEUE = - 'matcher-configuration-propagate'; +export const MATCHER_AD_CREATED_ROUTING_KEY = 'matcher.ad.created'; +export const MATCHER_AD_CREATION_FAILED_ROUTING_KEY = + 'matcher.ad.creation.failed'; // health export const GRPC_HEALTH_PACKAGE_NAME = 'health'; diff --git a/src/modules/ad/ad.module.ts b/src/modules/ad/ad.module.ts index 7d98ae0..c1a6b37 100644 --- a/src/modules/ad/ad.module.ts +++ b/src/modules/ad/ad.module.ts @@ -43,6 +43,7 @@ import { RedisModuleOptions, } from '@songkeys/nestjs-redis'; import { ConfigurationRepository } from '@mobicoop/configuration-module'; +import { PublishMessageWhenMatcherAdIsCreatedDomainEventHandler } from './core/application/event-handlers/publish-message-when-matcher-ad-is-created.domain-event-handler'; const imports = [ CqrsModule, @@ -80,6 +81,10 @@ const grpcControllers = [MatchGrpcController]; const messageHandlers = [AdCreatedMessageHandler]; +const eventHandlers: Provider[] = [ + PublishMessageWhenMatcherAdIsCreatedDomainEventHandler, +]; + const commandHandlers: Provider[] = [CreateAdService]; const queryHandlers: Provider[] = [MatchQueryHandler]; @@ -150,6 +155,7 @@ const adapters: Provider[] = [ controllers: [...grpcControllers], providers: [ ...messageHandlers, + ...eventHandlers, ...commandHandlers, ...queryHandlers, ...mappers, diff --git a/src/modules/ad/core/application/commands/create-ad/create-ad.service.ts b/src/modules/ad/core/application/commands/create-ad/create-ad.service.ts index 92c2e72..6b061e9 100644 --- a/src/modules/ad/core/application/commands/create-ad/create-ad.service.ts +++ b/src/modules/ad/core/application/commands/create-ad/create-ad.service.ts @@ -1,10 +1,18 @@ import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; import { CreateAdCommand } from './create-ad.command'; import { Inject } from '@nestjs/common'; -import { AD_REPOSITORY, AD_ROUTE_PROVIDER } from '@modules/ad/ad.di-tokens'; +import { + AD_MESSAGE_PUBLISHER, + AD_REPOSITORY, + AD_ROUTE_PROVIDER, +} from '@modules/ad/ad.di-tokens'; import { AdEntity } from '@modules/ad/core/domain/ad.entity'; import { AdRepositoryPort } from '../../ports/ad.repository.port'; -import { AggregateID, ConflictException } from '@mobicoop/ddd-library'; +import { + AggregateID, + ConflictException, + MessagePublisherPort, +} from '@mobicoop/ddd-library'; import { AdAlreadyExistsException } from '@modules/ad/core/domain/ad.errors'; import { RouteProviderPort } from '../../ports/route-provider.port'; import { Role } from '@modules/ad/core/domain/ad.types'; @@ -17,10 +25,14 @@ import { import { Waypoint } from '../../types/waypoint.type'; import { Point as PointValueObject } from '@modules/ad/core/domain/value-objects/point.value-object'; import { Point } from '@modules/geography/core/domain/route.types'; +import { MatcherAdCreationFailedIntegrationEvent } from '../../events/matcher-ad-creation-failed.integration-event'; +import { MATCHER_AD_CREATION_FAILED_ROUTING_KEY } from '@src/app.constants'; @CommandHandler(CreateAdCommand) export class CreateAdService implements ICommandHandler { constructor( + @Inject(AD_MESSAGE_PUBLISHER) + private readonly messagePublisher: MessagePublisherPort, @Inject(AD_REPOSITORY) private readonly repository: AdRepositoryPort, @Inject(AD_ROUTE_PROVIDER) @@ -44,17 +56,6 @@ export class CreateAdService implements ICommandHandler { ); let typedRoutes: TypedRoute[]; - try { - typedRoutes = await Promise.all( - pathCreator.getBasePaths().map(async (path: Path) => ({ - type: path.type, - route: await this.routeProvider.getBasic(path.waypoints), - })), - ); - } catch (e: any) { - throw new Error('Unable to find a route for given waypoints'); - } - let driverDistance: number | undefined; let driverDuration: number | undefined; let passengerDistance: number | undefined; @@ -62,25 +63,24 @@ export class CreateAdService implements ICommandHandler { let points: PointValueObject[] | undefined; let fwdAzimuth: number | undefined; let backAzimuth: number | undefined; + try { - typedRoutes.forEach((typedRoute: TypedRoute) => { - if ([PathType.DRIVER, PathType.GENERIC].includes(typedRoute.type)) { - driverDistance = typedRoute.route.distance; - driverDuration = typedRoute.route.duration; - points = typedRoute.route.points.map( - (point: Point) => - new PointValueObject({ - lon: point.lon, - lat: point.lat, - }), - ); - fwdAzimuth = typedRoute.route.fwdAzimuth; - backAzimuth = typedRoute.route.backAzimuth; - } - if ([PathType.PASSENGER, PathType.GENERIC].includes(typedRoute.type)) { - passengerDistance = typedRoute.route.distance; - passengerDuration = typedRoute.route.duration; - if (!points) + try { + typedRoutes = await Promise.all( + pathCreator.getBasePaths().map(async (path: Path) => ({ + type: path.type, + route: await this.routeProvider.getBasic(path.waypoints), + })), + ); + } catch (e: any) { + throw new Error('Unable to find a route for given waypoints'); + } + + try { + typedRoutes.forEach((typedRoute: TypedRoute) => { + if ([PathType.DRIVER, PathType.GENERIC].includes(typedRoute.type)) { + driverDistance = typedRoute.route.distance; + driverDuration = typedRoute.route.duration; points = typedRoute.route.points.map( (point: Point) => new PointValueObject({ @@ -88,40 +88,74 @@ export class CreateAdService implements ICommandHandler { lat: point.lat, }), ); - if (!fwdAzimuth) fwdAzimuth = typedRoute.route.fwdAzimuth; - if (!backAzimuth) backAzimuth = typedRoute.route.backAzimuth; - } - }); - } catch (error: any) { - throw new Error('Invalid route'); - } - const ad = AdEntity.create({ - id: command.id, - driver: command.driver, - passenger: command.passenger, - frequency: command.frequency, - fromDate: command.fromDate, - toDate: command.toDate, - schedule: command.schedule, - seatsProposed: command.seatsProposed, - seatsRequested: command.seatsRequested, - strict: command.strict, - waypoints: command.waypoints, - points: points as PointValueObject[], - driverDistance, - driverDuration, - passengerDistance, - passengerDuration, - fwdAzimuth: fwdAzimuth as number, - backAzimuth: backAzimuth as number, - }); - try { - await this.repository.insertExtra(ad, 'ad'); - return ad.id; - } catch (error: any) { - if (error instanceof ConflictException) { - throw new AdAlreadyExistsException(error); + fwdAzimuth = typedRoute.route.fwdAzimuth; + backAzimuth = typedRoute.route.backAzimuth; + } + if ( + [PathType.PASSENGER, PathType.GENERIC].includes(typedRoute.type) + ) { + passengerDistance = typedRoute.route.distance; + passengerDuration = typedRoute.route.duration; + if (!points) + points = typedRoute.route.points.map( + (point: Point) => + new PointValueObject({ + lon: point.lon, + lat: point.lat, + }), + ); + if (!fwdAzimuth) fwdAzimuth = typedRoute.route.fwdAzimuth; + if (!backAzimuth) backAzimuth = typedRoute.route.backAzimuth; + } + }); + } catch (error: any) { + throw new Error('Invalid route'); } + + const ad = AdEntity.create({ + id: command.id, + driver: command.driver, + passenger: command.passenger, + frequency: command.frequency, + fromDate: command.fromDate, + toDate: command.toDate, + schedule: command.schedule, + seatsProposed: command.seatsProposed, + seatsRequested: command.seatsRequested, + strict: command.strict, + waypoints: command.waypoints, + points: points as PointValueObject[], + driverDistance, + driverDuration, + passengerDistance, + passengerDuration, + fwdAzimuth: fwdAzimuth as number, + backAzimuth: backAzimuth as number, + }); + + try { + await this.repository.insertExtra(ad, 'ad'); + return ad.id; + } catch (error: any) { + if (error instanceof ConflictException) { + throw new AdAlreadyExistsException(error); + } + throw error; + } + } catch (error: any) { + const matcherAdCreationFailedIntegrationEvent = + new MatcherAdCreationFailedIntegrationEvent({ + id: command.id, + metadata: { + correlationId: command.id, + timestamp: Date.now(), + }, + cause: error.message, + }); + this.messagePublisher.publish( + MATCHER_AD_CREATION_FAILED_ROUTING_KEY, + JSON.stringify(matcherAdCreationFailedIntegrationEvent), + ); throw error; } } diff --git a/src/modules/ad/core/application/event-handlers/publish-message-when-matcher-ad-is-created.domain-event-handler.ts b/src/modules/ad/core/application/event-handlers/publish-message-when-matcher-ad-is-created.domain-event-handler.ts new file mode 100644 index 0000000..9bc92d8 --- /dev/null +++ b/src/modules/ad/core/application/event-handlers/publish-message-when-matcher-ad-is-created.domain-event-handler.ts @@ -0,0 +1,34 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; +import { MatcherAdCreatedDomainEvent } from '../../domain/events/matcher-ad-created.domain-event'; +import { MessagePublisherPort } from '@mobicoop/ddd-library'; +import { AD_MESSAGE_PUBLISHER } from '@modules/ad/ad.di-tokens'; +import { MATCHER_AD_CREATED_ROUTING_KEY } from '@src/app.constants'; +import { MatcherAdCreatedIntegrationEvent } from '../events/matcher-ad-created.integration-event'; + +@Injectable() +export class PublishMessageWhenMatcherAdIsCreatedDomainEventHandler { + constructor( + @Inject(AD_MESSAGE_PUBLISHER) + private readonly messagePublisher: MessagePublisherPort, + ) {} + + @OnEvent(MatcherAdCreatedDomainEvent.name, { async: true, promisify: true }) + async handle(event: MatcherAdCreatedDomainEvent): Promise { + const matcherAdCreatedIntegrationEvent = + new MatcherAdCreatedIntegrationEvent({ + id: event.aggregateId, + driverDuration: event.driverDuration, + driverDistance: event.driverDistance, + passengerDuration: event.passengerDuration, + passengerDistance: event.passengerDistance, + fwdAzimuth: event.fwdAzimuth, + backAzimuth: event.backAzimuth, + metadata: event.metadata, + }); + this.messagePublisher.publish( + MATCHER_AD_CREATED_ROUTING_KEY, + JSON.stringify(matcherAdCreatedIntegrationEvent), + ); + } +} diff --git a/src/modules/ad/core/application/events/matcher-ad-created.integration-event.ts b/src/modules/ad/core/application/events/matcher-ad-created.integration-event.ts new file mode 100644 index 0000000..9db4135 --- /dev/null +++ b/src/modules/ad/core/application/events/matcher-ad-created.integration-event.ts @@ -0,0 +1,20 @@ +import { IntegrationEvent, IntegrationEventProps } from '@mobicoop/ddd-library'; + +export class MatcherAdCreatedIntegrationEvent extends IntegrationEvent { + readonly driverDuration?: number; + readonly driverDistance?: number; + readonly passengerDuration?: number; + readonly passengerDistance?: number; + readonly fwdAzimuth: number; + readonly backAzimuth: number; + + constructor(props: IntegrationEventProps) { + super(props); + this.driverDuration = props.driverDuration; + this.driverDistance = props.driverDistance; + this.passengerDuration = props.passengerDuration; + this.passengerDistance = props.passengerDistance; + this.fwdAzimuth = props.fwdAzimuth; + this.backAzimuth = props.backAzimuth; + } +} diff --git a/src/modules/ad/core/application/events/matcher-ad-creation-failed.integration-event.ts b/src/modules/ad/core/application/events/matcher-ad-creation-failed.integration-event.ts new file mode 100644 index 0000000..c932c6f --- /dev/null +++ b/src/modules/ad/core/application/events/matcher-ad-creation-failed.integration-event.ts @@ -0,0 +1,12 @@ +import { IntegrationEvent, IntegrationEventProps } from '@mobicoop/ddd-library'; + +export class MatcherAdCreationFailedIntegrationEvent extends IntegrationEvent { + readonly cause?: string; + + constructor( + props: IntegrationEventProps, + ) { + super(props); + this.cause = props.cause; + } +} diff --git a/src/modules/ad/core/application/queries/match/algorithm.abstract.ts b/src/modules/ad/core/application/queries/match/algorithm.abstract.ts index ac902fc..41c9ba3 100644 --- a/src/modules/ad/core/application/queries/match/algorithm.abstract.ts +++ b/src/modules/ad/core/application/queries/match/algorithm.abstract.ts @@ -21,7 +21,6 @@ export abstract class Algorithm { for (const processor of this.processors) { this.candidates = await processor.execute(this.candidates); } - // console.log(JSON.stringify(this.candidates, null, 2)); return this.candidates.map((candidate: CandidateEntity) => MatchEntity.create({ adId: candidate.id, diff --git a/src/modules/ad/core/application/queries/match/selector/passenger-oriented.selector.ts b/src/modules/ad/core/application/queries/match/selector/passenger-oriented.selector.ts index 013dd7e..92f6b7e 100644 --- a/src/modules/ad/core/application/queries/match/selector/passenger-oriented.selector.ts +++ b/src/modules/ad/core/application/queries/match/selector/passenger-oriented.selector.ts @@ -1,4 +1,4 @@ -import { Frequency, Role } from '@modules/ad/core/domain/ad.types'; +import { AdStatus, Frequency, Role } from '@modules/ad/core/domain/ad.types'; import { Selector } from '../algorithm.abstract'; import { Waypoint } from '../../../types/waypoint.type'; import { Point } from '../../../types/point.type'; @@ -133,6 +133,7 @@ export class PassengerOrientedSelector extends Selector { private _createWhere = (role: Role): string => [ + this._whereStatus(), this._whereRole(role), this._whereStrict(), this._whereDate(), @@ -144,6 +145,8 @@ export class PassengerOrientedSelector extends Selector { .filter((where: string) => where != '') .join(' AND '); + private _whereStatus = (): string => `status='${AdStatus.VALID}'`; + private _whereRole = (role: Role): string => role == Role.PASSENGER ? 'driver=True' : 'passenger=True'; @@ -174,7 +177,7 @@ export class PassengerOrientedSelector extends Selector { private _whereSchedule = (role: Role): string => { const schedule: string[] = []; // we need full dates to compare times, because margins can lead to compare on previous or next day - // -first we establish a base calendar (up to a week) + // - first we establish a base calendar (up to a week) const scheduleDates: Date[] = this._datesBetweenBoundaries( this.query.fromDate, this.query.toDate, diff --git a/src/modules/ad/core/domain/ad.entity.ts b/src/modules/ad/core/domain/ad.entity.ts index 011f7c7..d8a6c37 100644 --- a/src/modules/ad/core/domain/ad.entity.ts +++ b/src/modules/ad/core/domain/ad.entity.ts @@ -1,12 +1,29 @@ import { AggregateRoot, AggregateID } from '@mobicoop/ddd-library'; import { AdProps, CreateAdProps } from './ad.types'; +import { MatcherAdCreatedDomainEvent } from './events/matcher-ad-created.domain-event'; export class AdEntity extends AggregateRoot { protected readonly _id: AggregateID; static create = (create: CreateAdProps): AdEntity => { const props: AdProps = { ...create }; - return new AdEntity({ id: create.id, props }); + const ad = new AdEntity({ id: create.id, props }); + ad.addEvent( + new MatcherAdCreatedDomainEvent({ + metadata: { + correlationId: create.id, + timestamp: Date.now(), + }, + aggregateId: create.id, + driverDistance: create.driverDistance, + driverDuration: create.driverDuration, + passengerDistance: create.passengerDistance, + passengerDuration: create.passengerDuration, + fwdAzimuth: create.fwdAzimuth, + backAzimuth: create.backAzimuth, + }), + ); + return ad; }; validate(): void { diff --git a/src/modules/ad/core/domain/ad.types.ts b/src/modules/ad/core/domain/ad.types.ts index 10f906b..391ad42 100644 --- a/src/modules/ad/core/domain/ad.types.ts +++ b/src/modules/ad/core/domain/ad.types.ts @@ -53,3 +53,10 @@ export enum Role { DRIVER = 'DRIVER', PASSENGER = 'PASSENGER', } + +export enum AdStatus { + PENDING = 'PENDING', + VALID = 'VALID', + INVALID = 'INVALID', + SUSPENDED = 'SUSPENDED', +} diff --git a/src/modules/ad/core/domain/events/matcher-ad-created.domain-event.ts b/src/modules/ad/core/domain/events/matcher-ad-created.domain-event.ts new file mode 100644 index 0000000..68acd3b --- /dev/null +++ b/src/modules/ad/core/domain/events/matcher-ad-created.domain-event.ts @@ -0,0 +1,20 @@ +import { DomainEvent, DomainEventProps } from '@mobicoop/ddd-library'; + +export class MatcherAdCreatedDomainEvent extends DomainEvent { + readonly driverDuration?: number; + readonly driverDistance?: number; + readonly passengerDuration?: number; + readonly passengerDistance?: number; + readonly fwdAzimuth: number; + readonly backAzimuth: number; + + constructor(props: DomainEventProps) { + super(props); + this.driverDuration = props.driverDuration; + this.driverDistance = props.driverDistance; + this.passengerDuration = props.passengerDuration; + this.passengerDistance = props.passengerDistance; + this.fwdAzimuth = props.fwdAzimuth; + this.backAzimuth = props.backAzimuth; + } +} diff --git a/src/modules/ad/interface/message-handlers/ad-created.message-handler.ts b/src/modules/ad/interface/message-handlers/ad-created.message-handler.ts index b8447b7..16006b4 100644 --- a/src/modules/ad/interface/message-handlers/ad-created.message-handler.ts +++ b/src/modules/ad/interface/message-handlers/ad-created.message-handler.ts @@ -13,25 +13,21 @@ export class AdCreatedMessageHandler { name: AD_CREATED_MESSAGE_HANDLER, }) public async adCreated(message: string) { - try { - const createdAd: Ad = JSON.parse(message); - await this.commandBus.execute( - new CreateAdCommand({ - id: createdAd.aggregateId, - driver: createdAd.driver, - passenger: createdAd.passenger, - frequency: createdAd.frequency, - fromDate: createdAd.fromDate, - toDate: createdAd.toDate, - schedule: createdAd.schedule, - seatsProposed: createdAd.seatsProposed, - seatsRequested: createdAd.seatsRequested, - strict: createdAd.strict, - waypoints: createdAd.waypoints, - }), - ); - } catch (e: any) { - console.log(e); - } + const createdAd: Ad = JSON.parse(message); + await this.commandBus.execute( + new CreateAdCommand({ + id: createdAd.aggregateId, + driver: createdAd.driver, + passenger: createdAd.passenger, + frequency: createdAd.frequency, + fromDate: createdAd.fromDate, + toDate: createdAd.toDate, + schedule: createdAd.schedule, + seatsProposed: createdAd.seatsProposed, + seatsRequested: createdAd.seatsRequested, + strict: createdAd.strict, + waypoints: createdAd.waypoints, + }), + ); } } diff --git a/src/modules/ad/tests/unit/core/create-ad.service.spec.ts b/src/modules/ad/tests/unit/core/create-ad.service.spec.ts index 9b3052b..c19beb0 100644 --- a/src/modules/ad/tests/unit/core/create-ad.service.spec.ts +++ b/src/modules/ad/tests/unit/core/create-ad.service.spec.ts @@ -1,5 +1,9 @@ import { Test, TestingModule } from '@nestjs/testing'; -import { AD_REPOSITORY, AD_ROUTE_PROVIDER } from '@modules/ad/ad.di-tokens'; +import { + AD_MESSAGE_PUBLISHER, + AD_REPOSITORY, + AD_ROUTE_PROVIDER, +} from '@modules/ad/ad.di-tokens'; import { AggregateID } from '@mobicoop/ddd-library'; import { AdEntity } from '@modules/ad/core/domain/ad.entity'; import { ConflictException } from '@mobicoop/ddd-library'; @@ -96,6 +100,10 @@ const mockRouteProvider: RouteProviderPort = { getDetailed: jest.fn(), }; +const mockMessagePublisher = { + publish: jest.fn().mockImplementation(), +}; + describe('create-ad.service', () => { let createAdService: CreateAdService; @@ -110,6 +118,10 @@ describe('create-ad.service', () => { provide: AD_ROUTE_PROVIDER, useValue: mockRouteProvider, }, + { + provide: AD_MESSAGE_PUBLISHER, + useValue: mockMessagePublisher, + }, CreateAdService, ], }).compile(); diff --git a/src/modules/ad/tests/unit/core/publish-message-when-matcher-ad-is-created.domain-event-handler.spec.ts b/src/modules/ad/tests/unit/core/publish-message-when-matcher-ad-is-created.domain-event-handler.spec.ts new file mode 100644 index 0000000..9f51f97 --- /dev/null +++ b/src/modules/ad/tests/unit/core/publish-message-when-matcher-ad-is-created.domain-event-handler.spec.ts @@ -0,0 +1,57 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { AD_MESSAGE_PUBLISHER } from '@modules/ad/ad.di-tokens'; +import { MATCHER_AD_CREATED_ROUTING_KEY } from '@src/app.constants'; +import { PublishMessageWhenMatcherAdIsCreatedDomainEventHandler } from '@modules/ad/core/application/event-handlers/publish-message-when-matcher-ad-is-created.domain-event-handler'; +import { MatcherAdCreatedDomainEvent } from '@modules/ad/core/domain/events/matcher-ad-created.domain-event'; + +const mockMessagePublisher = { + publish: jest.fn().mockImplementation(), +}; + +describe('Publish message when matcher ad is created domain event handler', () => { + let publishMessageWhenMatcherAdIsCreatedDomainEventHandler: PublishMessageWhenMatcherAdIsCreatedDomainEventHandler; + + beforeAll(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + { + provide: AD_MESSAGE_PUBLISHER, + useValue: mockMessagePublisher, + }, + PublishMessageWhenMatcherAdIsCreatedDomainEventHandler, + ], + }).compile(); + + publishMessageWhenMatcherAdIsCreatedDomainEventHandler = + module.get( + PublishMessageWhenMatcherAdIsCreatedDomainEventHandler, + ); + }); + + it('should publish a message', () => { + jest.spyOn(mockMessagePublisher, 'publish'); + const matcherAdCreatedDomainEvent: MatcherAdCreatedDomainEvent = { + id: 'some-domain-event-id', + aggregateId: 'some-aggregate-id', + metadata: { + timestamp: new Date('2023-06-28T05:00:00Z').getTime(), + correlationId: 'some-correlation-id', + }, + driverDistance: 65845, + driverDuration: 3254, + fwdAzimuth: 90, + backAzimuth: 270, + }; + publishMessageWhenMatcherAdIsCreatedDomainEventHandler.handle( + matcherAdCreatedDomainEvent, + ); + expect( + publishMessageWhenMatcherAdIsCreatedDomainEventHandler, + ).toBeDefined(); + expect(mockMessagePublisher.publish).toHaveBeenCalledTimes(1); + expect(mockMessagePublisher.publish).toHaveBeenCalledWith( + MATCHER_AD_CREATED_ROUTING_KEY, + '{"id":"some-aggregate-id","metadata":{"correlationId":"some-correlation-id","timestamp":1687928400000},"driverDuration":3254,"driverDistance":65845,"fwdAzimuth":90,"backAzimuth":270}', + ); + }); +});