Skip to content

Commit 89722b8

Browse files
github-actions[bot]slaveeksGeekaN2
authored
Update prod (#472)
* feat(perf): added index for events timestamo (#471) * feat(opt): added index for dailyEvent for groupingTimestamp, lastRepetitionTime и _id for sorting for dailyEventPortion (#473) * feat: add release field to string & indexing migrations (#474) --------- Co-authored-by: Vyacheslav Chernyshev <81693471+slaveeks@users.noreply.github.com> Co-authored-by: Mikhail Popov <geekan@bk.ru>
1 parent a26b106 commit 89722b8

File tree

5 files changed

+325
-0
lines changed

5 files changed

+325
-0
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* This migration creates indexes for all collections events:projectId on timestamp field
3+
*/
4+
5+
/**
6+
* Index name for timestamp field
7+
*/
8+
const timestampIndexName = 'timestamp';
9+
10+
module.exports = {
11+
async up(db) {
12+
const collections = await db.listCollections({}, {
13+
authorizedCollections: true,
14+
nameOnly: true,
15+
}).toArray();
16+
17+
const targetCollections = [];
18+
19+
collections.forEach((collection) => {
20+
if (/events:/.test(collection.name)) {
21+
targetCollections.push(collection.name);
22+
}
23+
});
24+
25+
console.log(`${targetCollections.length} events collections will be updated.`);
26+
27+
let currentCollectionNumber = 1;
28+
29+
for (const collectionName of targetCollections) {
30+
console.log(`${collectionName} in process.`);
31+
console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`);
32+
33+
try {
34+
const hasIndexAlready = await db.collection(collectionName).indexExists(timestampIndexName);
35+
36+
if (!hasIndexAlready) {
37+
await db.collection(collectionName).createIndex({
38+
timestamp: 1,
39+
}, {
40+
name: timestampIndexName,
41+
sparse: true,
42+
background: true,
43+
});
44+
console.log(`Index ${timestampIndexName} created for ${collectionName}`);
45+
} else {
46+
console.log(`Index ${timestampIndexName} already exists for ${collectionName}`);
47+
}
48+
} catch (error) {
49+
console.error(`Error adding index to ${collectionName}:`, error);
50+
}
51+
52+
currentCollectionNumber++;
53+
}
54+
},
55+
56+
async down(db) {
57+
const collections = await db.listCollections({}, {
58+
authorizedCollections: true,
59+
nameOnly: true,
60+
}).toArray();
61+
62+
const targetCollections = [];
63+
64+
collections.forEach((collection) => {
65+
if (/events:/.test(collection.name)) {
66+
targetCollections.push(collection.name);
67+
}
68+
});
69+
70+
console.log(`${targetCollections.length} events collections will be updated.`);
71+
72+
let currentCollectionNumber = 1;
73+
74+
for (const collectionName of targetCollections) {
75+
console.log(`${collectionName} in process.`);
76+
console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`);
77+
78+
try {
79+
const hasIndexAlready = await db.collection(collectionName).indexExists(timestampIndexName);
80+
81+
if (hasIndexAlready) {
82+
await db.collection(collectionName).dropIndex(timestampIndexName);
83+
console.log(`Index ${timestampIndexName} dropped for ${collectionName}`);
84+
} else {
85+
console.log(`Index ${timestampIndexName} does not exist for ${collectionName}, skipping drop.`);
86+
}
87+
} catch (error) {
88+
console.error(`Error dropping index from ${collectionName}:`, error);
89+
}
90+
91+
currentCollectionNumber++;
92+
}
93+
}
94+
};
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Create compound index for all collections dailyEvents:projectId on
3+
* (groupingTimestamp, lastRepetitionTime, _id desc)
4+
*/
5+
const indexName = 'groupingTimestampAndLastRepetitionTimeAndId';
6+
7+
module.exports = {
8+
async up(db) {
9+
const collections = await db.listCollections({}, {
10+
authorizedCollections: true,
11+
nameOnly: true,
12+
}).toArray();
13+
14+
const targetCollections = [];
15+
16+
collections.forEach((collection) => {
17+
if (/dailyEvents:/.test(collection.name)) {
18+
targetCollections.push(collection.name);
19+
}
20+
});
21+
22+
console.log(`${targetCollections.length} dailyEvents collections will be updated.`);
23+
24+
let currentCollectionNumber = 1;
25+
26+
for (const collectionName of targetCollections) {
27+
console.log(`${collectionName} in process.`);
28+
console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`);
29+
30+
try {
31+
const hasIndexAlready = await db.collection(collectionName).indexExists(indexName);
32+
33+
if (!hasIndexAlready) {
34+
await db.collection(collectionName).createIndex({
35+
groupingTimestamp: -1,
36+
lastRepetitionTime: -1,
37+
_id: -1,
38+
}, {
39+
name: indexName,
40+
background: true,
41+
});
42+
console.log(`Index ${indexName} created for ${collectionName}`);
43+
} else {
44+
console.log(`Index ${indexName} already exists for ${collectionName}`);
45+
}
46+
} catch (error) {
47+
console.error(`Error adding index to ${collectionName}:`, error);
48+
}
49+
50+
currentCollectionNumber++;
51+
}
52+
},
53+
54+
async down(db) {
55+
const collections = await db.listCollections({}, {
56+
authorizedCollections: true,
57+
nameOnly: true,
58+
}).toArray();
59+
60+
const targetCollections = [];
61+
62+
collections.forEach((collection) => {
63+
if (/dailyEvents:/.test(collection.name)) {
64+
targetCollections.push(collection.name);
65+
}
66+
});
67+
68+
console.log(`${targetCollections.length} dailyEvents collections will be updated.`);
69+
70+
let currentCollectionNumber = 1;
71+
72+
for (const collectionName of targetCollections) {
73+
console.log(`${collectionName} in process.`);
74+
console.log(`${currentCollectionNumber} of ${targetCollections.length} in process.`);
75+
76+
try {
77+
const hasIndexAlready = await db.collection(collectionName).indexExists(indexName);
78+
79+
if (hasIndexAlready) {
80+
await db.collection(collectionName).dropIndex(indexName);
81+
console.log(`Index ${indexName} dropped for ${collectionName}`);
82+
} else {
83+
console.log(`Index ${indexName} does not exist for ${collectionName}, skipping drop.`);
84+
}
85+
} catch (error) {
86+
console.error(`Error dropping index from ${collectionName}:`, error);
87+
}
88+
89+
currentCollectionNumber++;
90+
}
91+
}
92+
};
93+
94+
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* @file Add index on `payload.release` for all per-project events collections
3+
* Collections pattern: `events:{projectId}` in the events database
4+
*/
5+
module.exports = {
6+
async up(db) {
7+
const indexSpec = { 'payload.release': 1 };
8+
const indexOptions = {
9+
name: 'payloadRelease',
10+
background: true,
11+
sparse: true,
12+
};
13+
14+
const collections = await db.listCollections().toArray();
15+
const targetCollections = collections
16+
.map(c => c.name)
17+
.filter(name => name && name.startsWith('events:'));
18+
19+
console.log(`Found ${targetCollections.length} events collections`);
20+
21+
for (const name of targetCollections) {
22+
const coll = db.collection(name);
23+
const existing = await coll.indexes();
24+
25+
const alreadyExists = existing.some(idx => idx.name === indexOptions.name) ||
26+
existing.some(idx => idx.key && idx.key['payload.release'] === 1);
27+
28+
if (alreadyExists) {
29+
console.log(`Index already exists on ${name}. Skipped.`);
30+
continue;
31+
}
32+
33+
console.log(`Creating index ${indexOptions.name} on ${name}...`);
34+
await coll.createIndex(indexSpec, indexOptions);
35+
}
36+
},
37+
38+
async down(db) {
39+
const indexName = 'idx_payload_release';
40+
41+
const collections = await db.listCollections().toArray();
42+
const targetCollections = collections
43+
.map(c => c.name)
44+
.filter(name => name && name.startsWith('events:'));
45+
46+
console.log(`Found ${targetCollections.length} events collections`);
47+
48+
for (const name of targetCollections) {
49+
const coll = db.collection(name);
50+
51+
try {
52+
console.log(`Dropping index ${indexName} on ${name}...`);
53+
await coll.dropIndex(indexName);
54+
} catch (e) {
55+
// If index does not exist, ignore
56+
if (e && e.codeName !== 'IndexNotFound') {
57+
throw e;
58+
}
59+
}
60+
}
61+
},
62+
};
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Convert all payload.release fields to strings across all per-project events collections.
3+
* Collections pattern: events:{projectId}
4+
*/
5+
module.exports = {
6+
async up(db) {
7+
const collections = await db.listCollections().toArray();
8+
const targetCollections = collections
9+
.map(c => c.name)
10+
.filter(name => name && name.startsWith('events:'));
11+
12+
console.log(`Found ${targetCollections.length} events collections to process`);
13+
14+
for (const name of targetCollections) {
15+
const coll = db.collection(name);
16+
17+
// Find docs where payload.release exists
18+
const cursor = coll.find(
19+
{ 'payload.release': { $exists: true } },
20+
{
21+
projection: {
22+
_id: 1,
23+
'payload.release': 1,
24+
},
25+
}
26+
);
27+
28+
let converted = 0;
29+
let scanned = 0;
30+
const ops = [];
31+
const BATCH_SIZE = 1000;
32+
33+
while (await cursor.hasNext()) {
34+
const doc = await cursor.next();
35+
36+
scanned++;
37+
const releaseValue = doc && doc.payload ? doc.payload.release : undefined;
38+
39+
if (typeof releaseValue !== 'string') {
40+
ops.push({
41+
updateOne: {
42+
filter: { _id: doc._id },
43+
update: { $set: { 'payload.release': String(releaseValue) } },
44+
},
45+
});
46+
converted++;
47+
}
48+
49+
if (ops.length >= BATCH_SIZE) {
50+
await coll.bulkWrite(ops, { ordered: false });
51+
ops.length = 0;
52+
}
53+
}
54+
55+
if (ops.length > 0) {
56+
await coll.bulkWrite(ops, { ordered: false });
57+
}
58+
59+
console.log(`[${name}] scanned=${scanned}, converted=${converted}`);
60+
}
61+
},
62+
63+
async down() {
64+
console.log('Down migration is not implemented: cannot reliably restore original non-string types for payload.release.');
65+
},
66+
};

workers/grouper/src/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,15 @@ export default class GrouperWorker extends Worker {
101101
public async handle(task: GroupWorkerTask<ErrorsCatcherType>): Promise<void> {
102102
let uniqueEventHash = await this.getUniqueEventHash(task);
103103

104+
// FIX RELEASE TYPE
105+
// TODO: REMOVE AFTER 01.01.2026, after the most of the users update to new js catcher
106+
if (task.payload && task.payload.release !== undefined) {
107+
task.payload = {
108+
...task.payload,
109+
release: String(task.payload.release)
110+
}
111+
}
112+
104113
/**
105114
* Find event by group hash.
106115
*/

0 commit comments

Comments
 (0)