Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
SERVER_NAME=evolution
SERVER_TYPE=http
SERVER_PORT=8080
# Server URL - Set your application url
Expand Down Expand Up @@ -96,6 +97,35 @@ SQS_SECRET_ACCESS_KEY=
SQS_ACCOUNT_ID=
SQS_REGION=

SQS_GLOBAL_ENABLED=false
SQS_GLOBAL_APPLICATION_STARTUP=false
SQS_GLOBAL_CALL=false
SQS_GLOBAL_CHATS_DELETE=false
SQS_GLOBAL_CHATS_SET=false
SQS_GLOBAL_CHATS_UPDATE=false
SQS_GLOBAL_CHATS_UPSERT=false
SQS_GLOBAL_CONNECTION_UPDATE=false
SQS_GLOBAL_CONTACTS_SET=false
SQS_GLOBAL_CONTACTS_UPDATE=false
SQS_GLOBAL_CONTACTS_UPSERT=false
SQS_GLOBAL_GROUP_PARTICIPANTS_UPDATE=false
SQS_GLOBAL_GROUP_UPDATE=false
SQS_GLOBAL_GROUPS_UPSERT=false
SQS_GLOBAL_LABELS_ASSOCIATION=false
SQS_GLOBAL_LABELS_EDIT=false
SQS_GLOBAL_LOGOUT_INSTANCE=false
SQS_GLOBAL_MESSAGES_DELETE=false
SQS_GLOBAL_MESSAGES_EDITED=false
SQS_GLOBAL_MESSAGES_SET=false
SQS_GLOBAL_MESSAGES_UPDATE=false
SQS_GLOBAL_MESSAGES_UPSERT=false
SQS_GLOBAL_PRESENCE_UPDATE=false
SQS_GLOBAL_QRCODE_UPDATED=false
SQS_GLOBAL_REMOVE_INSTANCE=false
SQS_GLOBAL_SEND_MESSAGE=false
SQS_GLOBAL_TYPEBOT_CHANGE_STATUS=false
SQS_GLOBAL_TYPEBOT_START=false

# Websocket - Environment variables
WEBSOCKET_ENABLED=false
WEBSOCKET_GLOBAL_EVENTS=false
Expand Down
189 changes: 122 additions & 67 deletions src/api/integrations/event/sqs/sqs.controller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { PrismaRepository } from '@api/repository/repository.service';
import { WAMonitoringService } from '@api/services/monitor.service';
import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs';
import { configService, Log, Sqs } from '@config/env.config';
import { configService, Log, HttpServer, Sqs, S3 } from '@config/env.config';
import { Logger } from '@config/logger.config';
import * as s3Service from '@api/integrations/storage/s3/libs/minio.server';
import { join } from 'path';

import { EmitData, EventController, EventControllerInterface } from '../event.controller';
import { EventDto } from '../event.dto';
Expand All @@ -20,7 +22,7 @@ export class SqsController extends EventController implements EventControllerInt
return;
}

new Promise<void>((resolve) => {
new Promise<void>(async (resolve) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: Using an async executor in a Promise is discouraged.

Refactor to avoid passing an async function to the Promise constructor, as this may cause unhandled rejections. Use an async function directly or restructure the logic.

const awsConfig = configService.get<Sqs>('SQS');

this.sqs = new SQS({
Expand All @@ -34,6 +36,12 @@ export class SqsController extends EventController implements EventControllerInt

this.logger.info('SQS initialized');

const sqsConfig = configService.get<Sqs>('SQS');
if (this.sqs && sqsConfig.GLOBAL_ENABLED) {
const sqsEvents = Object.keys(sqsConfig.EVENTS).filter(e => sqsConfig.EVENTS[e]);
await this.saveQueues(sqsConfig.GLOBAL_PREFIX_NAME, sqsEvents, true);
}

resolve();
});
}
Expand All @@ -47,7 +55,7 @@ export class SqsController extends EventController implements EventControllerInt
}

override async set(instanceName: string, data: EventDto): Promise<any> {
if (!this.status) {
if (!this.status || configService.get<Sqs>('SQS').GLOBAL_ENABLED) {
return;
}

Expand Down Expand Up @@ -75,6 +83,7 @@ export class SqsController extends EventController implements EventControllerInt
instanceId: this.monitor.waInstances[instanceName].instanceId,
},
};

console.log('*** payload: ', payload);
return this.prisma[this.name].upsert(payload);
}
Expand All @@ -98,66 +107,108 @@ export class SqsController extends EventController implements EventControllerInt
return;
}

