Skip to content

Commit 393d00a

Browse files
e11syneSpecc
andauthored
imp(workers): move timestamp out of event payload (#425)
* chore(): update types deps * imp(): update worker task types * imp(): update worker handlers * feat(): migration for moving timestamp out of payload * fix(): tests fixed * imp(): naming, worker tasks (default, sentry, js, grouper) * imp(): imp worker tasks * imp(sentry): types cast * fix types * chore(): clean up * fix(): tests fixed * fix(): tests minor fixes * deps: update types dependency * fix(): tests minor fixes * fix(): types fix * chore(): get rid of EventDataAccepted * chore: lint remove redundant imports * fix(): sentry tests fixed * chore: fix lint * imp(): change migration to use bulkWrite instead of cursor * imp(): js worker event type improved * chore(): fix lint * imp(): add convertor script * chore: lint fix * imp: convertor * chore(): converter script improvements --------- Co-authored-by: Peter Savchenko <specc.dev@gmail.com>
1 parent d752e39 commit 393d00a

39 files changed

+544
-388
lines changed
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
require('dotenv').config();
2+
const { MongoClient } = require('mongodb');
3+
4+
/**
5+
* Limit for one time documents selection, used to reduce the load on the database
6+
*/
7+
const documentsSelectionLimit = 10000;
8+
9+
/**
10+
* @param db - mongo db instance
11+
* @param collectionName - name of the collection to be updated
12+
*/
13+
async function movePayloadTimestampToEventLevel(db, collectionName) {
14+
const collection = db.collection(collectionName);
15+
16+
const docsToUpdate = collection.find(
17+
{ timestamp: { $exists: false } },
18+
{ projection: { _id: 1, 'payload.timestamp': 1 } }
19+
).limit(documentsSelectionLimit);
20+
21+
const batchedOps = [];
22+
23+
let currentCount = 0;
24+
25+
for await (const doc of docsToUpdate) {
26+
process.stdout.write(`\r${currentCount} documents added to batch`);
27+
28+
if (!doc.payload.timestamp) {
29+
continue;
30+
}
31+
32+
batchedOps.push({
33+
updateOne: {
34+
filter: { _id: doc._id },
35+
update: {
36+
$set: { timestamp: Number(doc.payload.timestamp)},
37+
$unset: {'payload.timestamp': ''},
38+
}
39+
}
40+
})
41+
42+
currentCount++;
43+
}
44+
45+
if (currentCount > 0) {
46+
await collection.bulkWrite(batchedOps);
47+
}
48+
49+
return currentCount
50+
}
51+
/**
52+
* @param db - mongo db instance
53+
* @param repetitionCollectionName - repetitions collection to be updated
54+
* @param projectId - project id of current repetitions collection
55+
*/
56+
async function backfillTimestampsFromEvents(db, repetitionCollectionName, projectId) {
57+
const repetitions = db.collection(repetitionCollectionName);
58+
const events = db.collection(`events:${projectId}`);
59+
60+
let bulkOps = [];
61+
let repetitionCount = 1;
62+
63+
const repetitionsList = await repetitions.find(
64+
{
65+
timestamp: { $exists: false },
66+
},
67+
{ projection: { _id: 1, groupHash: 1 } }
68+
).limit(documentsSelectionLimit).toArray();
69+
70+
const groupHashList = [];
71+
72+
for (const repetition of repetitionsList) {
73+
process.stdout.write(`\r[${repetitionCount} repetition] update with timestamp now have [${bulkOps.length + 1}] ops in bulkOps`);
74+
groupHashList.push(repetition.groupHash);
75+
repetitionCount++;
76+
}
77+
78+
const relatedEvents = await events.find(
79+
{ groupHash: { $in: groupHashList } },
80+
{ projection: { timestamp: 1, groupHash: 1 } }
81+
).toArray();
82+
83+
const relatedEventsMap = new Map()
84+
85+
relatedEvents.forEach(e => {
86+
relatedEventsMap.set(e.groupHash, e);
87+
})
88+
89+
for (const repetition of repetitionsList) {
90+
const relatedEvent = relatedEventsMap.get(repetition.groupHash);
91+
92+
if (!relatedEvent) {
93+
bulkOps.push({
94+
deleteOne: {
95+
filter: { _id: repetition._id }
96+
}
97+
})
98+
} else if (relatedEvent?.timestamp !== null) {
99+
bulkOps.push({
100+
updateOne: {
101+
filter: { _id: repetition._id },
102+
update: { $set: { timestamp: Number(relatedEvent.timestamp) } },
103+
},
104+
});
105+
}
106+
}
107+
108+
let processed = 0;
109+
110+
if (bulkOps.length > 0) {
111+
const result = await repetitions.bulkWrite(bulkOps);
112+
const updated = result.modifiedCount;
113+
const deleted = result.deletedCount;
114+
processed = bulkOps.length;
115+
console.log(` updates (${processed} processed, ${updated} updated, ${deleted} deleted)`);
116+
117+
if (updated + deleted === 0) {
118+
repetitionCollectionsToCheck.filter(collection => collection !== repetition)
119+
}
120+
}
121+
122+
return processed;
123+
}
124+
125+
/**
 * Method that runs convertor script
 *
 * Connects to the events database, then repeatedly converts every
 * `events:*` and `repetitions:*` collection in batches until no
 * documents are left to update.
 */
async function run() {
  /**
   * Connection string is read from the environment (.env is loaded at the
   * top of the file). Never hard-code credentials in the source.
   */
  const fullUri = process.env.MONGO_EVENTS_DATABASE_URI;

  if (!fullUri) {
    throw new Error('MONGO_EVENTS_DATABASE_URI environment variable is not set');
  }

  // Parse the Mongo URL manually
  const mongoUrl = new URL(fullUri);
  const databaseName = mongoUrl.pathname.replace(/^\//, '') || 'hawk_events';

  // Extract query parameters
  const queryParams = Object.fromEntries(mongoUrl.searchParams.entries());

  // Compose connection options manually
  const options = {
    useNewUrlParser: true,
    useUnifiedTopology: true,
    authSource: queryParams.authSource || 'admin',
    replicaSet: queryParams.replicaSet || undefined,
    tls: queryParams.tls === 'true',
    tlsInsecure: queryParams.tlsInsecure === 'true',
  };

  // Remove query string from URI
  mongoUrl.search = '';
  const cleanUri = mongoUrl.toString();

  /**
   * Redact credentials before logging: the URI may contain a password
   * in its userinfo section
   */
  const loggedUrl = new URL(cleanUri);

  if (loggedUrl.password) {
    loggedUrl.password = '***';
  }

  console.log('Connecting to:', loggedUrl.toString());
  console.log('With options:', options);

  const client = new MongoClient(cleanUri, options);

  await client.connect();
  const db = client.db(databaseName);

  console.log(`Connected to database: ${databaseName}`);

  const collections = await db.listCollections({}, {
    authorizedCollections: true,
    nameOnly: true,
  }).toArray();

  let eventCollectionsToCheck = collections.filter(col => /^events:/.test(col.name)).map(col => col.name);
  let repetitionCollectionsToCheck = collections.filter(col => /^repetitions:/.test(col.name)).map(col => col.name);

  console.log(`Found ${eventCollectionsToCheck.length} event collections.`);
  console.log(`Found ${repetitionCollectionsToCheck.length} repetition collections.`);

  // Convert events
  let i = 1;
  let documentsUpdatedCount = 1;

  while (documentsUpdatedCount !== 0) {
    documentsUpdatedCount = 0;
    i = 1;
    const collectionsToUpdateCount = eventCollectionsToCheck.length;

    for (const collectionName of eventCollectionsToCheck) {
      console.log(`[${i}/${collectionsToUpdateCount}] Processing ${collectionName}`);
      const updated = await movePayloadTimestampToEventLevel(db, collectionName);

      /**
       * Fully converted collections are dropped from subsequent iterations
       */
      if (updated === 0) {
        eventCollectionsToCheck = eventCollectionsToCheck.filter(collection => collection !== collectionName);
      }

      documentsUpdatedCount += updated;
      i++;
    }
  }

  // Convert repetitions + backfill from events
  documentsUpdatedCount = 1;

  while (documentsUpdatedCount !== 0) {
    documentsUpdatedCount = 0;
    i = 1;
    const collectionsToUpdateCount = repetitionCollectionsToCheck.length;

    for (const collectionName of repetitionCollectionsToCheck) {
      console.log(`[${i}/${collectionsToUpdateCount}] Processing ${collectionName}`);
      const projectId = collectionName.split(':')[1];

      let updated = 0;

      updated += await movePayloadTimestampToEventLevel(db, collectionName);
      updated += await backfillTimestampsFromEvents(db, collectionName, projectId);

      if (updated === 0) {
        repetitionCollectionsToCheck = repetitionCollectionsToCheck.filter(collection => collection !== collectionName);
      }

      documentsUpdatedCount += updated;
      i++;
    }

    console.log(`Conversion iteration complete. ${documentsUpdatedCount} documents updated`);
  }

  await client.close();
}
227+
228+
/**
 * Script entry point: report any unhandled failure and exit with a
 * non-zero status code so callers can detect an aborted conversion
 */
const onFailure = (err) => {
  console.error('❌ Script failed:', err);
  process.exit(1);
};

run().catch(onFailure);

lib/event-worker.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Worker } from './worker';
22
import * as WorkerNames from './workerNames';
33
import { GroupWorkerTask } from 'hawk-worker-grouper/types/group-worker-task';
4-
import { EventWorkerTask } from './types/event-worker-task';
4+
import { CatcherMessageType, CatcherMessagePayload, CatcherMessageAccepted, ErrorsCatcherType } from '@hawk.so/types';
55

66
/**
77
* Defines a Worker that handles events from Catcher.
@@ -12,30 +12,31 @@ export abstract class EventWorker extends Worker {
1212
* Worker type (will pull tasks from Registry queue with the same name)
1313
* 'errors/nodejs' for example
1414
*/
15-
public type = '';
15+
public type: ErrorsCatcherType;
1616

1717
/**
1818
* Message handle function
1919
*
20-
* @param {EventWorkerTask} event - event to handle
20+
* @param {CatcherMessageAccepted<CatcherMessageType>} task - worker task to handle
2121
*/
22-
public async handle(event: EventWorkerTask): Promise<void> {
23-
this.validate(event);
22+
public async handle(task: CatcherMessageAccepted<CatcherMessageType>): Promise<void> {
23+
this.validate(task);
2424

2525
await this.addTask(WorkerNames.GROUPER, {
26-
projectId: event.projectId,
27-
catcherType: this.type,
28-
event: event.payload,
29-
} as GroupWorkerTask);
26+
projectId: task.projectId,
27+
catcherType: this.type as CatcherMessageType,
28+
payload: task.payload as CatcherMessagePayload<ErrorsCatcherType>,
29+
timestamp: task.timestamp,
30+
} as GroupWorkerTask<ErrorsCatcherType>);
3031
}
3132

