Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions convertors/move-timestamp-out-of-payload.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
require('dotenv').config();
const { MongoClient } = require('mongodb');

/**
 * Maximum number of documents selected in one pass — bounds memory usage of a
 * single batch and reduces the load on the database
 */
const documentsSelectionLimit = 10000;

/**
 * Moves `payload.timestamp` to the top level of each document that does not
 * yet have a top-level `timestamp`. Processes at most
 * `documentsSelectionLimit` documents per call; callers loop until 0 is returned.
 *
 * @param {import('mongodb').Db} db - mongo db instance
 * @param {string} collectionName - name of the collection to be updated
 * @returns {Promise<number>} number of documents included in the update batch
 */
async function movePayloadTimestampToEventLevel(db, collectionName) {
  const collection = db.collection(collectionName);

  const docsToUpdate = collection.find(
    { timestamp: { $exists: false } },
    { projection: { _id: 1, 'payload.timestamp': 1 } }
  ).limit(documentsSelectionLimit);

  const batchedOps = [];

  let currentCount = 0;

  for await (const doc of docsToUpdate) {
    /**
     * Nothing to move when the payload has no timestamp. Optional chaining
     * also guards against documents with no payload object at all, which
     * previously threw a TypeError here.
     */
    if (!doc.payload?.timestamp) {
      continue;
    }

    batchedOps.push({
      updateOne: {
        filter: { _id: doc._id },
        update: {
          $set: { timestamp: Number(doc.payload.timestamp) },
          $unset: { 'payload.timestamp': '' },
        },
      },
    });

    currentCount++;
    process.stdout.write(`\r${currentCount} documents added to batch`);
  }

  if (currentCount > 0) {
    await collection.bulkWrite(batchedOps);
  }

  return currentCount;
}
/**
 * Backfills the top-level `timestamp` on repetitions from their related events
 * (matched by `groupHash`). Repetitions with no related event are deleted as
 * orphans. Processes at most `documentsSelectionLimit` repetitions per call.
 *
 * @param {import('mongodb').Db} db - mongo db instance
 * @param {string} repetitionCollectionName - repetitions collection to be updated
 * @param {string} projectId - project id of current repetitions collection
 * @returns {Promise<number>} number of bulk operations performed
 */
async function backfillTimestampsFromEvents(db, repetitionCollectionName, projectId) {
  const repetitions = db.collection(repetitionCollectionName);
  const events = db.collection(`events:${projectId}`);

  const bulkOps = [];
  let repetitionCount = 1;

  const repetitionsList = await repetitions.find(
    {
      timestamp: { $exists: false },
    },
    { projection: { _id: 1, groupHash: 1 } }
  ).limit(documentsSelectionLimit).toArray();

  const groupHashList = [];

  for (const repetition of repetitionsList) {
    process.stdout.write(`\r[${repetitionCount} repetition] update with timestamp now have [${bulkOps.length + 1}] ops in bulkOps`);
    groupHashList.push(repetition.groupHash);
    repetitionCount++;
  }

  /**
   * Fetch all events matching the collected group hashes in a single query,
   * then index them by groupHash for O(1) lookup per repetition
   */
  const relatedEvents = await events.find(
    { groupHash: { $in: groupHashList } },
    { projection: { timestamp: 1, groupHash: 1 } }
  ).toArray();

  const relatedEventsMap = new Map();

  relatedEvents.forEach((event) => {
    relatedEventsMap.set(event.groupHash, event);
  });

  for (const repetition of repetitionsList) {
    const relatedEvent = relatedEventsMap.get(repetition.groupHash);

    if (!relatedEvent) {
      /**
       * No event with this groupHash exists — the repetition is orphaned
       * and is removed instead of being left forever without a timestamp
       */
      bulkOps.push({
        deleteOne: {
          filter: { _id: repetition._id },
        },
      });
    } else if (relatedEvent.timestamp != null) {
      /**
       * `!= null` covers both null and undefined; the previous strict
       * `!== null` check let `undefined` through and wrote NaN timestamps
       */
      bulkOps.push({
        updateOne: {
          filter: { _id: repetition._id },
          update: { $set: { timestamp: Number(relatedEvent.timestamp) } },
        },
      });
    }
  }

  let processed = 0;

  if (bulkOps.length > 0) {
    const result = await repetitions.bulkWrite(bulkOps);
    const updated = result.modifiedCount;
    const deleted = result.deletedCount;

    processed = bulkOps.length;
    console.log(` updates (${processed} processed, ${updated} updated, ${deleted} deleted)`);
  }

  return processed;
}