const instanceSqs = await this.get(instanceName);
const sqsLocal = instanceSqs?.events;
const we = event.replace(/[.-]/gm, '_').toUpperCase();

if (instanceSqs?.enabled) {
if (this.sqs) {
if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) {
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
const queueName = `${instanceName}_${eventFormatted}.fifo`;
const sqsConfig = configService.get<Sqs>('SQS');
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;

const message = {
event,
instance: instanceName,
data,
server_url: serverUrl,
date_time: dateTime,
sender,
apikey: apiKey,
};

const params = {
MessageBody: JSON.stringify(message),
MessageGroupId: 'evolution',
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
QueueUrl: sqsUrl,
};
if (this.sqs) {
const sqsConfig = configService.get<Sqs>('SQS');

this.sqs.sendMessage(params, (err) => {
if (err) {
this.logger.error({
local: `${origin}.sendData-SQS`,
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
} else {
if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: `${origin}.sendData-SQS`,
...message,
};

this.logger.log(logData);
}
}
const we = event.replace(/[.-]/gm, '_').toUpperCase();

let sqsEvents = [];
if (sqsConfig.GLOBAL_ENABLED) {
sqsEvents = Object.keys(sqsConfig.EVENTS).filter(e => sqsConfig.EVENTS[e]);
} else {
const instanceSqs = await this.get(instanceName);
if (instanceSqs?.enabled && Array.isArray(instanceSqs?.events)) {
sqsEvents = instanceSqs?.events;
}
}

if (Array.isArray(sqsEvents) && sqsEvents.includes(we)) {
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
const prefixName = sqsConfig.GLOBAL_ENABLED ? sqsConfig.GLOBAL_PREFIX_NAME : instanceName;
const queueName = `${prefixName}_${eventFormatted}.fifo`;

const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;

const message = {
event,
instance: instanceName,
dataType: 'json',
data,
server: configService.get<HttpServer>('SERVER').NAME,
server_url: serverUrl,
date_time: dateTime,
sender,
apikey: apiKey,
};

const jsonStr = JSON.stringify(message);
const size = Buffer.byteLength(jsonStr, 'utf8');
if (size > sqsConfig.MAX_PAYLOAD_SIZE) {
if (!configService.get<S3>('S3').ENABLE) {
this.logger.error(`${instanceName} - ${eventFormatted} - SQS ignored: payload (${size} bytes) exceeds SQS size limit (${sqsConfig.MAX_PAYLOAD_SIZE} bytes) and S3 storage is not enabled.`);
return;
}

const buffer = Buffer.from(jsonStr, 'utf8');
const fileName = `${instanceName}_${eventFormatted}_${Date.now()}.json`;
const fullName = join(
'messages',
fileName
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (javascript.lang.security.audit.path-traversal.path-join-resolve-traversal): Detected possible user input going into a path.join or path.resolve function. This could possibly lead to a path traversal vulnerability, where the attacker can access arbitrary files stored in the file system. Instead, be sure to sanitize or validate user input first.

Source: opengrep

);

await s3Service.uploadFile(fullName, buffer, size, {
'Content-Type': 'application/json',
'Cache-Control': 'no-store'
});

const fileUrl = await s3Service.getObjectUrl(fullName);

message.data = { fileUrl };
message.dataType = 's3';
}

const isGlobalEnabled = configService.get<Sqs>('SQS').GLOBAL_ENABLED;
const params = {
MessageBody: JSON.stringify(message),
MessageGroupId: 'evolution',
QueueUrl: sqsUrl,
...(!isGlobalEnabled && {
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
}),
};

this.sqs.sendMessage(params, (err) => {
if (err) {
this.logger.error({
local: `${origin}.sendData-SQS`,
params: JSON.stringify(message),
sqsUrl: sqsUrl,
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
} else {
if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: `${origin}.sendData-SQS`,
...message,
};

this.logger.log(logData);
}
}
Comment on lines +181 to +196
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Merge else clause's nested if statement into else if (merge-else-if)

Suggested change
if (err) {
this.logger.error({
local: `${origin}.sendData-SQS`,
params: JSON.stringify(message),
sqsUrl: sqsUrl,
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
} else {
if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: `${origin}.sendData-SQS`,
...message,
};
this.logger.log(logData);
}
}
if (err) {
this.logger.error({
local: `${origin}.sendData-SQS`,
params: JSON.stringify(message),
sqsUrl: sqsUrl,
message: err?.message,
hostName: err?.hostname,
code: err?.code,
stack: err?.stack,
name: err?.name,
url: queueName,
server_url: serverUrl,
});
}
else if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
const logData = {
local: `${origin}.sendData-SQS`,
...message,
};
this.logger.log(logData);
}


ExplanationFlattening if statements nested within else clauses generates code that is
easier to read and expand upon.

});
}
}
}

private async saveQueues(instanceName: string, events: string[], enable: boolean) {
private async saveQueues(prefixName: string, events: string[], enable: boolean) {
if (enable) {
const eventsFinded = await this.listQueuesByInstance(instanceName);
const eventsFinded = await this.listQueues(prefixName);
console.log('eventsFinded', eventsFinded);

for (const event of events) {
Expand All @@ -168,15 +219,17 @@ export class SqsController extends EventController implements EventControllerInt
continue;
}

const queueName = `${instanceName}_${normalizedEvent}.fifo`;

const queueName = `${prefixName}_${normalizedEvent}.fifo`;
try {
const isGlobalEnabled = configService.get<Sqs>('SQS').GLOBAL_ENABLED;
const createCommand = new CreateQueueCommand({
QueueName: queueName,
Attributes: {
FifoQueue: 'true',
...(isGlobalEnabled && { ContentBasedDeduplication: 'true' }),
},
});

const data = await this.sqs.send(createCommand);
this.logger.info(`Queue ${queueName} criada: ${data.QueueUrl}`);
} catch (err: any) {
Expand All @@ -186,12 +239,14 @@ export class SqsController extends EventController implements EventControllerInt
}
}

private async listQueuesByInstance(instanceName: string) {
private async listQueues(prefixName: string) {
let existingQueues: string[] = [];

try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
let listCommand = new ListQueuesCommand({
QueueNamePrefix: `${prefixName}_`,
});

const listData = await this.sqs.send(listCommand);
if (listData.QueueUrls && listData.QueueUrls.length > 0) {
// Extrai o nome da fila a partir da URL
Expand All @@ -201,32 +256,32 @@ export class SqsController extends EventController implements EventControllerInt
});
}
} catch (error: any) {
this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`);
this.logger.error(`Erro ao listar filas para ${prefixName}: ${error.message}`);
return;
}

// Mapeia os eventos já existentes nas filas: remove o prefixo e o sufixo ".fifo"
return existingQueues
.map((queueName) => {
// Espera-se que o nome seja `${instanceName}_${event}.fifo`
if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) {
return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase();
if (queueName.startsWith(`${prefixName}_`) && queueName.endsWith('.fifo')) {
return queueName.substring(prefixName.length + 1, queueName.length - 5).toLowerCase();
}
return '';
})
.filter((event) => event !== '');
}

// Para uma futura feature de exclusão forçada das queues
private async removeQueuesByInstance(instanceName: string) {
private async removeQueuesByInstance(prefixName: string) {
try {
const listCommand = new ListQueuesCommand({
QueueNamePrefix: `${instanceName}_`,
QueueNamePrefix: `${prefixName}_`,
});
const listData = await this.sqs.send(listCommand);

if (!listData.QueueUrls || listData.QueueUrls.length === 0) {
this.logger.info(`No queues found for instance ${instanceName}`);
this.logger.info(`No queues found for ${prefixName}`);
return;
}

Expand All @@ -240,7 +295,7 @@ export class SqsController extends EventController implements EventControllerInt
}
}
} catch (err: any) {
this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`);
this.logger.error(`Error listing queues for ${prefixName}: ${err.message}`);
}
}
}
Loading