Skip to content

Commit 922ab25

Browse files
committed
Refactor message processing in QueueManager
- Removed deprecated methods for handling delete and scrape operations. - Introduced a new `processMsg2` method to handle function execution based on message topics. - Enhanced message handling with dynamic function execution and improved listener management.
1 parent 0d6a650 commit 922ab25

1 file changed

Lines changed: 84 additions & 80 deletions

File tree

utils/jobs-worker-queue-manager/queueManagerClassV2.js

Lines changed: 84 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
var amqp = require('amqplib/callback_api');
22
const winston = require('../../config/winston');
3-
const { TiledeskWorker } = require('../tiledesk/TiledeskWorker')
4-
const { StatusManager } = require('../tiledesk/StatusManager');
5-
const { ContentManager } = require('../tiledesk/ContentManager');
63

74
var listeners = [];
85

@@ -237,87 +234,94 @@ class QueueManager {
237234
});
238235
}
239236

240-
/**
241-
* Removes indexed content for the payload. Messages are consumed from the dedicated delete queue
242-
* (deleteTopic / TRAIN_DELETE_QUEUE); publish to the same exchange with routing key deleteRoutingKey.
243-
*/
244-
async processDeleteMsg(msg, ch) {
237+
processMsg2(msg) {
245238

239+
// console.log("processMsg2:", msg);
240+
246241
const message_string = msg.content.toString();
247-
let fdata = JSON.parse(message_string);
248-
let source = fdata.payload;
249-
250-
winston.debug("Delete job payload: ", source);
251-
252-
const tiledesk_worker = new TiledeskWorker({ gptkey: null, interval: null });
253-
const status_manager = new StatusManager();
254-
const content_manager = new ContentManager();
255-
256-
await status_manager.changeStatus(source.id, 500);
257-
258-
tiledesk_worker.deleteFromIndex(source, async (err, response) => {
259-
if (err) {
260-
winston.error("Error on delete from index: " + err);
261-
let error_message = err.response?.data?.error || "An unexpected error occurred";
262-
status_manager.changeStatus(source.id, 300, error_message).then((updateResponse) => {
263-
winston.verbose("changeStatus response: ", updateResponse);
264-
ch.ack(msg);
265-
}).catch((err) => {
266-
winston.error("changeStatus error: ", err);
267-
ch.ack(msg);
268-
});
269-
} else {
270-
content_manager.deleteContent(source.id).then((deleteResponse) => {
271-
winston.verbose("deleteContent response: ", deleteResponse);
272-
ch.ack(msg);
273-
}).catch((err) => {
274-
winston.error("deleteContent error: ", err);
275-
ch.ack(msg);
276-
});
242+
// console.log("processMsg2.1:", msg);
243+
244+
const topic = msg.fields.routingKey //.replace(/[.]/g, '/');
245+
// console.log("processMsg2.2:", msg);
246+
247+
// if (this.debug) {console.log("Got msg topic:" + topic);} //this is undefined in this method
248+
// console.log("Got msg topic:" + topic);
249+
250+
// if (this.debug) {console.log("Got msg1:"+ message_string + " topic:" + topic);}
251+
// console.log("Got msg1:"+ message_string + " topic:" + topic);
252+
253+
if (topic === 'functions') {
254+
// if (this.debug) {console.log("Got msg2:"+ JSON.stringify(message_string) + " topic:" + topic);}
255+
// console.log("Got msg2:"+ JSON.stringify(message_string) + " topic:" + topic);
256+
257+
var fdata = JSON.parse(message_string)
258+
259+
// if (this.debug) {console.log("Got msg3:"+ fdata.function + " fdata.function:", fdata.payload);}
260+
261+
262+
263+
/*
264+
265+
// var fields = Object.keys(fdata.payload).map((key) => [key, fdata.payload[key]]);
266+
267+
// var fields = Object.keys(fdata.payload)
268+
269+
// if (this.debug) {console.log("Got fields:"+ fields );
270+
271+
// eval(fdata.function)
272+
273+
*/
274+
275+
if (fdata.function) {
276+
var fn = new Function("payload", fdata.function);
277+
278+
// if (this.debug) {console.log("Got fn:"+ fn);}
279+
280+
/*
281+
// var fn = new Function(fields, fdata.function);
282+
283+
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function/Function
284+
// var fn = new Function("name",'if (this.debug) {console.log("ciao: " + name);');
285+
// fn("andrea")
286+
287+
// var dataArray = Object.keys(fdata.payload).map(function(k){return fdata.payload[k]});
288+
// if (this.debug) {console.log("Got dataArray:", dataArray );
289+
290+
// fn(dataArray);
291+
*/
292+
293+
294+
var ret = fn(fdata.payload)
295+
// if (this.debug) {console.log("Got ret:"+ ret);}
296+
// console.log("Got ret:"+ ret);
297+
277298
}
278-
});
279-
280-
}
281-
282-
/**
283-
* Specific processMsg function for Scrape operation
284-
*/
285-
async processMsg3(msg, ch) {
286-
287-
const message_string = msg.content.toString();
288-
let fdata = JSON.parse(message_string);
289-
let source = fdata.payload
290-
291-
winston.debug("Source: ", source)
292-
// console.log("fdata.payload.resources[0]: ", fdata.payload.resources[0]);
293-
294-
const tiledesk_worker = new TiledeskWorker({ gptkey: null, interval: null });
295-
const status_manager = new StatusManager();
296-
297-
await status_manager.changeStatus(source.id, 200);
298-
299-
tiledesk_worker.train(source, async (err, response) => {
300-
if (err) {
301-
winston.error("Error on train: " + err)
302-
let error_message = err.response?.data?.error || "An unexpected error occurred";
303-
status_manager.changeStatus(source.id, 400, error_message).then((updateResponse) => {
304-
winston.verbose("changeStatus response: ", updateResponse)
305-
ch.ack(msg);
306-
}).catch((err) => {
307-
winston.error("changeStatus error: ", err)
308-
ch.ack(msg);
309-
})
310-
} else {
311-
status_manager.changeStatus(response.id, response.status).then((updateResponse) => {
312-
winston.verbose("changeStatus response: ", updateResponse)
313-
ch.ack(msg);
314-
}).catch((err) => {
315-
winston.error("changeStatus error: ", err)
316-
ch.ack(msg);
317-
})
299+
300+
// else {
301+
// console.log("no function found");
302+
// }
303+
304+
305+
}
306+
307+
// if (topic === 'subscription_run') {
308+
// if (this.debug) {console.log("here topic:" + topic);
309+
// // requestEvent.emit('request.create.queue', msg.content);
310+
// subscriptionEvent.emit('subscription.run.queue', JSON.parse(message_string));
311+
// }
312+
313+
314+
// serve?
315+
// if (this.debug) {console.log("listeners.length:" + listeners.length);}
316+
317+
if (listeners && listeners.length>0) {
318+
for( var i = 0; i< listeners.length; i++) {
319+
// if (this.debug) {console.log("listeners[i]:" + listeners[i]);}
320+
listeners[i](fdata);
318321
}
319-
});
320-
322+
}
323+
324+
// if (this.debug) {console.log("listeners", this.listeners);
321325
}
322326

323327
closeOnErr(err) {

0 commit comments

Comments
 (0)