Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions migrations/20251104000000-add-timestamp-index-to-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* This migration creates indexes for all collections events:projectId on timestamp field
*/

/**
* Index name for timestamp field
*/
const timestampIndexName = 'timestamp';

module.exports = {
async up(db) {
const collections = await db.listCollections({}, {
authorizedCollections: true,
nameOnly: true,
}).toArray();

const targetCollections = [];

collections.forEach((collection) => {
if (/events:/.test(collection.name)) {
targetCollections.push(collection.name);
}
});

console.log(`${targetCollections.length} events collections will be updated.`);

let currentCollectionNumber = 1;

for (const collectionName of targetCollections) {
console.log(`${collectionName} in process.`);
console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`);

try {
const hasIndexAlready = await db.collection(collectionName).indexExists(timestampIndexName);

if (!hasIndexAlready) {
await db.collection(collectionName).createIndex({
timestamp: 1,
}, {
name: timestampIndexName,
sparse: true,
background: true,
});
console.log(`Index ${timestampIndexName} created for ${collectionName}`);
} else {
console.log(`Index ${timestampIndexName} already exists for ${collectionName}`);
}
} catch (error) {
console.error(`Error adding index to ${collectionName}:`, error);
}

currentCollectionNumber++;
}
},

async down(db) {
const collections = await db.listCollections({}, {
authorizedCollections: true,
nameOnly: true,
}).toArray();

const targetCollections = [];

collections.forEach((collection) => {
if (/events:/.test(collection.name)) {
targetCollections.push(collection.name);
}
});

console.log(`${targetCollections.length} events collections will be updated.`);

let currentCollectionNumber = 1;

for (const collectionName of targetCollections) {
console.log(`${collectionName} in process.`);
console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`);

try {
const hasIndexAlready = await db.collection(collectionName).indexExists(timestampIndexName);

if (hasIndexAlready) {
await db.collection(collectionName).dropIndex(timestampIndexName);
console.log(`Index ${timestampIndexName} dropped for ${collectionName}`);
} else {
console.log(`Index ${timestampIndexName} does not exist for ${collectionName}, skipping drop.`);
}
} catch (error) {
console.error(`Error dropping index from ${collectionName}:`, error);
}

currentCollectionNumber++;
}
}
};
94 changes: 94 additions & 0 deletions migrations/20251105000000-add-compound-index-to-daily-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Create compound index for all collections dailyEvents:projectId on
* (groupingTimestamp, lastRepetitionTime, _id desc)
*/
const indexName = 'groupingTimestampAndLastRepetitionTimeAndId';

module.exports = {
async up(db) {
const collections = await db.listCollections({}, {
authorizedCollections: true,
nameOnly: true,
}).toArray();

const targetCollections = [];

collections.forEach((collection) => {
if (/dailyEvents:/.test(collection.name)) {
targetCollections.push(collection.name);
}
});

console.log(`${targetCollections.length} dailyEvents collections will be updated.`);

let currentCollectionNumber = 1;

for (const collectionName of targetCollections) {
console.log(`${collectionName} in process.`);
console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`);

try {
const hasIndexAlready = await db.collection(collectionName).indexExists(indexName);

if (!hasIndexAlready) {
await db.collection(collectionName).createIndex({
groupingTimestamp: -1,
lastRepetitionTime: -1,
_id: -1,
}, {
name: indexName,
background: true,
});
console.log(`Index ${indexName} created for ${collectionName}`);
} else {
console.log(`Index ${indexName} already exists for ${collectionName}`);
}
} catch (error) {
console.error(`Error adding index to ${collectionName}:`, error);
}

currentCollectionNumber++;
}
},

async down(db) {
const collections = await db.listCollections({}, {
authorizedCollections: true,
nameOnly: true,
}).toArray();

const targetCollections = [];

collections.forEach((collection) => {
if (/dailyEvents:/.test(collection.name)) {
targetCollections.push(collection.name);
}
});

console.log(`${targetCollections.length} dailyEvents collections will be updated.`);

let currentCollectionNumber = 1;

for (const collectionName of targetCollections) {
console.log(`${collectionName} in process.`);
console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`);

try {
const hasIndexAlready = await db.collection(collectionName).indexExists(indexName);

if (hasIndexAlready) {
await db.collection(collectionName).dropIndex(indexName);
console.log(`Index ${indexName} dropped for ${collectionName}`);
} else {
console.log(`Index ${indexName} does not exist for ${collectionName}, skipping drop.`);
}
} catch (error) {
console.error(`Error dropping index from ${collectionName}:`, error);
}

currentCollectionNumber++;
}
}
};