3233
/**
3334
* Validate passed event data
3435
*
35-
* @param {EventWorkerTask} event - event to be validated
36+
* @param {CatcherMessageAccepted<CatcherMessageType>} task - task to be validated
3637
*/
37-
protected validate(event: EventWorkerTask): void {
38-
if (!event.projectId || !event.payload) {
38+
protected validate(task: CatcherMessageAccepted<CatcherMessageType>): void {
39+
if (!task.projectId || !task.payload || !task.timestamp) {
3940
throw new Error('Bad data was given');
4041
}
4142
}

lib/types/event-worker-task.d.ts

Lines changed: 0 additions & 22 deletions
This file was deleted.

lib/types/worker-task.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
* When you create new type of worker, describe its task's structure with interface inherited from WorkerTask
55
*/
66
export interface WorkerTask {
7-
}
7+
}

lib/utils/unsafeFields.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ export function encodeUnsafeFields(event: GroupedEventDBScheme | RepetitionDBSch
5050
* Repetition includes delta field, grouped event includes payload
5151
*/
5252
if ('delta' in event) {
53+
/**
54+
* We need to check if delta field exists but with undefined value
55+
 * It would mean that the repetition payload is the same as the original event payload
56+
*/
57+
if (event.delta === undefined) {
58+
return;
59+
}
60+
5361
fieldValue = event.delta[field];
5462
} else {
5563
fieldValue = event.payload[field];

lib/workerErrors.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* Class for critical errors
33
* have to stop process
44
*/
5-
import { EventAddons, EventContext, EventDataAccepted } from '@hawk.so/types';
5+
import { EventAddons, EventContext, EventData } from '@hawk.so/types';
66

77
/**
88
* Error class with additional error context for debugging
@@ -71,8 +71,8 @@ export class DiffCalculationError extends NonCriticalError {
7171
*/
7272
constructor(
7373
msg: string | Error,
74-
originalEvent: EventDataAccepted<EventAddons>,
75-
eventToCompare: EventDataAccepted<EventAddons>
74+
originalEvent: EventData<EventAddons>,
75+
eventToCompare: EventData<EventAddons>
7676
) {
7777
super(msg);
7878
this.context = {

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"migration-add": "migrate-mongo create",
1515
"migrate:up": "migrate-mongo up",
1616
"migrate:down": "migrate-mongo down",
17+
"convert": "node ./convertors/move-timestamp-out-of-payload.js",
1718
"lint": "eslint -c ./.eslintrc.js --ext .ts,.js --fix .",
1819
"test": "jest --coverage --runInBand",
1920
"test:archiver": "jest workers/archiver",
@@ -48,7 +49,7 @@
4849
},
4950
"dependencies": {
5051
"@hawk.so/nodejs": "^3.1.1",
51-
"@hawk.so/types": "^0.1.29",
52+
"@hawk.so/types": "^0.1.32",
5253
"@types/amqplib": "^0.8.2",
5354
"@types/jest": "^29.2.3",
5455
"@types/mongodb": "^3.5.15",

workers/archiver/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ export default class ArchiverWorker extends Worker {
168168
const repetitionsBulk = repetitionsCollection.initializeUnorderedBulkOp();
169169

170170
repetitionsBulk.find({
171-
'payload.timestamp': {
171+
timestamp: {
172172
$lt: maxOldTimestamp,
173173
},
174174
}).delete();

0 commit comments

Comments
 (0)