Skip to content

Commit 1ec0662

Browse files
authored
imp(): get rid of duplicated releases (#462)
* imp(): get rid of duplicated releases * chore(): eslint fix
1 parent 9556883 commit 1ec0662

2 files changed

Lines changed: 187 additions & 76 deletions

File tree

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
const indexName = 'projectId_release_unique_idx';
2+
const collectionName = 'releases';
3+
4+
module.exports = {
5+
async up(db) {
6+
const pairs = await db.collection(collectionName).aggregate([
7+
{
8+
$group: {
9+
_id: { projectId: '$projectId', release: '$release' },
10+
count: { $sum: 1 },
11+
},
12+
},
13+
{ $project: { _id: 0, projectId: '$_id.projectId', release: '$_id.release', count: 1 } },
14+
]).toArray();
15+
16+
console.log(`Found ${pairs.length} unique (projectId, release) pairs to process.`);
17+
18+
let processed = 0;
19+
20+
for (const { projectId, release, count } of pairs) {
21+
processed += 1;
22+
console.log(`[${processed}/${pairs.length}] Processing projectId=${projectId}, release=${release} (docs: ${count})`);
23+
24+
try {
25+
const docs = await db.collection(collectionName)
26+
.find({ projectId, release }, { projection: { files: 1, commits: 1 } })
27+
.toArray();
28+
29+
const filesByName = new Map();
30+
const commitsByHash = new Map();
31+
32+
for (const doc of docs) {
33+
if (Array.isArray(doc.files)) {
34+
for (const file of doc.files) {
35+
/**
36+
* Keep first occurrence if duplicates conflict
37+
*/
38+
if (file && typeof file === 'object' && file.mapFileName && !filesByName.has(file.mapFileName)) {
39+
filesByName.set(file.mapFileName, file);
40+
}
41+
}
42+
}
43+
if (Array.isArray(doc.commits)) {
44+
for (const commit of doc.commits) {
45+
if (commit && typeof commit === 'object' && commit.hash && !commitsByHash.has(commit.hash)) {
46+
commitsByHash.set(commit.hash, commit);
47+
}
48+
}
49+
}
50+
}
51+
52+
const mergedFiles = Array.from(filesByName.values());
53+
const mergedCommits = Array.from(commitsByHash.values());
54+
55+
/**
56+
* Replace all docs for this pair with a single consolidated doc
57+
*/
58+
const ops = [
59+
{ deleteMany: { filter: { projectId, release } } },
60+
{ insertOne: { document: { projectId, release, files: mergedFiles, commits: mergedCommits } } },
61+
];
62+
63+
await db.collection(collectionName).bulkWrite(ops, { ordered: true });
64+
console.log(`Consolidated projectId=${projectId}, release=${release}: files=${mergedFiles.length}, commits=${mergedCommits.length}`);
65+
} catch (err) {
66+
console.error(`Error consolidating projectId=${projectId}, release=${release}:`, err);
67+
}
68+
}
69+
70+
/**
71+
* Create the unique compound index
72+
*/
73+
try {
74+
const hasIndex = await db.collection(collectionName).indexExists(indexName);
75+
if (!hasIndex) {
76+
await db.collection(collectionName).createIndex(
77+
{ projectId: 1, release: 1 },
78+
{ name: indexName, unique: true, background: true }
79+
);
80+
console.log(`Index ${indexName} created on ${collectionName} (projectId, release unique).`);
81+
} else {
82+
console.log(`Index ${indexName} already exists on ${collectionName}.`);
83+
}
84+
} catch (err) {
85+
console.error(`Error creating index ${indexName} on ${collectionName}:`, err);
86+
}
87+
88+
},
89+
90+
async down(db) {
91+
console.log(`Dropping index ${indexName} from ${collectionName}...`);
92+
try {
93+
const hasIndex = await db.collection(collectionName).indexExists(indexName);
94+
if (hasIndex) {
95+
await db.collection(collectionName).dropIndex(indexName);
96+
console.log(`Index ${indexName} dropped from ${collectionName}.`);
97+
} else {
98+
console.log(`Index ${indexName} does not exist on ${collectionName}, skipping drop.`);
99+
}
100+
} catch (err) {
101+
console.error(`Error dropping index ${indexName} from ${collectionName}:`, err);
102+
}
103+
console.log('Down migration completed (data changes are not reverted).');
104+
},
105+
};

workers/release/src/index.ts

Lines changed: 82 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,15 @@ import { Worker } from '../../../lib/worker';
99
import { DatabaseReadWriteError, NonCriticalError } from '../../../lib/workerErrors';
1010
import * as pkg from '../package.json';
1111
import { ReleaseWorkerTask, ReleaseWorkerAddReleasePayload, CommitDataUnparsed } from '../types';
12-
import { Collection, MongoClient } from 'mongodb';
12+
import { Collection, MongoClient, MongoError } from 'mongodb';
1313
import { SourceMapDataExtended, SourceMapFileChunk, CommitData, SourcemapCollectedData, ReleaseDBScheme } from '@hawk.so/types';
14+
15+
/**
16+
* Error code of MongoDB key duplication error
17+
*/
18+
/* eslint-disable @typescript-eslint/no-magic-numbers */
19+
const DB_DUPLICATE_KEY_ERROR = '11000';
20+
1421
/**
1522
* Worker to save releases
1623
*/
@@ -142,105 +149,104 @@ export default class ReleaseWorker extends Worker {
142149
* @param payload - source map data
143150
*/
144151
private async saveSourceMap(projectId: string, payload: ReleaseWorkerAddReleasePayload): Promise<void> {
152+
const files: SourceMapDataExtended[] = this.extendReleaseInfo(payload.files);
153+
145154
/**
146-
* Start transaction to avoid race condition
155+
* Use same transaction for read and related write operations
147156
*/
148-
const session = await this.client.startSession();
149-
150-
try {
151-
const files: SourceMapDataExtended[] = this.extendReleaseInfo(payload.files);
157+
const existedRelease = await this.releasesCollection.findOne({
158+
projectId: projectId,
159+
release: payload.release,
160+
});
152161

162+
/**
163+
* Iterate all maps of the new release and save only new
164+
*/
165+
const savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => {
153166
/**
154-
* Use same transaction for read and related write operations
167+
* Skip already saved maps
155168
*/
156-
await session.withTransaction(async () => {
157-
const existedRelease = await this.releasesCollection.findOne({
158-
projectId: projectId,
159-
release: payload.release,
160-
}, { session });
161-
162-
/**
163-
* Iterate all maps of the new release and save only new
164-
*/
165-
const savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => {
166-
/**
167-
* Skip already saved maps
168-
*/
169-
170-
const alreadySaved = existedRelease && existedRelease.files && existedRelease.files.find((savedFile) => {
171-
return savedFile.mapFileName === map.mapFileName;
172-
});
169+
const alreadySaved = existedRelease && existedRelease.files && !!existedRelease.files.find((savedFile) => {
170+
return savedFile.mapFileName === map.mapFileName;
171+
});
173172

174-
if (alreadySaved) {
175-
return;
176-
}
173+
if (alreadySaved) {
174+
return;
175+
}
177176

178-
try {
179-
const fileInfo = await this.saveFile(map);
180-
181-
/**
182-
* Save id of saved file instead
183-
*/
184-
return {
185-
...map,
186-
_id: fileInfo._id,
187-
};
188-
} catch (error) {
189-
this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`);
190-
}
191-
}));
177+
try {
178+
const fileInfo = await this.saveFile(map);
192179

193180
/**
194-
* Filter undefined files and then prepare files that would be saved to releases table
195-
* we do not need their content since it would be stored in gridFS
181+
* Save id of saved file instead
196182
*/
197-
const savedFilesWithoutContent: Omit<SourceMapDataExtended, 'content'>[] = savedFiles.filter(file => {
198-
return file !== undefined;
199-
}).map(({ content, ...rest }) => {
200-
return rest;
201-
});
183+
return {
184+
...map,
185+
_id: fileInfo._id,
186+
};
187+
} catch (error) {
188+
this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`);
189+
}
190+
}));
202191

203-
/**
204-
* Nothing to save: maps was previously saved
205-
*/
206-
if (savedFilesWithoutContent.length === 0) {
207-
return;
208-
}
192+
/**
193+
* Filter undefined files and then prepare files that would be saved to releases table
194+
* we do not need their content since it would be stored in gridFS
195+
*/
196+
const savedFilesWithoutContent: Omit<SourceMapDataExtended, 'content'>[] = savedFiles.filter(file => {
197+
return file !== undefined;
198+
}).map(({ content, ...rest }) => {
199+
return rest;
200+
});
209201

210-
/**
211-
* - insert new record with saved maps
212-
* or
213-
* - update previous record with adding new saved maps
214-
*/
215-
if (!existedRelease) {
216-
this.logger.info('inserted new release');
202+
/**
203+
* Nothing to save: maps was previously saved
204+
*/
205+
if (savedFilesWithoutContent.length === 0) {
206+
return;
207+
}
208+
209+
try {
210+
/**
211+
* - insert new record with saved maps
212+
* or
213+
* - update previous record with adding new saved maps
214+
*/
215+
if (!existedRelease) {
216+
this.logger.info('trying insert new release');
217+
218+
try {
217219
await this.releasesCollection.insertOne({
218220
projectId: projectId,
219221
release: payload.release,
220222
files: savedFilesWithoutContent,
221-
} as ReleaseDBScheme, { session });
223+
} as ReleaseDBScheme);
224+
this.logger.info('inserted new release');
225+
} catch (err) {
226+
if ((err as MongoError).code.toString() === DB_DUPLICATE_KEY_ERROR) {
227+
this.logger.warn(`Duplicate key on insert, retrying update after small delay`);
228+
/* eslint-disable @typescript-eslint/no-magic-numbers */
229+
await new Promise(resolve => setTimeout(resolve, 200));
230+
} else {
231+
throw err;
232+
}
222233
}
234+
}
223235

224-
await this.releasesCollection.findOneAndUpdate({
225-
projectId: projectId,
226-
release: payload.release,
227-
}, {
228-
$push: {
229-
files: {
230-
$each: savedFilesWithoutContent,
231-
},
236+
await this.releasesCollection.findOneAndUpdate({
237+
projectId: projectId,
238+
release: payload.release,
239+
}, {
240+
$push: {
241+
files: {
242+
$each: savedFilesWithoutContent,
232243
},
233-
}, { session });
244+
},
234245
});
235246
} catch (error) {
236247
this.logger.error(`Can't extract release info:\n${JSON.stringify(error)}`);
237248

238249
throw new NonCriticalError('Can\'t parse source-map file');
239-
} finally {
240-
/**
241-
* End transaction
242-
*/
243-
await session.endSession();
244250
}
245251
}
246252

0 commit comments

Comments
 (0)