62 changes: 62 additions & 0 deletions migrations/20251107210624-add-index-on-events-payload-release.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* @file Add index on `payload.release` for all per-project events collections
* Collections pattern: `events:{projectId}` in the events database
*/
module.exports = {
async up(db) {
const indexSpec = { 'payload.release': 1 };
const indexOptions = {
name: 'payloadRelease',
background: true,
sparse: true,
};

const collections = await db.listCollections().toArray();
const targetCollections = collections
.map(c => c.name)
.filter(name => name && name.startsWith('events:'));

console.log(`Found ${targetCollections.length} events collections`);

for (const name of targetCollections) {
const coll = db.collection(name);
const existing = await coll.indexes();

const alreadyExists = existing.some(idx => idx.name === indexOptions.name) ||
existing.some(idx => idx.key && idx.key['payload.release'] === 1);

if (alreadyExists) {
console.log(`Index already exists on ${name}. Skipped.`);
continue;
}

console.log(`Creating index ${indexOptions.name} on ${name}...`);
await coll.createIndex(indexSpec, indexOptions);
}
},

async down(db) {
const indexName = 'idx_payload_release';

const collections = await db.listCollections().toArray();
const targetCollections = collections
.map(c => c.name)
.filter(name => name && name.startsWith('events:'));

console.log(`Found ${targetCollections.length} events collections`);

for (const name of targetCollections) {
const coll = db.collection(name);

try {
console.log(`Dropping index ${indexName} on ${name}...`);
await coll.dropIndex(indexName);
} catch (e) {
// If index does not exist, ignore
if (e && e.codeName !== 'IndexNotFound') {
throw e;
}
}
}
},
};
66 changes: 66 additions & 0 deletions migrations/20251107222417-convert-release-field-to-string.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Convert all payload.release fields to strings across all per-project events collections.
* Collections pattern: events:{projectId}
*/
module.exports = {
async up(db) {
const collections = await db.listCollections().toArray();
const targetCollections = collections
.map(c => c.name)
.filter(name => name && name.startsWith('events:'));

console.log(`Found ${targetCollections.length} events collections to process`);

for (const name of targetCollections) {
const coll = db.collection(name);

// Find docs where payload.release exists
const cursor = coll.find(
{ 'payload.release': { $exists: true } },
{
projection: {
_id: 1,
'payload.release': 1,
},
}
);

let converted = 0;
let scanned = 0;
const ops = [];
const BATCH_SIZE = 1000;

while (await cursor.hasNext()) {
const doc = await cursor.next();

scanned++;
const releaseValue = doc && doc.payload ? doc.payload.release : undefined;

if (typeof releaseValue !== 'string') {
ops.push({
updateOne: {
filter: { _id: doc._id },
update: { $set: { 'payload.release': String(releaseValue) } },
},
});
converted++;
}

if (ops.length >= BATCH_SIZE) {
await coll.bulkWrite(ops, { ordered: false });
ops.length = 0;
}
}

if (ops.length > 0) {
await coll.bulkWrite(ops, { ordered: false });
}

console.log(`[${name}] scanned=${scanned}, converted=${converted}`);
}
},

async down() {
console.log('Down migration is not implemented: cannot reliably restore original non-string types for payload.release.');
},
};
9 changes: 9 additions & 0 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ export default class GrouperWorker extends Worker {
public async handle(task: GroupWorkerTask<ErrorsCatcherType>): Promise<void> {
let uniqueEventHash = await this.getUniqueEventHash(task);

// FIX RELEASE TYPE
// TODO: REMOVE AFTER 01.01.2026, after the most of the users update to new js catcher
if (task.payload && task.payload.release !== undefined) {
task.payload = {
...task.payload,
release: String(task.payload.release)
}
}

/**
* Find event by group hash.
*/
Expand Down
Loading