refactor message broker

This commit is contained in:
sbriat 2023-03-16 10:58:25 +01:00
parent 39e99daadb
commit 74cc5dadca
18 changed files with 100 additions and 139 deletions

View File

@ -8,6 +8,7 @@ DATABASE_URL="postgresql://territory:territory@v3-territory-db:5432/territory?sc
# RABBIT MQ
RMQ_URI=amqp://v3-broker:5672
RMQ_EXCHANGE=mobicoop
# POSTGRES
POSTGRES_IMAGE=postgis/postgis:15-3.3

View File

@ -16,9 +16,7 @@ export class ConfigurationMessagerController {
) {}
@RabbitSubscribe({
exchange: 'configuration',
routingKey: ['create', 'update'],
queue: 'territory-configuration-update',
name: 'setConfiguration',
})
public async setConfigurationHandler(message: string) {
const configuration: Configuration = JSON.parse(message);
@ -38,9 +36,7 @@ export class ConfigurationMessagerController {
}
@RabbitSubscribe({
exchange: 'configuration',
routingKey: 'delete',
queue: 'territory-configuration-delete',
name: 'deleteConfiguration',
})
public async configurationDeletedHandler(message: string) {
const deletedConfiguration: Configuration = JSON.parse(message);
@ -58,9 +54,7 @@ export class ConfigurationMessagerController {
}
@RabbitSubscribe({
exchange: 'configuration',
routingKey: 'propagate',
queue: 'territory-configuration-propagate',
name: 'propagateConfiguration',
})
public async propagateConfigurationsHandler(message: string) {
const configurations: Array<Configuration> = JSON.parse(message);

View File

@ -32,10 +32,24 @@ import { SetConfigurationUseCase } from './domain/usecases/set-configuration.use
): Promise<RabbitMQConfig> => ({
exchanges: [
{
name: 'configuration',
name: configService.get<string>('RMQ_EXCHANGE'),
type: 'topic',
},
],
handlers: {
setConfiguration: {
exchange: configService.get<string>('RMQ_EXCHANGE'),
routingKey: ['configuration.create', 'configuration.update'],
},
deleteConfiguration: {
exchange: configService.get<string>('RMQ_EXCHANGE'),
routingKey: 'configuration.delete',
},
propagateConfiguration: {
exchange: configService.get<string>('RMQ_EXCHANGE'),
routingKey: 'configuration.propagate',
},
},
uri: configService.get<string>('RMQ_URI'),
connectionInitOptions: { wait: false },
enableControllerDiscovery: true,

View File

@ -1,11 +1,15 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { IMessageBroker } from '../../domain/interfaces/message-broker';
@Injectable()
export class LoggingMessager extends IMessageBroker {
constructor(private readonly _amqpConnection: AmqpConnection) {
super('logging');
export class Messager extends IMessageBroker {
constructor(
private readonly _amqpConnection: AmqpConnection,
configService: ConfigService,
) {
super(configService.get<string>('RMQ_EXCHANGE'));
}
publish(routingKey: string, message: string): void {

View File

@ -1,14 +0,0 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { IMessageBroker } from '../../domain/interfaces/message-broker';
@Injectable()
export class TerritoryMessager extends IMessageBroker {
constructor(private readonly _amqpConnection: AmqpConnection) {
super('territory');
}
publish(routingKey: string, message: string): void {
this._amqpConnection.publish(this.exchange, routingKey, message);
}
}

View File

@ -0,0 +1,9 @@
import { AutoMap } from '@automapper/classes';
export class TerritoryPresenter {
@AutoMap()
uuid: string;
@AutoMap()
name: string;
}

View File

@ -1,20 +1,19 @@
import { Mapper } from '@automapper/core';
import { InjectMapper } from '@automapper/nestjs';
import { CommandHandler } from '@nestjs/cqrs';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { TerritoriesRepository } from '../../adapters/secondaries/territories.repository';
import { CreateTerritoryCommand } from '../../commands/create-territory.command';
import { TerritoryMessager } from '../../adapters/secondaries/territory.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { Territory } from '../entities/territory';
import { CreateTerritoryRequest } from '../dtos/create-territory.request';
import { TerritoryLoggingPresenter } from '../../adapters/secondaries/territory-logging.presenter';
import { TerritoryPresenter } from '../../adapters/secondaries/territory.presenter';
@CommandHandler(CreateTerritoryCommand)
export class CreateTerritoryUseCase {
constructor(
private readonly _repository: TerritoriesRepository,
private readonly _territoryMessager: TerritoryMessager,
private readonly _loggingMessager: LoggingMessager,
private readonly _messager: Messager,
@InjectMapper() private readonly _mapper: Mapper,
) {}
@ -27,25 +26,25 @@ export class CreateTerritoryUseCase {
try {
const territory = await this._repository.createTerritory(entity);
this._territoryMessager.publish(
'create',
this._messager.publish(
'territory.create',
JSON.stringify(
this._mapper.map(territory, Territory, TerritoryLoggingPresenter),
this._mapper.map(territory, Territory, TerritoryPresenter),
),
);
this._loggingMessager.publish(
'territory.create.info',
this._messager.publish(
'logging.territory.create.info',
JSON.stringify(
this._mapper.map(territory, Territory, TerritoryLoggingPresenter),
),
);
return territory;
} catch (error) {
let key = 'territory.create.crit';
let key = 'logging.territory.create.crit';
if (error.message.includes('already exists')) {
key = 'territory.create.warning';
key = 'logging.territory.create.warning';
}
this._loggingMessager.publish(
this._messager.publish(
key,
JSON.stringify({
command,

View File

@ -1,7 +1,6 @@
import { CommandHandler } from '@nestjs/cqrs';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { TerritoriesRepository } from '../../adapters/secondaries/territories.repository';
import { TerritoryMessager } from '../../adapters/secondaries/territory.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { DeleteTerritoryCommand } from '../../commands/delete-territory.command';
import { Territory } from '../entities/territory';
@ -9,25 +8,24 @@ import { Territory } from '../entities/territory';
export class DeleteTerritoryUseCase {
constructor(
private readonly _repository: TerritoriesRepository,
private readonly _territoryMessager: TerritoryMessager,
private readonly _loggingMessager: LoggingMessager,
private readonly _messager: Messager,
) {}
async execute(command: DeleteTerritoryCommand): Promise<Territory> {
try {
const territory = await this._repository.delete(command.uuid);
this._territoryMessager.publish(
'delete',
this._messager.publish(
'territory.delete',
JSON.stringify({ uuid: territory.uuid }),
);
this._loggingMessager.publish(
'territory.delete.info',
this._messager.publish(
'logging.territory.delete.info',
JSON.stringify({ uuid: territory.uuid }),
);
return territory;
} catch (error) {
this._loggingMessager.publish(
'territory.delete.crit',
this._messager.publish(
'logging.territory.delete.crit',
JSON.stringify({
command,
error,

View File

@ -1,7 +1,7 @@
import { NotFoundException } from '@nestjs/common';
import { QueryHandler } from '@nestjs/cqrs';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { TerritoriesRepository } from '../../adapters/secondaries/territories.repository';
import { Messager } from '../../adapters/secondaries/messager';
import { FindTerritoryByUuidQuery } from '../../queries/find-territory-by-uuid.query';
import { Territory } from '../entities/territory';
@ -9,7 +9,7 @@ import { Territory } from '../entities/territory';
export class FindTerritoryByUuidUseCase {
constructor(
private readonly _repository: TerritoriesRepository,
private readonly _loggingMessager: LoggingMessager,
private readonly _messager: Messager,
) {}
async execute(
@ -22,8 +22,8 @@ export class FindTerritoryByUuidUseCase {
if (!territory) throw new NotFoundException();
return territory;
} catch (error) {
this._loggingMessager.publish(
'territory.read.warning',
this._messager.publish(
'logging.territory.read.warning',
JSON.stringify({
query: findTerritoryByUuidQuery,
error,

View File

@ -1,20 +1,19 @@
import { Mapper } from '@automapper/core';
import { InjectMapper } from '@automapper/nestjs';
import { CommandHandler } from '@nestjs/cqrs';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { TerritoriesRepository } from '../../adapters/secondaries/territories.repository';
import { TerritoryMessager } from '../../adapters/secondaries/territory.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { TerritoryLoggingPresenter } from '../../adapters/secondaries/territory-logging.presenter';
import { UpdateTerritoryCommand } from '../../commands/update-territory.command';
import { UpdateTerritoryRequest } from '../dtos/update-territory.request';
import { Territory } from '../entities/territory';
import { TerritoryPresenter } from '../../adapters/secondaries/territory.presenter';
@CommandHandler(UpdateTerritoryCommand)
export class UpdateTerritoryUseCase {
constructor(
private readonly _repository: TerritoriesRepository,
private readonly _territoryMessager: TerritoryMessager,
private readonly _loggingMessager: LoggingMessager,
private readonly _messager: Messager,
@InjectMapper() private readonly _mapper: Mapper,
) {}
@ -28,18 +27,18 @@ export class UpdateTerritoryUseCase {
Territory,
),
);
this._territoryMessager.publish(
'update',
this._messager.publish(
'territory.update',
JSON.stringify(
this._mapper.map(
command.updateTerritoryRequest,
UpdateTerritoryRequest,
TerritoryLoggingPresenter,
TerritoryPresenter,
),
),
);
this._loggingMessager.publish(
'territory.update.info',
this._messager.publish(
'logging.territory.update.info',
JSON.stringify(
this._mapper.map(
command.updateTerritoryRequest,
@ -50,8 +49,8 @@ export class UpdateTerritoryUseCase {
);
return territory;
} catch (error) {
this._loggingMessager.publish(
'territory.update.crit',
this._messager.publish(
'logging.territory.update.crit',
JSON.stringify({
command,
error,

View File

@ -2,6 +2,7 @@ import { createMap, forMember, ignore, Mapper } from '@automapper/core';
import { AutomapperProfile, InjectMapper } from '@automapper/nestjs';
import { Injectable } from '@nestjs/common';
import { TerritoryPresenter } from '../adapters/primaries/territory.presenter';
import { TerritoryPresenter as SecondaryTerritoryPresenter } from '../adapters/secondaries/territory.presenter';
import { TerritoryLoggingPresenter } from '../adapters/secondaries/territory-logging.presenter';
import { CreateTerritoryRequest } from '../domain/dtos/create-territory.request';
import { UpdateTerritoryRequest } from '../domain/dtos/update-territory.request';
@ -16,6 +17,7 @@ export class TerritoryProfile extends AutomapperProfile {
override get profile() {
return (mapper) => {
createMap(mapper, Territory, TerritoryPresenter);
createMap(mapper, Territory, SecondaryTerritoryPresenter);
createMap(mapper, Territory, TerritoryLoggingPresenter);
createMap(mapper, CreateTerritoryRequest, Territory);
@ -27,6 +29,7 @@ export class TerritoryProfile extends AutomapperProfile {
forMember((dest) => dest.uuid, ignore()),
);
createMap(mapper, UpdateTerritoryRequest, SecondaryTerritoryPresenter);
createMap(mapper, UpdateTerritoryRequest, TerritoryLoggingPresenter);
};
}

View File

@ -6,9 +6,8 @@ import { CqrsModule } from '@nestjs/cqrs';
import { redisStore } from 'cache-manager-ioredis-yet';
import { DatabaseModule } from '../database/database.module';
import { TerritoriesController } from './adapters/primaries/territories.controller';
import { LoggingMessager } from './adapters/secondaries/logging.messager';
import { TerritoriesRepository } from './adapters/secondaries/territories.repository';
import { TerritoryMessager } from './adapters/secondaries/territory.messager';
import { Messager } from './adapters/secondaries/messager';
import { CreateTerritoryUseCase } from './domain/usecases/create-territory.usecase';
import { DeleteTerritoryUseCase } from './domain/usecases/delete-territory.usecase';
import { FindAllTerritoriesForPointUseCase } from './domain/usecases/find-all-territories-for-point.usecase';
@ -27,11 +26,7 @@ import { TerritoryProfile } from './mappers/territory.profile';
useFactory: async (configService: ConfigService) => ({
exchanges: [
{
name: 'territory',
type: 'topic',
},
{
name: 'logging',
name: configService.get<string>('RMQ_EXCHANGE'),
type: 'topic',
},
],
@ -56,8 +51,7 @@ import { TerritoryProfile } from './mappers/territory.profile';
providers: [
TerritoryProfile,
TerritoriesRepository,
TerritoryMessager,
LoggingMessager,
Messager,
FindAllTerritoriesForPointUseCase,
FindAllTerritoriesForPointsUseCase,
FindAllTerritoriesUseCase,

View File

@ -1,9 +1,8 @@
import { classes } from '@automapper/classes';
import { AutomapperModule } from '@automapper/nestjs';
import { Test, TestingModule } from '@nestjs/testing';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { TerritoriesRepository } from '../../adapters/secondaries/territories.repository';
import { TerritoryMessager } from '../../adapters/secondaries/territory.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { CreateTerritoryCommand } from '../../commands/create-territory.command';
import { CreateTerritoryRequest } from '../../domain/dtos/create-territory.request';
import { Territory } from '../../domain/entities/territory';
@ -51,11 +50,7 @@ describe('CreateTerritoryUseCase', () => {
CreateTerritoryUseCase,
TerritoryProfile,
{
provide: TerritoryMessager,
useValue: mockMessager,
},
{
provide: LoggingMessager,
provide: Messager,
useValue: mockMessager,
},
],

View File

@ -1,7 +1,6 @@
import { Test, TestingModule } from '@nestjs/testing';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { TerritoriesRepository } from '../../adapters/secondaries/territories.repository';
import { TerritoryMessager } from '../../adapters/secondaries/territory.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { DeleteTerritoryCommand } from '../../commands/delete-territory.command';
import { DeleteTerritoryUseCase } from '../../domain/usecases/delete-territory.usecase';
@ -60,11 +59,7 @@ describe('DeleteTerritoryUseCase', () => {
},
DeleteTerritoryUseCase,
{
provide: TerritoryMessager,
useValue: mockMessager,
},
{
provide: LoggingMessager,
provide: Messager,
useValue: mockMessager,
},
],

View File

@ -1,6 +1,6 @@
import { NotFoundException } from '@nestjs/common';
import { Test, TestingModule } from '@nestjs/testing';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { TerritoriesRepository } from '../../adapters/secondaries/territories.repository';
import { FindTerritoryByUuidRequest } from '../../domain/dtos/find-territory-by-uuid.request';
import { FindTerritoryByUuidUseCase } from '../../domain/usecases/find-territory-by-uuid.usecase';
@ -41,7 +41,7 @@ describe('FindTerritoryByUuidUseCase', () => {
useValue: mockTerritoriesRepository,
},
{
provide: LoggingMessager,
provide: Messager,
useValue: mockMessager,
},
FindTerritoryByUuidUseCase,

View File

@ -1,36 +1,47 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { ConfigService } from '@nestjs/config';
import { Test, TestingModule } from '@nestjs/testing';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { Messager } from '../../adapters/secondaries/messager';
const mockAmqpConnection = {
publish: jest.fn().mockImplementation(),
};
describe('LoggingMessager', () => {
let loggingMessager: LoggingMessager;
const mockConfigService = {
get: jest.fn().mockResolvedValue({
RMQ_EXCHANGE: 'mobicoop',
}),
};
describe('Messager', () => {
let messager: Messager;
beforeAll(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [],
providers: [
LoggingMessager,
Messager,
{
provide: AmqpConnection,
useValue: mockAmqpConnection,
},
{
provide: ConfigService,
useValue: mockConfigService,
},
],
}).compile();
loggingMessager = module.get<LoggingMessager>(LoggingMessager);
messager = module.get<Messager>(Messager);
});
it('should be defined', () => {
expect(LoggingMessager).toBeDefined();
expect(messager).toBeDefined();
});
it('should publish a message', async () => {
jest.spyOn(mockAmqpConnection, 'publish');
await loggingMessager.publish('territory.create.info', 'my-test');
messager.publish('territory.create.info', 'my-test');
expect(mockAmqpConnection.publish).toHaveBeenCalledTimes(1);
});
});

View File

@ -1,36 +0,0 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { Test, TestingModule } from '@nestjs/testing';
import { TerritoryMessager } from '../../adapters/secondaries/territory.messager';
const mockAmqpConnection = {
publish: jest.fn().mockImplementation(),
};
describe('TerritoryMessager', () => {
let territoryMessager: TerritoryMessager;
beforeAll(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [],
providers: [
TerritoryMessager,
{
provide: AmqpConnection,
useValue: mockAmqpConnection,
},
],
}).compile();
territoryMessager = module.get<TerritoryMessager>(TerritoryMessager);
});
it('should be defined', () => {
expect(territoryMessager).toBeDefined();
});
it('should publish a message', async () => {
jest.spyOn(mockAmqpConnection, 'publish');
await territoryMessager.publish('territory.create.info', 'my-test');
expect(mockAmqpConnection.publish).toHaveBeenCalledTimes(1);
});
});

View File

@ -1,9 +1,8 @@
import { classes } from '@automapper/classes';
import { AutomapperModule } from '@automapper/nestjs';
import { Test, TestingModule } from '@nestjs/testing';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { TerritoriesRepository } from '../../adapters/secondaries/territories.repository';
import { TerritoryMessager } from '../../adapters/secondaries/territory.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { UpdateTerritoryCommand } from '../../commands/update-territory.command';
import { UpdateTerritoryRequest } from '../../domain/dtos/update-territory.request';
import { Territory } from '../../domain/entities/territory';
@ -54,11 +53,7 @@ describe('UpdateTerritoryUseCase', () => {
UpdateTerritoryUseCase,
TerritoryProfile,
{
provide: TerritoryMessager,
useValue: mockMessager,
},
{
provide: LoggingMessager,
provide: Messager,
useValue: mockMessager,
},
],