/**
 * Method that runs convertor script: moves `payload.timestamp` to the event
 * level in all `events:*` collections, then does the same for `repetitions:*`
 * collections and backfills missing timestamps from their related events.
 *
 * @returns {Promise<void>}
 */
async function run() {
  /**
   * Security: the connection string carries credentials, so it must come from
   * the environment (loaded via dotenv at the top of the file) rather than
   * being hard-coded into the repository.
   */
  const fullUri = process.env.MONGODB_URI;

  if (!fullUri) {
    throw new Error('MONGODB_URI environment variable is not set');
  }

  // Parse the Mongo URL manually
  const mongoUrl = new URL(fullUri);

  // Database name is taken from the URI path, e.g. mongodb://host/hawk_events
  const databaseName = mongoUrl.pathname.replace(/^\//, '') || 'hawk_events';

  // Extract query parameters
  const queryParams = Object.fromEntries(mongoUrl.searchParams.entries());

  // Compose connection options manually
  const options = {
    useNewUrlParser: true,
    useUnifiedTopology: true,
    authSource: queryParams.authSource || 'admin',
    replicaSet: queryParams.replicaSet || undefined,
    tls: queryParams.tls === 'true',
    tlsInsecure: queryParams.tlsInsecure === 'true',
  };

  // Remove query string from URI — the options above are passed explicitly
  mongoUrl.search = '';
  const cleanUri = mongoUrl.toString();

  console.log('Connecting to:', cleanUri);
  console.log('With options:', options);

  const client = new MongoClient(cleanUri, options);

  await client.connect();
  const db = client.db(databaseName);

  console.log(`Connected to database: ${databaseName}`);

  const collections = await db.listCollections({}, {
    authorizedCollections: true,
    nameOnly: true,
  }).toArray();

  let eventCollectionsToCheck = collections.filter(col => /^events:/.test(col.name)).map(col => col.name);
  let repetitionCollectionsToCheck = collections.filter(col => /^repetitions:/.test(col.name)).map(col => col.name);

  console.log(`Found ${eventCollectionsToCheck.length} event collections.`);
  console.log(`Found ${repetitionCollectionsToCheck.length} repetition collections.`);

  // Convert events first so repetitions can later backfill from them
  let i = 1;
  let documentsUpdatedCount = 1;

  while (documentsUpdatedCount !== 0) {
    documentsUpdatedCount = 0;
    i = 1;
    const collectionsToUpdateCount = eventCollectionsToCheck.length;

    for (const collectionName of eventCollectionsToCheck) {
      console.log(`[${i}/${collectionsToUpdateCount}] Processing ${collectionName}`);
      const updated = await movePayloadTimestampToEventLevel(db, collectionName);

      // A collection that yields no updates is fully converted — drop it from the next pass
      if (updated === 0) {
        eventCollectionsToCheck = eventCollectionsToCheck.filter(collection => collection !== collectionName);
      }

      documentsUpdatedCount += updated;
      i++;
    }
  }

  // Convert repetitions + backfill from events
  documentsUpdatedCount = 1;

  while (documentsUpdatedCount !== 0) {
    documentsUpdatedCount = 0;
    i = 1;
    const collectionsToUpdateCount = repetitionCollectionsToCheck.length;

    for (const collectionName of repetitionCollectionsToCheck) {
      console.log(`[${i}/${collectionsToUpdateCount}] Processing ${collectionName}`);
      const projectId = collectionName.split(':')[1];

      let updated = 0;

      updated += await movePayloadTimestampToEventLevel(db, collectionName);
      updated += await backfillTimestampsFromEvents(db, collectionName, projectId);

      // Fully-converted collections are removed from subsequent passes
      if (updated === 0) {
        repetitionCollectionsToCheck = repetitionCollectionsToCheck.filter(collection => collection !== collectionName);
      }

      documentsUpdatedCount += updated;
      i++;
    }

    console.log(`Conversion iteration complete. ${documentsUpdatedCount} documents updated`);
  }

  await client.close();
}

/**
 * Script entry point — any rejection aborts the process with a non-zero code
 */
run()
  .catch((error) => {
    console.error('❌ Script failed:', error);
    process.exit(1);
  });
25 changes: 13 additions & 12 deletions lib/event-worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Worker } from './worker';
import * as WorkerNames from './workerNames';
import { GroupWorkerTask } from 'hawk-worker-grouper/types/group-worker-task';
import { EventWorkerTask } from './types/event-worker-task';
import { CatcherMessageType, CatcherMessagePayload, CatcherMessageAccepted, ErrorsCatcherType } from '@hawk.so/types';

