Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 139 additions & 61 deletions src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ export class ChatwootService {
private readonly cache: CacheService,
) {}

private pgClient = postgresClient.getChatwootConnection();
private async getPgClient() {
return postgresClient.getChatwootConnection();
}

private async getProvider(instance: InstanceDto): Promise<ChatwootModel | null> {
const cacheKey = `${instance.instanceName}:getProvider`;
Expand Down Expand Up @@ -382,7 +384,8 @@ export class ChatwootService {
if (!uri) return false;

const sqlTags = `SELECT id, taggings_count FROM tags WHERE name = $1 LIMIT 1`;
const tagData = (await this.pgClient.query(sqlTags, [nameInbox]))?.rows[0];
const pgClient = await this.getPgClient();
const tagData = (await pgClient.query(sqlTags, [nameInbox]))?.rows[0];
let tagId = tagData?.id;
const taggingsCount = tagData?.taggings_count || 0;

Expand All @@ -392,18 +395,18 @@ export class ChatwootService {
DO UPDATE SET taggings_count = tags.taggings_count + 1
RETURNING id`;

tagId = (await this.pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;
tagId = (await pgClient.query(sqlTag, [nameInbox, taggingsCount + 1]))?.rows[0]?.id;

const sqlCheckTagging = `SELECT 1 FROM taggings
WHERE tag_id = $1 AND taggable_type = 'Contact' AND taggable_id = $2 AND context = 'labels' LIMIT 1`;

const taggingExists = (await this.pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;
const taggingExists = (await pgClient.query(sqlCheckTagging, [tagId, contactId]))?.rowCount > 0;

if (!taggingExists) {
const sqlInsertLabel = `INSERT INTO taggings (tag_id, taggable_type, taggable_id, context, created_at)
VALUES ($1, 'Contact', $2, 'labels', NOW())`;

await this.pgClient.query(sqlInsertLabel, [tagId, contactId]);
await pgClient.query(sqlInsertLabel, [tagId, contactId]);
}

return true;
Expand Down Expand Up @@ -861,6 +864,7 @@ export class ChatwootService {
messageBody?: any,
sourceId?: string,
quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) {
const client = await this.clientCw(instance);

Expand All @@ -869,32 +873,66 @@ export class ChatwootService {
return null;
}

const replyToIds = await this.getReplyToIds(messageBody, instance);
const doCreateMessage = async (convId: number) => {
const replyToIds = await this.getReplyToIds(messageBody, instance);

const sourceReplyId = quotedMsg?.chatwootMessageId || null;
const sourceReplyId = quotedMsg?.chatwootMessageId || null;

const message = await client.messages.create({
accountId: this.provider.accountId,
conversationId: conversationId,
data: {
content: content,
message_type: messageType,
attachments: attachments,
private: privateMessage || false,
source_id: sourceId,
content_attributes: {
...replyToIds,
const message = await client.messages.create({
accountId: this.provider.accountId,
conversationId: convId,
data: {
content: content,
message_type: messageType,
attachments: attachments,
private: privateMessage || false,
source_id: sourceId,
content_attributes: {
...replyToIds,
},
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
},
source_reply_id: sourceReplyId ? sourceReplyId.toString() : null,
},
});
});

if (!message) {
this.logger.warn('message not found');
return null;
}
if (!message) {
this.logger.warn('message not found');
return null;
}

return message;
return message;
};

try {
return await doCreateMessage(conversationId);
} catch (error) {
const errorMessage = error.toString().toLowerCase();
const status = error.response?.status;
if (errorMessage.includes('not found') || status === 404) {
this.logger.warn(`Conversation ${conversationId} not found. Retrying...`);
const bodyForRetry = messageBodyForRetry || messageBody;

if (!bodyForRetry) {
this.logger.error('Cannot retry createMessage without a message body for context.');
return null;
}

const remoteJid = bodyForRetry.key.remoteJid;
const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`;
await this.cache.delete(cacheKey);

const newConversationId = await this.createConversation(instance, bodyForRetry);
if (!newConversationId) {
this.logger.error(`Failed to create new conversation for ${remoteJid}`);
return null;
}

this.logger.log(`Retrying message creation for ${remoteJid} with new conversation ${newConversationId}`);
return await doCreateMessage(newConversationId);
} else {
this.logger.error(`Error creating message: ${error}`);
throw error;
}
}
}

