refactor message broker

This commit is contained in:
sbriat 2023-03-16 09:55:51 +01:00
parent 2e3e7c2a04
commit 69dbd6daa0
17 changed files with 92 additions and 180 deletions

View File

@ -7,6 +7,7 @@ DATABASE_URL="postgresql://configuration:configuration@v3-configuration-db:5432/
# RABBIT MQ
RMQ_URI=amqp://v3-broker:5672
RMQ_EXCHANGE=mobicoop
# POSTGRES
POSTGRES_IMAGE=postgres:15.0

View File

@ -5,8 +5,5 @@ SERVICE_PORT=5003
# PRISMA
DATABASE_URL="postgresql://configuration:configuration@localhost:5603/configuration?schema=public"
# RABBIT MQ
RMQ_URI=amqp://v3-broker:5672
# POSTGRES
POSTGRES_IMAGE=postgres:15.0

View File

@ -1,14 +0,0 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { IMessageBroker } from '../../domain/interfaces/message-broker';
@Injectable()
export class LoggingMessager extends IMessageBroker {
constructor(private readonly _amqpConnection: AmqpConnection) {
super('logging');
}
publish(routingKey: string, message: string): void {
this._amqpConnection.publish(this.exchange, routingKey, message);
}
}

View File

@ -1,11 +1,15 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { IMessageBroker } from '../../domain/interfaces/message-broker';
@Injectable()
export class ConfigurationMessager extends IMessageBroker {
constructor(private readonly _amqpConnection: AmqpConnection) {
super('configuration');
export class Messager extends IMessageBroker {
constructor(
private readonly _amqpConnection: AmqpConnection,
configService: ConfigService,
) {
super(configService.get<string>('RMQ_EXCHANGE'));
}
publish(routingKey: string, message: string): void {

View File

@ -4,9 +4,8 @@ import { ConfigModule, ConfigService } from '@nestjs/config';
import { CqrsModule } from '@nestjs/cqrs';
import { DatabaseModule } from '../database/database.module';
import { ConfigurationController } from './adapters/primaries/configuration.controller';
import { ConfigurationMessager } from './adapters/secondaries/configuration.messager';
import { Messager } from './adapters/secondaries/messager';
import { ConfigurationRepository } from './adapters/secondaries/configuration.repository';
import { LoggingMessager } from './adapters/secondaries/logging.messager';
import { CreateConfigurationUseCase } from './domain/usecases/create-configuration.usecase';
import { DeleteConfigurationUseCase } from './domain/usecases/delete-configuration.usecase';
import { FindAllConfigurationsUseCase } from './domain/usecases/find-all-configurations.usecase';
@ -25,11 +24,7 @@ import { ConfigurationProfile } from './mappers/configuration.profile';
useFactory: async (configService: ConfigService) => ({
exchanges: [
{
name: 'configuration',
type: 'topic',
},
{
name: 'logging',
name: configService.get<string>('RMQ_EXCHANGE'),
type: 'topic',
},
],
@ -43,8 +38,7 @@ import { ConfigurationProfile } from './mappers/configuration.profile';
providers: [
ConfigurationProfile,
ConfigurationRepository,
ConfigurationMessager,
LoggingMessager,
Messager,
FindAllConfigurationsUseCase,
FindConfigurationByUuidUseCase,
CreateConfigurationUseCase,

View File

@ -2,9 +2,8 @@ import { Mapper } from '@automapper/core';
import { InjectMapper } from '@automapper/nestjs';
import { CommandHandler } from '@nestjs/cqrs';
import { ConfigurationMessagerPresenter } from '../../adapters/secondaries/configuration-messager.presenter';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { CreateConfigurationCommand } from '../../commands/create-configuration.command';
import { CreateConfigurationRequest } from '../dtos/create-configuration.request';
import { Configuration } from '../entities/configuration';
@ -13,8 +12,7 @@ import { Configuration } from '../entities/configuration';
export class CreateConfigurationUseCase {
constructor(
private readonly _repository: ConfigurationRepository,
private readonly _configurationMessager: ConfigurationMessager,
private readonly _loggingMessager: LoggingMessager,
private readonly _messager: Messager,
@InjectMapper() private readonly _mapper: Mapper,
) {}
@ -27,7 +25,7 @@ export class CreateConfigurationUseCase {
try {
const configuration = await this._repository.create(entity);
this._configurationMessager.publish(
this._messager.publish(
'create',
JSON.stringify(
this._mapper.map(
@ -37,17 +35,17 @@ export class CreateConfigurationUseCase {
),
),
);
this._loggingMessager.publish(
'configuration.create.info',
this._messager.publish(
'logging.configuration.create.info',
JSON.stringify(configuration),
);
return configuration;
} catch (error) {
let key = 'configuration.create.crit';
let key = 'logging.configuration.create.crit';
if (error.message.includes('Already exists')) {
key = 'configuration.create.warning';
key = 'logging.configuration.create.warning';
}
this._loggingMessager.publish(
this._messager.publish(
key,
JSON.stringify({
command,

View File

@ -2,9 +2,8 @@ import { Mapper } from '@automapper/core';
import { InjectMapper } from '@automapper/nestjs';
import { CommandHandler } from '@nestjs/cqrs';
import { ConfigurationMessagerPresenter } from '../../adapters/secondaries/configuration-messager.presenter';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { DeleteConfigurationCommand } from '../../commands/delete-configuration.command';
import { Configuration } from '../entities/configuration';
@ -12,15 +11,14 @@ import { Configuration } from '../entities/configuration';
export class DeleteConfigurationUseCase {
constructor(
private readonly _repository: ConfigurationRepository,
private readonly _configurationMessager: ConfigurationMessager,
private readonly _loggingMessager: LoggingMessager,
private readonly _messager: Messager,
@InjectMapper() private readonly _mapper: Mapper,
) {}
async execute(command: DeleteConfigurationCommand): Promise<Configuration> {
try {
const configuration = await this._repository.delete(command.uuid);
this._configurationMessager.publish(
this._messager.publish(
'delete',
JSON.stringify(
this._mapper.map(
@ -30,14 +28,14 @@ export class DeleteConfigurationUseCase {
),
),
);
this._loggingMessager.publish(
'configuration.delete.info',
this._messager.publish(
'logging.configuration.delete.info',
JSON.stringify({ uuid: configuration.uuid }),
);
return configuration;
} catch (error) {
this._loggingMessager.publish(
'configuration.delete.crit',
this._messager.publish(
'logging.configuration.delete.crit',
JSON.stringify({
command,
error,

View File

@ -1,7 +1,7 @@
import { NotFoundException } from '@nestjs/common';
import { QueryHandler } from '@nestjs/cqrs';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { FindConfigurationByUuidQuery } from '../../queries/find-configuration-by-uuid.query';
import { Configuration } from '../entities/configuration';
@ -9,7 +9,7 @@ import { Configuration } from '../entities/configuration';
export class FindConfigurationByUuidUseCase {
constructor(
private readonly _repository: ConfigurationRepository,
private readonly _loggingMessager: LoggingMessager,
private readonly _messager: Messager,
) {}
async execute(
@ -22,8 +22,8 @@ export class FindConfigurationByUuidUseCase {
if (!configuration) throw new NotFoundException();
return configuration;
} catch (error) {
this._loggingMessager.publish(
'configuration.read.warning',
this._messager.publish(
'logging.configuration.read.warning',
JSON.stringify({
query: findConfigurationByUuid,
error,

View File

@ -2,9 +2,8 @@ import { Mapper } from '@automapper/core';
import { InjectMapper } from '@automapper/nestjs';
import { QueryHandler } from '@nestjs/cqrs';
import { ConfigurationMessagerPresenter } from '../../adapters/secondaries/configuration-messager.presenter';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { PropagateConfigurationsQuery } from '../../queries/propagate-configurations.query';
import { Configuration } from '../entities/configuration';
@ -12,8 +11,7 @@ import { Configuration } from '../entities/configuration';
export class PropagateConfigurationsUseCase {
constructor(
private readonly _repository: ConfigurationRepository,
private readonly _configurationMessager: ConfigurationMessager,
private readonly _loggingMessager: LoggingMessager,
private readonly _messager: Messager,
@InjectMapper() private readonly _mapper: Mapper,
) {}
@ -21,7 +19,7 @@ export class PropagateConfigurationsUseCase {
async execute(propagateConfigurationsQuery: PropagateConfigurationsQuery) {
try {
const configurations = await this._repository.findAll(1, 999999);
this._configurationMessager.publish(
this._messager.publish(
'propagate',
JSON.stringify(
configurations.data.map((configuration) =>
@ -33,9 +31,15 @@ export class PropagateConfigurationsUseCase {
),
),
);
this._loggingMessager.publish('configuration.update.info', 'propagation');
this._messager.publish(
'logging.configuration.update.info',
'propagation',
);
} catch (error) {
this._loggingMessager.publish('configuration.update.crit', 'propagation');
this._messager.publish(
'logging.configuration.update.crit',
'propagation',
);
throw error;
}
}

View File

@ -2,9 +2,8 @@ import { Mapper } from '@automapper/core';
import { InjectMapper } from '@automapper/nestjs';
import { CommandHandler } from '@nestjs/cqrs';
import { ConfigurationMessagerPresenter } from '../../adapters/secondaries/configuration-messager.presenter';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { UpdateConfigurationCommand } from '../../commands/update-configuration.command';
import { UpdateConfigurationRequest } from '../dtos/update-configuration.request';
import { Configuration } from '../entities/configuration';
@ -13,8 +12,7 @@ import { Configuration } from '../entities/configuration';
export class UpdateConfigurationUseCase {
constructor(
private readonly _repository: ConfigurationRepository,
private readonly _configurationMessager: ConfigurationMessager,
private readonly _loggingMessager: LoggingMessager,
private readonly _messager: Messager,
@InjectMapper() private readonly _mapper: Mapper,
) {}
@ -30,7 +28,7 @@ export class UpdateConfigurationUseCase {
command.updateConfigurationRequest.uuid,
entity,
);
this._configurationMessager.publish(
this._messager.publish(
'update',
JSON.stringify(
this._mapper.map(
@ -40,14 +38,14 @@ export class UpdateConfigurationUseCase {
),
),
);
this._loggingMessager.publish(
'configuration.update.info',
this._messager.publish(
'logging.configuration.update.info',
JSON.stringify(command.updateConfigurationRequest),
);
return configuration;
} catch (error) {
this._loggingMessager.publish(
'configuration.update.crit',
this._messager.publish(
'logging.configuration.update.crit',
JSON.stringify({
command,
error,

View File

@ -1,9 +1,8 @@
import { classes } from '@automapper/classes';
import { AutomapperModule } from '@automapper/nestjs';
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { CreateConfigurationCommand } from '../../commands/create-configuration.command';
import { CreateConfigurationRequest } from '../../domain/dtos/create-configuration.request';
import { Domain } from '../../domain/dtos/domain.enum';
@ -33,14 +32,9 @@ const mockConfigurationRepository = {
}),
};
const mockConfigurationMessager = {
const mockMessager = {
publish: jest.fn().mockImplementation(),
};
const mockLoggingMessager = {
publish: jest.fn().mockImplementation(),
};
describe('CreateConfigurationUseCase', () => {
let createConfigurationUseCase: CreateConfigurationUseCase;
@ -55,12 +49,8 @@ describe('CreateConfigurationUseCase', () => {
CreateConfigurationUseCase,
ConfigurationProfile,
{
provide: ConfigurationMessager,
useValue: mockConfigurationMessager,
},
{
provide: LoggingMessager,
useValue: mockLoggingMessager,
provide: Messager,
useValue: mockMessager,
},
],
}).compile();
@ -76,13 +66,13 @@ describe('CreateConfigurationUseCase', () => {
describe('execute', () => {
it('should create and return a new configuration', async () => {
jest.spyOn(mockConfigurationMessager, 'publish');
jest.spyOn(mockMessager, 'publish');
const newConfiguration: Configuration =
await createConfigurationUseCase.execute(newConfigurationCommand);
expect(newConfiguration.key).toBe(newConfigurationRequest.key);
expect(newConfiguration.uuid).toBeDefined();
expect(mockConfigurationMessager.publish).toHaveBeenCalledTimes(1);
expect(mockMessager.publish).toHaveBeenCalledTimes(2);
});
it('should throw an error if configuration already exists', async () => {

View File

@ -2,9 +2,8 @@ import { classes } from '@automapper/classes';
import { AutomapperModule } from '@automapper/nestjs';
import { Test, TestingModule } from '@nestjs/testing';
import { ICollection } from 'src/modules/database/src/interfaces/collection.interface';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { DeleteConfigurationCommand } from '../../commands/delete-configuration.command';
import { Domain } from '../../domain/dtos/domain.enum';
import { Configuration } from '../../domain/entities/configuration';
@ -53,13 +52,10 @@ const mockConfigurationRepository = {
}),
};
const mockConfigurationMessager = {
const mockMessager = {
publish: jest.fn().mockImplementation(),
};
const mockLoggingMessager = {
publish: jest.fn().mockImplementation(),
};
describe('DeleteConfigurationUseCase', () => {
let deleteConfigurationUseCase: DeleteConfigurationUseCase;
@ -74,12 +70,8 @@ describe('DeleteConfigurationUseCase', () => {
DeleteConfigurationUseCase,
ConfigurationProfile,
{
provide: ConfigurationMessager,
useValue: mockConfigurationMessager,
},
{
provide: LoggingMessager,
useValue: mockLoggingMessager,
provide: Messager,
useValue: mockMessager,
},
],
}).compile();
@ -95,7 +87,7 @@ describe('DeleteConfigurationUseCase', () => {
describe('execute', () => {
it('should delete a configuration', async () => {
jest.spyOn(mockConfigurationMessager, 'publish');
jest.spyOn(mockMessager, 'publish');
const savedUuid = mockConfigurations.data[0].uuid;
const deleteConfigurationCommand = new DeleteConfigurationCommand(
savedUuid,
@ -106,7 +98,7 @@ describe('DeleteConfigurationUseCase', () => {
(configuration) => configuration.uuid === savedUuid,
);
expect(deletedConfiguration).toBeUndefined();
expect(mockConfigurationMessager.publish).toHaveBeenCalledTimes(1);
expect(mockMessager.publish).toHaveBeenCalledTimes(2);
});
it('should throw an error if configuration does not exist', async () => {
await expect(

View File

@ -1,8 +1,7 @@
import { NotFoundException } from '@nestjs/common';
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { Domain } from '../../domain/dtos/domain.enum';
import { FindConfigurationByUuidRequest } from '../../domain/dtos/find-configuration-by-uuid.request';
import { FindConfigurationByUuidUseCase } from '../../domain/usecases/find-configuration-by-uuid.usecase';
@ -44,11 +43,7 @@ describe('FindConfigurationByUuidUseCase', () => {
},
FindConfigurationByUuidUseCase,
{
provide: ConfigurationMessager,
useValue: mockMessager,
},
{
provide: LoggingMessager,
provide: Messager,
useValue: mockMessager,
},
],

View File

@ -1,36 +0,0 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { Test, TestingModule } from '@nestjs/testing';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
const mockAmqpConnection = {
publish: jest.fn().mockImplementation(),
};
describe('LoggingMessager', () => {
let loggingMessager: LoggingMessager;
beforeAll(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [],
providers: [
LoggingMessager,
{
provide: AmqpConnection,
useValue: mockAmqpConnection,
},
],
}).compile();
loggingMessager = module.get<LoggingMessager>(LoggingMessager);
});
it('should be defined', () => {
expect(LoggingMessager).toBeDefined();
});
it('should publish a message', async () => {
jest.spyOn(mockAmqpConnection, 'publish');
await loggingMessager.publish('configuration.create.info', 'my-test');
expect(mockAmqpConnection.publish).toHaveBeenCalledTimes(1);
});
});

View File

@ -1,38 +1,47 @@
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { ConfigService } from '@nestjs/config';
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
const mockAmqpConnection = {
publish: jest.fn().mockImplementation(),
};
describe('ConfigurationMessager', () => {
let configurationMessager: ConfigurationMessager;
const mockConfigService = {
get: jest.fn().mockResolvedValue({
RMQ_EXCHANGE: 'mobicoop',
}),
};
describe('Messager', () => {
let messager: Messager;
beforeAll(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [],
providers: [
ConfigurationMessager,
Messager,
{
provide: AmqpConnection,
useValue: mockAmqpConnection,
},
{
provide: ConfigService,
useValue: mockConfigService,
},
],
}).compile();
configurationMessager = module.get<ConfigurationMessager>(
ConfigurationMessager,
);
messager = module.get<Messager>(Messager);
});
it('should be defined', () => {
expect(configurationMessager).toBeDefined();
expect(messager).toBeDefined();
});
it('should publish a message', async () => {
jest.spyOn(mockAmqpConnection, 'publish');
await configurationMessager.publish('configuration.create.info', 'my-test');
messager.publish('configuration.create.info', 'my-test');
expect(mockAmqpConnection.publish).toHaveBeenCalledTimes(1);
});
});

View File

@ -2,9 +2,8 @@ import { classes } from '@automapper/classes';
import { AutomapperModule } from '@automapper/nestjs';
import { Test, TestingModule } from '@nestjs/testing';
import { ICollection } from 'src/modules/database/src/interfaces/collection.interface';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { Domain } from '../../domain/dtos/domain.enum';
import { Configuration } from '../../domain/entities/configuration';
import { PropagateConfigurationsUseCase } from '../../domain/usecases/propagate-configurations.usecase';
@ -51,11 +50,7 @@ const mockConfigurationRepository = {
}),
};
const mockConfigurationMessager = {
publish: jest.fn().mockImplementation(),
};
const mockLoggingMessager = {
const mockMessager = {
publish: jest.fn().mockImplementation(),
};
@ -71,12 +66,8 @@ describe('PropagateConfigurationsUseCase', () => {
useValue: mockConfigurationRepository,
},
{
provide: ConfigurationMessager,
useValue: mockConfigurationMessager,
},
{
provide: LoggingMessager,
useValue: mockLoggingMessager,
provide: Messager,
useValue: mockMessager,
},
PropagateConfigurationsUseCase,
ConfigurationProfile,
@ -94,11 +85,11 @@ describe('PropagateConfigurationsUseCase', () => {
describe('execute', () => {
it('should propagate configurations', async () => {
jest.spyOn(mockConfigurationMessager, 'publish');
jest.spyOn(mockMessager, 'publish');
await propagateConfigurationsUseCase.execute(
propagateConfigurationsQuery,
);
expect(mockConfigurationMessager.publish).toHaveBeenCalledTimes(1);
expect(mockMessager.publish).toHaveBeenCalledTimes(2);
});
it('should throw an error if repository call fails', async () => {
await expect(

View File

@ -1,9 +1,8 @@
import { classes } from '@automapper/classes';
import { AutomapperModule } from '@automapper/nestjs';
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigurationMessager } from '../../adapters/secondaries/configuration.messager';
import { Messager } from '../../adapters/secondaries/messager';
import { ConfigurationRepository } from '../../adapters/secondaries/configuration.repository';
import { LoggingMessager } from '../../adapters/secondaries/logging.messager';
import { UpdateConfigurationCommand } from '../../commands/update-configuration.command';
import { Domain } from '../../domain/dtos/domain.enum';
import { UpdateConfigurationRequest } from '../../domain/dtos/update-configuration.request';
@ -38,11 +37,7 @@ const mockConfigurationRepository = {
}),
};
const mockConfigurationMessager = {
publish: jest.fn().mockImplementation(),
};
const mockLoggingMessager = {
const mockMessager = {
publish: jest.fn().mockImplementation(),
};
@ -60,12 +55,8 @@ describe('UpdateConfigurationUseCase', () => {
UpdateConfigurationUseCase,
ConfigurationProfile,
{
provide: ConfigurationMessager,
useValue: mockConfigurationMessager,
},
{
provide: LoggingMessager,
useValue: mockLoggingMessager,
provide: Messager,
useValue: mockMessager,
},
],
}).compile();
@ -81,12 +72,12 @@ describe('UpdateConfigurationUseCase', () => {
describe('execute', () => {
it('should update a configuration value', async () => {
jest.spyOn(mockConfigurationMessager, 'publish');
jest.spyOn(mockMessager, 'publish');
const updatedConfiguration: Configuration =
await updateConfigurationUseCase.execute(updateConfigurationCommand);
expect(updatedConfiguration.value).toBe(updateConfigurationRequest.value);
expect(mockConfigurationMessager.publish).toHaveBeenCalledTimes(1);
expect(mockMessager.publish).toHaveBeenCalledTimes(2);
});
it('should throw an error if configuration does not exist', async () => {
await expect(