/**
* Defines a Worker that handles events from Catcher.
Expand All @@ -12,30 +12,31 @@ export abstract class EventWorker extends Worker {
* Worker type (will pull tasks from Registry queue with the same name)
* 'errors/nodejs' for example
*/
public type = '';
public type: ErrorsCatcherType;

/**
* Message handle function
*
* @param {EventWorkerTask} event - event to handle
* @param {CatcherMessageAccepted<CatcherMessageType>} task - worker task to handle
*/
public async handle(event: EventWorkerTask): Promise<void> {
this.validate(event);
public async handle(task: CatcherMessageAccepted<CatcherMessageType>): Promise<void> {
this.validate(task);

await this.addTask(WorkerNames.GROUPER, {
projectId: event.projectId,
catcherType: this.type,
event: event.payload,
} as GroupWorkerTask);
projectId: task.projectId,
catcherType: this.type as CatcherMessageType,
payload: task.payload as CatcherMessagePayload<ErrorsCatcherType>,
timestamp: task.timestamp,
} as GroupWorkerTask<ErrorsCatcherType>);
}

/**
* Validate passed event data
*
* @param {EventWorkerTask} event - event to be validated
* @param {CatcherMessageAccepted<CatcherMessageType>} task - task to be validated
*/
protected validate(event: EventWorkerTask): void {
if (!event.projectId || !event.payload) {
protected validate(task: CatcherMessageAccepted<CatcherMessageType>): void {
if (!task.projectId || !task.payload || !task.timestamp) {
throw new Error('Bad data was given');
}
}
Expand Down
22 changes: 0 additions & 22 deletions lib/types/event-worker-task.d.ts

This file was deleted.

2 changes: 1 addition & 1 deletion lib/types/worker-task.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
* When you create new type of worker, describe its task's structure with interface inherited from WorkerTask
*/
export interface WorkerTask {
}
}
8 changes: 8 additions & 0 deletions lib/utils/unsafeFields.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ export function encodeUnsafeFields(event: GroupedEventDBScheme | RepetitionDBSch
* Repetition includes delta field, grouped event includes payload
*/
if ('delta' in event) {
      /**
       * We need to check if the delta field exists but has an undefined value.
       * That would mean the repetition payload is the same as the original event payload.
       */
if (event.delta === undefined) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment pls

return;
}

fieldValue = event.delta[field];
} else {
fieldValue = event.payload[field];
Expand Down
6 changes: 3 additions & 3 deletions lib/workerErrors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Class for critical errors
* have to stop process
*/
import { EventAddons, EventContext, EventDataAccepted } from '@hawk.so/types';
import { EventAddons, EventContext, EventData } from '@hawk.so/types';

/**
* Error class with additional error context for debugging
Expand Down Expand Up @@ -71,8 +71,8 @@ export class DiffCalculationError extends NonCriticalError {
*/
constructor(
msg: string | Error,
originalEvent: EventDataAccepted<EventAddons>,
eventToCompare: EventDataAccepted<EventAddons>
originalEvent: EventData<EventAddons>,
eventToCompare: EventData<EventAddons>
) {
super(msg);
this.context = {
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"migration-add": "migrate-mongo create",
"migrate:up": "migrate-mongo up",
"migrate:down": "migrate-mongo down",
"convert": "node ./convertors/move-timestamp-out-of-payload.js",
"lint": "eslint -c ./.eslintrc.js --ext .ts,.js --fix .",
"test": "jest --coverage --runInBand",
"test:archiver": "jest workers/archiver",
Expand Down Expand Up @@ -48,7 +49,7 @@
},
"dependencies": {
"@hawk.so/nodejs": "^3.1.1",
"@hawk.so/types": "^0.1.29",
"@hawk.so/types": "^0.1.32",
"@types/amqplib": "^0.8.2",
"@types/jest": "^29.2.3",
"@types/mongodb": "^3.5.15",
Expand Down
2 changes: 1 addition & 1 deletion workers/archiver/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export default class ArchiverWorker extends Worker {
const repetitionsBulk = repetitionsCollection.initializeUnorderedBulkOp();

repetitionsBulk.find({
'payload.timestamp': {
timestamp: {
$lt: maxOldTimestamp,
},
}).delete();
Expand Down
Loading
Loading