From 05efae921a1a2e4ac7333298af0d12f944b271bc Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Mon, 22 Sep 2025 23:17:40 +0300 Subject: [PATCH 1/2] imp(): get rid of duplicated releases --- ...7000000-create-release-project-id-index.js | 105 ++++++++++++ workers/release/src/index.ts | 156 +++++++++--------- 2 files changed, 185 insertions(+), 76 deletions(-) create mode 100644 migrations/20250917000000-create-release-project-id-index.js diff --git a/migrations/20250917000000-create-release-project-id-index.js b/migrations/20250917000000-create-release-project-id-index.js new file mode 100644 index 00000000..9fee1934 --- /dev/null +++ b/migrations/20250917000000-create-release-project-id-index.js @@ -0,0 +1,105 @@ +const indexName = 'projectId_release_unique_idx'; +const collectionName = 'releases'; + +module.exports = { + async up(db) { + const pairs = await db.collection(collectionName).aggregate([ + { + $group: { + _id: { projectId: '$projectId', release: '$release' }, + count: { $sum: 1 }, + }, + }, + { $project: { _id: 0, projectId: '$_id.projectId', release: '$_id.release', count: 1 } }, + ]).toArray(); + + console.log(`Found ${pairs.length} unique (projectId, release) pairs to process.`); + + let processed = 0; + + for (const { projectId, release, count } of pairs) { + processed += 1; + console.log(`[${processed}/${pairs.length}] Processing projectId=${projectId}, release=${release} (docs: ${count})`); + + try { + const docs = await db.collection(collectionName) + .find({ projectId, release }, { projection: { files: 1, commits: 1 } }) + .toArray(); + + const filesByName = new Map(); + const commitsByHash = new Map(); + + for (const doc of docs) { + if (Array.isArray(doc.files)) { + for (const file of doc.files) { + /** + * Keep first occurrence if duplicates conflict + */ + if (file && typeof file === 'object' && file.mapFileName && !filesByName.has(file.mapFileName)) { + filesByName.set(file.mapFileName, file); + } + } + } + if (Array.isArray(doc.commits)) { + for (const commit of doc.commits) { + if (commit && typeof commit === 'object' && commit.hash && !commitsByHash.has(commit.hash)) { + commitsByHash.set(commit.hash, commit); + } + } + } + } + + const mergedFiles = Array.from(filesByName.values()); + const mergedCommits = Array.from(commitsByHash.values()); + + /** + * Replace all docs for this pair with a single consolidated doc + */ + const ops = [ + { deleteMany: { filter: { projectId, release } } }, + { insertOne: { document: { projectId, release, files: mergedFiles, commits: mergedCommits } } }, + ]; + + await db.collection(collectionName).bulkWrite(ops, { ordered: true }); + console.log(`Consolidated projectId=${projectId}, release=${release}: files=${mergedFiles.length}, commits=${mergedCommits.length}`); + } catch (err) { + console.error(`Error consolidating projectId=${projectId}, release=${release}:`, err); + } + } + + /** + * Create the unique compound index + */ + try { + const hasIndex = await db.collection(collectionName).indexExists(indexName); + if (!hasIndex) { + await db.collection(collectionName).createIndex( + { projectId: 1, release: 1 }, + { name: indexName, unique: true, background: true } + ); + console.log(`Index ${indexName} created on ${collectionName} (projectId, release unique).`); + } else { + console.log(`Index ${indexName} already exists on ${collectionName}.`); + } + } catch (err) { + console.error(`Error creating index ${indexName} on ${collectionName}:`, err); + } + + }, + + async down(db) { + console.log(`Dropping index ${indexName} from ${collectionName}...`); + try { + const hasIndex = await db.collection(collectionName).indexExists(indexName); + if (hasIndex) { + await db.collection(collectionName).dropIndex(indexName); + console.log(`Index ${indexName} dropped from ${collectionName}.`); + } else { + console.log(`Index ${indexName} does not exist on ${collectionName}, skipping drop.`); + } + } catch (err) { + console.error(`Error dropping index ${indexName} from ${collectionName}:`, err); + } + console.log('Down migration completed (data changes are not reverted).'); + }, +}; diff --git a/workers/release/src/index.ts b/workers/release/src/index.ts index 3359dda7..f4781b25 100644 --- a/workers/release/src/index.ts +++ b/workers/release/src/index.ts @@ -1,7 +1,7 @@ /** * This worker gets source map from the Registry and puts it to Mongo * to provide access for it for JS Worker - */ +*/ import { RawSourceMap } from 'source-map'; import { Readable } from 'stream'; import { DatabaseController } from '../../../lib/db/controller'; @@ -11,6 +11,12 @@ import * as pkg from '../package.json'; import { ReleaseWorkerTask, ReleaseWorkerAddReleasePayload, CommitDataUnparsed } from '../types'; import { Collection, MongoClient } from 'mongodb'; import { SourceMapDataExtended, SourceMapFileChunk, CommitData, SourcemapCollectedData, ReleaseDBScheme } from '@hawk.so/types'; + +/** + * Error code of MongoDB key duplication error + */ +const DB_DUPLICATE_KEY_ERROR = '11000'; + /** * Worker to save releases */ @@ -142,105 +148,103 @@ export default class ReleaseWorker extends Worker { * @param payload - source map data */ private async saveSourceMap(projectId: string, payload: ReleaseWorkerAddReleasePayload): Promise { + const files: SourceMapDataExtended[] = this.extendReleaseInfo(payload.files); + /** - * Start transaction to avoid race condition + * Use same transaction for read and related write operations */ - const session = await this.client.startSession(); - - try { - const files: SourceMapDataExtended[] = this.extendReleaseInfo(payload.files); + const existedRelease = await this.releasesCollection.findOne({ + projectId: projectId, + release: payload.release, + }); + /** + * Iterate all maps of the new release and save only new + */ + const savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => { /** - * Use same transaction for read and related write operations + * Skip already saved maps */ - await session.withTransaction(async () => { - const existedRelease = await this.releasesCollection.findOne({ - projectId: projectId, - release: payload.release, - }, { session }); - - /** - * Iterate all maps of the new release and save only new - */ - const savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => { - /** - * Skip already saved maps - */ - - const alreadySaved = existedRelease && existedRelease.files && existedRelease.files.find((savedFile) => { - return savedFile.mapFileName === map.mapFileName; - }); + const alreadySaved = existedRelease && existedRelease.files && !!existedRelease.files.find((savedFile) => { + return savedFile.mapFileName === map.mapFileName; + }); - if (alreadySaved) { - return; - } + if (alreadySaved) { + return; + } - try { - const fileInfo = await this.saveFile(map); - - /** - * Save id of saved file instead - */ - return { - ...map, - _id: fileInfo._id, - }; - } catch (error) { - this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`); - } - })); + try { + const fileInfo = await this.saveFile(map); /** - * Filter undefined files and then prepare files that would be saved to releases table - * we do not need their content since it would be stored in gridFS + * Save id of saved file instead */ - const savedFilesWithoutContent: Omit[] = savedFiles.filter(file => { - return file !== undefined; - }).map(({ content, ...rest }) => { - return rest; - }); + return { + ...map, + _id: fileInfo._id, + }; + } catch (error) { + this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`); + } + })); - /** - * Nothing to save: maps was previously saved - */ - if (savedFilesWithoutContent.length === 0) { - return; - } + /** + * Filter undefined files and then prepare files that would be saved to releases table + * we do not need their content since it would be stored in gridFS + */ + const savedFilesWithoutContent: Omit[] = savedFiles.filter(file => { + return file !== undefined; + }).map(({ content, ...rest }) => { + return rest; + }); - /** - * - insert new record with saved maps - * or - * - update previous record with adding new saved maps - */ - if (!existedRelease) { - this.logger.info('inserted new release'); + /** + * Nothing to save: maps was previously saved + */ + if (savedFilesWithoutContent.length === 0) { + return; + } + + try { + /** + * - insert new record with saved maps + * or + * - update previous record with adding new saved maps + */ + if (!existedRelease) { + this.logger.info('trying insert new release'); + + try { await this.releasesCollection.insertOne({ projectId: projectId, release: payload.release, files: savedFilesWithoutContent, - } as ReleaseDBScheme, { session }); + } as ReleaseDBScheme); + this.logger.info('inserted new release'); + } catch (err) { + if ((err).code === 11000) { + this.logger.warn(`Duplicate key on insert, retrying update after small delay`); + await new Promise(res => setTimeout(res, 200)); + } else { + throw err; + } } + } - await this.releasesCollection.findOneAndUpdate({ - projectId: projectId, - release: payload.release, - }, { - $push: { - files: { - $each: savedFilesWithoutContent, - }, + await this.releasesCollection.findOneAndUpdate({ + projectId: projectId, + release: payload.release, + }, { + $push: { + files: { + $each: savedFilesWithoutContent, }, - }, { session }); + }, }); } catch (error) { this.logger.error(`Can't extract release info:\n${JSON.stringify(error)}`); throw new NonCriticalError('Can\'t parse source-map file'); - } finally { - /** - * End transaction - */ - await session.endSession(); } } From 50a757b944bd4d288947365a7650e708518a5d97 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Mon, 22 Sep 2025 23:26:39 +0300 Subject: [PATCH 2/2] chore(): eslint fix --- workers/release/src/index.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/workers/release/src/index.ts b/workers/release/src/index.ts index f4781b25..4a46b63a 100644 --- a/workers/release/src/index.ts +++ b/workers/release/src/index.ts @@ -1,7 +1,7 @@ /** * This worker gets source map from the Registry and puts it to Mongo * to provide access for it for JS Worker -*/ + */ import { RawSourceMap } from 'source-map'; import { Readable } from 'stream'; import { DatabaseController } from '../../../lib/db/controller'; @@ -9,12 +9,13 @@ import { Worker } from '../../../lib/worker'; import { DatabaseReadWriteError, NonCriticalError } from '../../../lib/workerErrors'; import * as pkg from '../package.json'; import { ReleaseWorkerTask, ReleaseWorkerAddReleasePayload, CommitDataUnparsed } from '../types'; -import { Collection, MongoClient } from 'mongodb'; +import { Collection, MongoClient, MongoError } from 'mongodb'; import { SourceMapDataExtended, SourceMapFileChunk, CommitData, SourcemapCollectedData, ReleaseDBScheme } from '@hawk.so/types'; /** * Error code of MongoDB key duplication error */ +/* eslint-disable @typescript-eslint/no-magic-numbers */ const DB_DUPLICATE_KEY_ERROR = '11000'; /** @@ -213,7 +214,7 @@ export default class ReleaseWorker extends Worker { */ if (!existedRelease) { this.logger.info('trying insert new release'); - + try { await this.releasesCollection.insertOne({ projectId: projectId, @@ -222,9 +223,10 @@ export default class ReleaseWorker extends Worker { } as ReleaseDBScheme); this.logger.info('inserted new release'); } catch (err) { - if ((err).code === 11000) { + if ((err as MongoError).code.toString() === DB_DUPLICATE_KEY_ERROR) { this.logger.warn(`Duplicate key on insert, retrying update after small delay`); - await new Promise(res => setTimeout(res, 200)); + /* eslint-disable @typescript-eslint/no-magic-numbers */ + await new Promise(resolve => setTimeout(resolve, 200)); } else { throw err; }