public async getOpenConversationByContact(
Expand Down Expand Up @@ -987,6 +1025,7 @@ export class ChatwootService {
messageBody?: any,
sourceId?: string,
quotedMsg?: MessageModel,
messageBodyForRetry?: any,
) {
if (sourceId && this.isImportHistoryAvailable()) {
const messageAlreadySaved = await chatwootImport.getExistingSourceIds([sourceId], conversationId);
Expand All @@ -997,54 +1036,84 @@ export class ChatwootService {
}
}
}
const data = new FormData();
const doSendData = async (convId: number) => {
const data = new FormData();

if (content) {
data.append('content', content);
}
if (content) {
data.append('content', content);
}

data.append('message_type', messageType);
data.append('message_type', messageType);

data.append('attachments[]', fileStream, { filename: fileName });
data.append('attachments[]', fileStream, { filename: fileName });

const sourceReplyId = quotedMsg?.chatwootMessageId || null;
const sourceReplyId = quotedMsg?.chatwootMessageId || null;

if (messageBody && instance) {
const replyToIds = await this.getReplyToIds(messageBody, instance);
if (messageBody && instance) {
const replyToIds = await this.getReplyToIds(messageBody, instance);

if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
const content = JSON.stringify({
...replyToIds,
});
data.append('content_attributes', content);
if (replyToIds.in_reply_to || replyToIds.in_reply_to_external_id) {
const content = JSON.stringify({
...replyToIds,
});
data.append('content_attributes', content);
}
}
}

if (sourceReplyId) {
data.append('source_reply_id', sourceReplyId.toString());
}
if (sourceReplyId) {
data.append('source_reply_id', sourceReplyId.toString());
}

if (sourceId) {
data.append('source_id', sourceId);
}
if (sourceId) {
data.append('source_id', sourceId);
}

const config = {
method: 'post',
maxBodyLength: Infinity,
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${conversationId}/messages`,
headers: {
api_access_token: this.provider.token,
...data.getHeaders(),
},
data: data,
const config = {
method: 'post',
maxBodyLength: Infinity,
url: `${this.provider.url}/api/v1/accounts/${this.provider.accountId}/conversations/${convId}/messages`,
headers: {
api_access_token: this.provider.token,
...data.getHeaders(),
},
data: data,
};

const { data: responseData } = await axios.request(config);
return responseData;
};

try {
const { data } = await axios.request(config);

return data;
return await doSendData(conversationId);
} catch (error) {
this.logger.error(error);
const errorMessage = error.toString().toLowerCase();
const status = error.response?.status;

if (errorMessage.includes('not found') || status === 404) {
this.logger.warn(`Conversation ${conversationId} not found. Retrying...`);
const bodyForRetry = messageBodyForRetry || messageBody;

if (!bodyForRetry) {
this.logger.error('Cannot retry sendData without a message body for context.');
return null;
}

const remoteJid = bodyForRetry.key.remoteJid;
const cacheKey = `${instance.instanceName}:createConversation-${remoteJid}`;
await this.cache.delete(cacheKey);

const newConversationId = await this.createConversation(instance, bodyForRetry);
if (!newConversationId) {
this.logger.error(`Failed to create new conversation for ${remoteJid}`);
return null;
}

this.logger.log(`Retrying sendData for ${remoteJid} with new conversation ${newConversationId}`);
return await doSendData(newConversationId);
} else {
this.logger.error(error);
return null;
}
}
}

Expand Down Expand Up @@ -2032,6 +2101,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand All @@ -2051,6 +2121,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand All @@ -2076,6 +2147,7 @@ export class ChatwootService {
},
'WAID:' + body.key.id,
quotedMsg,
body,
);
if (!send) {
this.logger.warn('message not sent');
Expand Down Expand Up @@ -2132,6 +2204,8 @@ export class ChatwootService {
instance,
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand Down Expand Up @@ -2173,6 +2247,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand All @@ -2192,6 +2267,7 @@ export class ChatwootService {
body,
'WAID:' + body.key.id,
quotedMsg,
null,
);

if (!send) {
Expand Down Expand Up @@ -2262,6 +2338,7 @@ export class ChatwootService {
},
'WAID:' + body.key.id,
null,
body,
);
if (!send) {
this.logger.warn('edited message not sent');
Expand Down Expand Up @@ -2515,7 +2592,8 @@ export class ChatwootService {
and created_at >= now() - interval '6h'
order by created_at desc`;

const messagesData = (await this.pgClient.query(sqlMessages))?.rows;
const pgClient = await this.getPgClient();
const messagesData = (await pgClient.query(sqlMessages))?.rows;
const ids: string[] = messagesData
.filter((message) => !!message.source_id)
.map((message) => message.source_id.replace('WAID:', ''));
Expand Down