-
Notifications
You must be signed in to change notification settings - Fork 1
imp(workers): move timestamp out of event payload #425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6cc15f7
0732960
fcb37e9
f9effb8
74c48dc
78c9223
1b9f811
da487bf
99805e6
5c52181
d252bce
2363bba
74a7612
ff11216
c3d2145
986cd37
d289f51
9945f2b
399286f
97721a2
3e207c2
b5f3b4e
e837f20
869a2d6
494c26f
cb09cba
dfe374b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,231 @@ | ||
| require('dotenv').config(); | ||
| const { MongoClient } = require('mongodb'); | ||
|
|
||
| /** | ||
| * Limit for one time documents selection, used to reduce the load on the database | ||
| */ | ||
| const documentsSelectionLimit = 10000; | ||
|
|
||
| /** | ||
|
Check warning on line 9 in convertors/move-timestamp-out-of-payload.js
|
||
| * @param db - mongo db instance | ||
|
Check warning on line 10 in convertors/move-timestamp-out-of-payload.js
|
||
| * @param collectionName - name of the collection to be updated | ||
|
Check warning on line 11 in convertors/move-timestamp-out-of-payload.js
|
||
| */ | ||
| async function movePayloadTimestampToEventLevel(db, collectionName) { | ||
| const collection = db.collection(collectionName); | ||
|
|
||
| const docsToUpdate = collection.find( | ||
| { timestamp: { $exists: false } }, | ||
| { projection: { _id: 1, 'payload.timestamp': 1 } } | ||
| ).limit(documentsSelectionLimit); | ||
|
|
||
| const batchedOps = []; | ||
|
|
||
| let currentCount = 0; | ||
|
|
||
| for await (const doc of docsToUpdate) { | ||
| process.stdout.write(`\r${currentCount} documents added to batch`); | ||
|
|
||
| if (!doc.payload.timestamp) { | ||
| continue; | ||
| } | ||
|
|
||
| batchedOps.push({ | ||
| updateOne: { | ||
| filter: { _id: doc._id }, | ||
| update: { | ||
| $set: { timestamp: Number(doc.payload.timestamp)}, | ||
| $unset: {'payload.timestamp': ''}, | ||
| } | ||
| } | ||
| }) | ||
|
|
||
| currentCount++; | ||
| } | ||
|
|
||
| if (currentCount > 0) { | ||
| await collection.bulkWrite(batchedOps); | ||
| } | ||
|
|
||
| return currentCount | ||
| } | ||
| /** | ||
| * @param db - mongo db instance | ||
| * @param repetitionCollectionName - repetitions collection to be updated | ||
| * @param projectId - project id of current repetitions collection | ||
| */ | ||
| async function backfillTimestampsFromEvents(db, repetitionCollectionName, projectId) { | ||
|
Check warning on line 56 in convertors/move-timestamp-out-of-payload.js
|
||
| const repetitions = db.collection(repetitionCollectionName); | ||
|
Check warning on line 57 in convertors/move-timestamp-out-of-payload.js
|
||
| const events = db.collection(`events:${projectId}`); | ||
|
Check warning on line 58 in convertors/move-timestamp-out-of-payload.js
|
||
|
|
||
|
Check warning on line 59 in convertors/move-timestamp-out-of-payload.js
|
||
| let bulkOps = []; | ||
| let repetitionCount = 1; | ||
|
|
||
| const repetitionsList = await repetitions.find( | ||
| { | ||
| timestamp: { $exists: false }, | ||
| }, | ||
| { projection: { _id: 1, groupHash: 1 } } | ||
| ).limit(documentsSelectionLimit).toArray(); | ||
|
|
||
| const groupHashList = []; | ||
|
|
||
| for (const repetition of repetitionsList) { | ||
| process.stdout.write(`\r[${repetitionCount} repetition] update with timestamp now have [${bulkOps.length + 1}] ops in bulkOps`); | ||
| groupHashList.push(repetition.groupHash); | ||
| repetitionCount++; | ||
| } | ||
|
|
||
| const relatedEvents = await events.find( | ||
| { groupHash: { $in: groupHashList } }, | ||
| { projection: { timestamp: 1, groupHash: 1 } } | ||
| ).toArray(); | ||
|
|
||
| const relatedEventsMap = new Map() | ||
|
|
||
| relatedEvents.forEach(e => { | ||
| relatedEventsMap.set(e.groupHash, e); | ||
| }) | ||
|
|
||
| for (const repetition of repetitionsList) { | ||
| const relatedEvent = relatedEventsMap.get(repetition.groupHash); | ||
|
|
||
| if (!relatedEvent) { | ||
| bulkOps.push({ | ||
| deleteOne: { | ||
| filter: { _id: repetition._id } | ||
| } | ||
| }) | ||
| } else if (relatedEvent?.timestamp !== null) { | ||
| bulkOps.push({ | ||
| updateOne: { | ||
| filter: { _id: repetition._id }, | ||
| update: { $set: { timestamp: Number(relatedEvent.timestamp) } }, | ||
| }, | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| let processed = 0; | ||
|
|
||
| if (bulkOps.length > 0) { | ||
| const result = await repetitions.bulkWrite(bulkOps); | ||
| const updated = result.modifiedCount; | ||
| const deleted = result.deletedCount; | ||
| processed = bulkOps.length; | ||
| console.log(` updates (${processed} processed, ${updated} updated, ${deleted} deleted)`); | ||
|
|
||
| if (updated + deleted === 0) { | ||
| repetitionCollectionsToCheck.filter(collection => collection !== repetition) | ||
| } | ||
| } | ||
|
|
||
| return processed; | ||
| } | ||
|
|
||
| /** | ||
| * Method that runs convertor script | ||
| */ | ||
| async function run() { | ||
| const fullUri = 'mongodb://hawk_new:evieg9bauK0ahs2youhoh7aer7kohT@rc1d-2jltinutse1eadfs.mdb.yandexcloud.net:27018/hawk_events?authSource=admin&replicaSet=rs01&tls=true&tlsInsecure=true'; | ||
|
|
||
| // Parse the Mongo URL manually | ||
| const mongoUrl = new URL(fullUri); | ||
| const databaseName = 'hawk_events'; | ||
|
|
||
| // Extract query parameters | ||
|
Check warning on line 135 in convertors/move-timestamp-out-of-payload.js
|
||
| const queryParams = Object.fromEntries(mongoUrl.searchParams.entries()); | ||
|
|
||
| // Compose connection options manually | ||
| const options = { | ||
| useNewUrlParser: true, | ||
| useUnifiedTopology: true, | ||
| authSource: queryParams.authSource || 'admin', | ||
| replicaSet: queryParams.replicaSet || undefined, | ||
| tls: queryParams.tls === 'true', | ||
| tlsInsecure: queryParams.tlsInsecure === 'true', | ||
| // connectTimeoutMS: 3600000, | ||
| // socketTimeoutMS: 3600000, | ||
| }; | ||
|
|
||
| // Remove query string from URI | ||
| mongoUrl.search = ''; | ||
| const cleanUri = mongoUrl.toString(); | ||
|
|
||
| console.log('Connecting to:', cleanUri); | ||
| console.log('With options:', options); | ||
|
|
||
| const client = new MongoClient(cleanUri, options); | ||
|
|
||
| await client.connect(); | ||
| const db = client.db(databaseName); | ||
|
|
||
| console.log(`Connected to database: ${databaseName}`); | ||
|
|
||
| const collections = await db.listCollections({}, { | ||
| authorizedCollections: true, | ||
| nameOnly: true, | ||
| }).toArray(); | ||
|
|
||
| let eventCollectionsToCheck = collections.filter(col => /^events:/.test(col.name)).map(col => col.name); | ||
| let repetitionCollectionsToCheck = collections.filter(col => /^repetitions:/.test(col.name)).map(col => col.name); | ||
|
|
||
| console.log(`Found ${eventCollectionsToCheck.length} event collections.`); | ||
| console.log(`Found ${repetitionCollectionsToCheck.length} repetition collections.`); | ||
|
|
||
| // Convert events | ||
| let i = 1; | ||
| let documentsUpdatedCount = 1 | ||
|
|
||
| while (documentsUpdatedCount != 0) { | ||
| documentsUpdatedCount = 0; | ||
| i = 1; | ||
| const collectionsToUpdateCount = eventCollectionsToCheck.length; | ||
|
|
||
| for (const collectionName of eventCollectionsToCheck) { | ||
| console.log(`[${i}/${collectionsToUpdateCount}] Processing ${collectionName}`); | ||
| const updated = await movePayloadTimestampToEventLevel(db, collectionName); | ||
|
|
||
| if (updated === 0) { | ||
| eventCollectionsToCheck = eventCollectionsToCheck.filter(collection => collection !== collectionName); | ||
| } | ||
|
|
||
| documentsUpdatedCount += updated | ||
| i++; | ||
| } | ||
| } | ||
|
|
||
| // Convert repetitions + backfill from events | ||
| documentsUpdatedCount = 1; | ||
|
|
||
| while (documentsUpdatedCount != 0) { | ||
| documentsUpdatedCount = 0; | ||
| i = 1; | ||
| const collectionsToUpdateCount = repetitionCollectionsToCheck.length; | ||
|
|
||
| for (const collectionName of repetitionCollectionsToCheck) { | ||
| console.log(`[${i}/${collectionsToUpdateCount}] Processing ${collectionName}`); | ||
| const projectId = collectionName.split(':')[1]; | ||
|
|
||
| let updated = 0; | ||
|
|
||
| updated += await movePayloadTimestampToEventLevel(db, collectionName); | ||
| updated += await backfillTimestampsFromEvents(db, collectionName, projectId); | ||
|
|
||
| if (updated === 0) { | ||
| repetitionCollectionsToCheck = repetitionCollectionsToCheck.filter(collection => collection !== collectionName); | ||
| } | ||
|
|
||
| documentsUpdatedCount += updated; | ||
| i++; | ||
| } | ||
|
|
||
| console.log(`Conversion iteration complete. ${documentsUpdatedCount} documents updated`); | ||
| } | ||
|
|
||
| await client.close(); | ||
| } | ||
|
|
||
| run().catch(err => { | ||
| console.error('❌ Script failed:', err); | ||
| process.exit(1); | ||
| }); | ||
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment pls