diff --git a/migrations/20251104000000-add-timestamp-index-to-events.js b/migrations/20251104000000-add-timestamp-index-to-events.js new file mode 100644 index 00000000..83a34d32 --- /dev/null +++ b/migrations/20251104000000-add-timestamp-index-to-events.js @@ -0,0 +1,94 @@ +/** + * This migration creates indexes for all collections events:projectId on timestamp field + */ + +/** + * Index name for timestamp field + */ +const timestampIndexName = 'timestamp'; + +module.exports = { + async up(db) { + const collections = await db.listCollections({}, { + authorizedCollections: true, + nameOnly: true, + }).toArray(); + + const targetCollections = []; + + collections.forEach((collection) => { + if (/events:/.test(collection.name)) { + targetCollections.push(collection.name); + } + }); + + console.log(`${targetCollections.length} events collections will be updated.`); + + let currentCollectionNumber = 1; + + for (const collectionName of targetCollections) { + console.log(`${collectionName} in process.`); + console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`); + + try { + const hasIndexAlready = await db.collection(collectionName).indexExists(timestampIndexName); + + if (!hasIndexAlready) { + await db.collection(collectionName).createIndex({ + timestamp: 1, + }, { + name: timestampIndexName, + sparse: true, + background: true, + }); + console.log(`Index ${timestampIndexName} created for ${collectionName}`); + } else { + console.log(`Index ${timestampIndexName} already exists for ${collectionName}`); + } + } catch (error) { + console.error(`Error adding index to ${collectionName}:`, error); + } + + currentCollectionNumber++; + } + }, + + async down(db) { + const collections = await db.listCollections({}, { + authorizedCollections: true, + nameOnly: true, + }).toArray(); + + const targetCollections = []; + + collections.forEach((collection) => { + if (/events:/.test(collection.name)) { + targetCollections.push(collection.name); + } + }); + + console.log(`${targetCollections.length} events collections will be updated.`); + + let currentCollectionNumber = 1; + + for (const collectionName of targetCollections) { + console.log(`${collectionName} in process.`); + console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`); + + try { + const hasIndexAlready = await db.collection(collectionName).indexExists(timestampIndexName); + + if (hasIndexAlready) { + await db.collection(collectionName).dropIndex(timestampIndexName); + console.log(`Index ${timestampIndexName} dropped for ${collectionName}`); + } else { + console.log(`Index ${timestampIndexName} does not exist for ${collectionName}, skipping drop.`); + } + } catch (error) { + console.error(`Error dropping index from ${collectionName}:`, error); + } + + currentCollectionNumber++; + } + } +}; diff --git a/migrations/20251105000000-add-compound-index-to-daily-events.js b/migrations/20251105000000-add-compound-index-to-daily-events.js new file mode 100644 index 00000000..4ffe380d --- /dev/null +++ b/migrations/20251105000000-add-compound-index-to-daily-events.js @@ -0,0 +1,94 @@ +/** + * Create compound index for all collections dailyEvents:projectId on + * (groupingTimestamp, lastRepetitionTime, _id desc) + */ +const indexName = 'groupingTimestampAndLastRepetitionTimeAndId'; + +module.exports = { + async up(db) { + const collections = await db.listCollections({}, { + authorizedCollections: true, + nameOnly: true, + }).toArray(); + + const targetCollections = []; + + collections.forEach((collection) => { + if (/dailyEvents:/.test(collection.name)) { + targetCollections.push(collection.name); + } + }); + + console.log(`${targetCollections.length} dailyEvents collections will be updated.`); + + let currentCollectionNumber = 1; + + for (const collectionName of targetCollections) { + console.log(`${collectionName} in process.`); + console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`); + + try { + const hasIndexAlready = await db.collection(collectionName).indexExists(indexName); + + if (!hasIndexAlready) { + await db.collection(collectionName).createIndex({ + groupingTimestamp: -1, + lastRepetitionTime: -1, + _id: -1, + }, { + name: indexName, + background: true, + }); + console.log(`Index ${indexName} created for ${collectionName}`); + } else { + console.log(`Index ${indexName} already exists for ${collectionName}`); + } + } catch (error) { + console.error(`Error adding index to ${collectionName}:`, error); + } + + currentCollectionNumber++; + } + }, + + async down(db) { + const collections = await db.listCollections({}, { + authorizedCollections: true, + nameOnly: true, + }).toArray(); + + const targetCollections = []; + + collections.forEach((collection) => { + if (/dailyEvents:/.test(collection.name)) { + targetCollections.push(collection.name); + } + }); + + console.log(`${targetCollections.length} dailyEvents collections will be updated.`); + + let currentCollectionNumber = 1; + + for (const collectionName of targetCollections) { + console.log(`${collectionName} in process.`); + console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`); + + try { + const hasIndexAlready = await db.collection(collectionName).indexExists(indexName); + + if (hasIndexAlready) { + await db.collection(collectionName).dropIndex(indexName); + console.log(`Index ${indexName} dropped for ${collectionName}`); + } else { + console.log(`Index ${indexName} does not exist for ${collectionName}, skipping drop.`); + } + } catch (error) { + console.error(`Error dropping index from ${collectionName}:`, error); + } + + currentCollectionNumber++; + } + } +}; + + diff --git a/migrations/20251107210624-add-index-on-events-payload-release.js b/migrations/20251107210624-add-index-on-events-payload-release.js new file mode 100644 index 00000000..350e7120 --- /dev/null +++ b/migrations/20251107210624-add-index-on-events-payload-release.js @@ -0,0 +1,62 @@ +/** + * @file Add index on `payload.release` for all per-project events collections + * Collections pattern: `events:{projectId}` in the events database + */ +module.exports = { + async up(db) { + const indexSpec = { 'payload.release': 1 }; + const indexOptions = { + name: 'payloadRelease', + background: true, + sparse: true, + }; + + const collections = await db.listCollections().toArray(); + const targetCollections = collections + .map(c => c.name) + .filter(name => name && name.startsWith('events:')); + + console.log(`Found ${targetCollections.length} events collections`); + + for (const name of targetCollections) { + const coll = db.collection(name); + const existing = await coll.indexes(); + + const alreadyExists = existing.some(idx => idx.name === indexOptions.name) || + existing.some(idx => idx.key && idx.key['payload.release'] === 1); + + if (alreadyExists) { + console.log(`Index already exists on ${name}. Skipped.`); + continue; + } + + console.log(`Creating index ${indexOptions.name} on ${name}...`); + await coll.createIndex(indexSpec, indexOptions); + } + }, + + async down(db) { + const indexName = 'idx_payload_release'; + + const collections = await db.listCollections().toArray(); + const targetCollections = collections + .map(c => c.name) + .filter(name => name && name.startsWith('events:')); + + console.log(`Found ${targetCollections.length} events collections`); + + for (const name of targetCollections) { + const coll = db.collection(name); + + try { + console.log(`Dropping index ${indexName} on ${name}...`); + await coll.dropIndex(indexName); + } catch (e) { + // If index does not exist, ignore + if (e && e.codeName !== 'IndexNotFound') { + throw e; + } + } + } + }, +}; diff --git a/migrations/20251107222417-convert-release-field-to-string.js b/migrations/20251107222417-convert-release-field-to-string.js new file mode 100644 index 00000000..9f838686 --- /dev/null +++ b/migrations/20251107222417-convert-release-field-to-string.js @@ -0,0 +1,66 @@ +/** + * Convert all payload.release fields to strings across all per-project events collections. + * Collections pattern: events:{projectId} + */ +module.exports = { + async up(db) { + const collections = await db.listCollections().toArray(); + const targetCollections = collections + .map(c => c.name) + .filter(name => name && name.startsWith('events:')); + + console.log(`Found ${targetCollections.length} events collections to process`); + + for (const name of targetCollections) { + const coll = db.collection(name); + + // Find docs where payload.release exists + const cursor = coll.find( + { 'payload.release': { $exists: true } }, + { + projection: { + _id: 1, + 'payload.release': 1, + }, + } + ); + + let converted = 0; + let scanned = 0; + const ops = []; + const BATCH_SIZE = 1000; + + while (await cursor.hasNext()) { + const doc = await cursor.next(); + + scanned++; + const releaseValue = doc && doc.payload ? doc.payload.release : undefined; + + if (typeof releaseValue !== 'string') { + ops.push({ + updateOne: { + filter: { _id: doc._id }, + update: { $set: { 'payload.release': String(releaseValue) } }, + }, + }); + converted++; + } + + if (ops.length >= BATCH_SIZE) { + await coll.bulkWrite(ops, { ordered: false }); + ops.length = 0; + } + } + + if (ops.length > 0) { + await coll.bulkWrite(ops, { ordered: false }); + } + + console.log(`[${name}] scanned=${scanned}, converted=${converted}`); + } + }, + + async down() { + console.log('Down migration is not implemented: cannot reliably restore original non-string types for payload.release.'); + }, +}; diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index f7621149..d8869e16 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -101,6 +101,15 @@ export default class GrouperWorker extends Worker { public async handle(task: GroupWorkerTask): Promise { let uniqueEventHash = await this.getUniqueEventHash(task); + // FIX RELEASE TYPE + // TODO: REMOVE AFTER 01.01.2026, after the most of the users update to new js catcher + if (task.payload && task.payload.release !== undefined) { + task.payload = { + ...task.payload, + release: String(task.payload.release) + } + } + /** * Find event by group hash. */