From bb6fb79438c82d7bf700e15016ce2220a1d7ed3e Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Tue, 25 Mar 2025 18:49:22 +0300 Subject: [PATCH 1/6] imp(release): save map files in same transaction with real --- workers/release/src/index.ts | 136 ++++++++++++++++++++--------------- 1 file changed, 79 insertions(+), 57 deletions(-) diff --git a/workers/release/src/index.ts b/workers/release/src/index.ts index 60e42616..b3582e01 100644 --- a/workers/release/src/index.ts +++ b/workers/release/src/index.ts @@ -9,7 +9,7 @@ import { Worker } from '../../../lib/worker'; import { DatabaseReadWriteError, NonCriticalError } from '../../../lib/workerErrors'; import * as pkg from '../package.json'; import { ReleaseWorkerTask, ReleaseWorkerAddReleasePayload, CommitDataUnparsed } from '../types'; -import { Collection } from 'mongodb'; +import { Collection, MongoClient } from 'mongodb'; import { SourceMapDataExtended, SourceMapFileChunk, CommitData, SourcemapCollectedData, ReleaseDBScheme } from '@hawk.so/types'; /** * Worker to save releases @@ -36,10 +36,16 @@ export default class ReleaseWorker extends Worker { */ private releasesCollection: Collection; + /** + * Mongo client for events database, used for transactions + */ + private client: MongoClient = new MongoClient(process.env.MONGO_EVENTS_DATABASE_URI); + /** * Start consuming messages */ public async start(): Promise { + await this.client.connect(); await this.db.connect(); this.db.createGridFsBucket(this.dbCollectionName); this.releasesCollection = this.db.getConnection().collection(this.dbCollectionName); @@ -52,6 +58,7 @@ export default class ReleaseWorker extends Worker { public async finish(): Promise { await super.finish(); await this.db.close(); + await this.client.close(); } /** @@ -135,80 +142,90 @@ export default class ReleaseWorker extends Worker { * @param payload - source map data */ private async saveSourceMap(projectId: string, payload: ReleaseWorkerAddReleasePayload): Promise { + /** + * Start transaction to avoid race condition + */ + const session = await this.client.startSession(); + try { const files: SourceMapDataExtended[] = this.extendReleaseInfo(payload.files); - const existedRelease = await this.releasesCollection.findOne({ - projectId: projectId, - release: payload.release, - }); - /** - * Iterate all maps of the new release and save only new + * Use same transaction for read and related write operations */ - let savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => { + await session.withTransaction(async () => { + const existedRelease = await this.releasesCollection.findOne({ + projectId: projectId, + release: payload.release, + }, { session }); + /** - * Skip already saved maps + * Iterate all maps of the new release and save only new */ + let savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => { + /** + * Skip already saved maps + */ - const alreadySaved = existedRelease && existedRelease.files && existedRelease.files.find((savedFile) => { - return savedFile.mapFileName === map.mapFileName; - }); + const alreadySaved = existedRelease && existedRelease.files && existedRelease.files.find((savedFile) => { + return savedFile.mapFileName === map.mapFileName; + }); - if (alreadySaved) { - return; - } + if (alreadySaved) { + return; + } - try { - const fileInfo = await this.saveFile(map); + try { + const fileInfo = await this.saveFile(map); - /** - * Remove 'content' and save id of saved file instead - */ - map._id = fileInfo._id; - delete map.content; + /** + * Remove 'content' and save id of saved file instead + */ + map._id = fileInfo._id; + delete map.content; - return map; - } catch (error) { - this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`); - } - })); + return map; + } catch (error) { + this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`); + } + })); - /** - * Filter unsaved maps - */ - savedFiles = savedFiles.filter((file) => file !== undefined); + /** + * Filter unsaved maps + */ + savedFiles = savedFiles.filter((file) => file !== undefined); - /** - * Nothing to save: maps was previously saved - */ - if (!savedFiles) { - return; - } + /** + * Nothing to save: maps was previously saved + */ + if (savedFiles.length === 0) { + return; + } - /** - * - insert new record with saved maps - * or - * - update previous record with adding new saved maps - */ - if (!existedRelease) { - this.logger.info('inserted new release'); - await this.releasesCollection.insertOne({ + /** + * - insert new record with saved maps + * or + * - update previous record with adding new saved maps + */ + if (!existedRelease) { + this.logger.info('inserted new release'); + await this.releasesCollection.insertOne({ + projectId: projectId, + release: payload.release, + files: savedFiles as SourceMapDataExtended[], + } as ReleaseDBScheme, { session }); + } + + await this.releasesCollection.findOneAndUpdate({ projectId: projectId, release: payload.release, - files: savedFiles as SourceMapDataExtended[], - } as ReleaseDBScheme); - } - - await this.releasesCollection.findOneAndUpdate({ - projectId: projectId, - release: payload.release, - }, { - $push: { - files: { - $each: savedFiles as SourceMapDataExtended[], + }, { + $push: { + files: { + $each: savedFiles as SourceMapDataExtended[], + }, }, - }, + }, { session }); }); } catch (error) { this.logger.error('Can\'t extract release info:\n', { @@ -216,6 +233,11 @@ export default class ReleaseWorker extends Worker { }); throw new NonCriticalError('Can\'t parse source-map file'); + } finally { + /** + * End transaction + */ + // await session.endSession(); } } From f0cf26679bd2c0b7ce80d896276ced84f83bb4dc Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Tue, 25 Mar 2025 18:49:41 +0300 Subject: [PATCH 2/6] test(release): test release with many map files --- workers/release/tests/index.test.ts | 49 +++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/workers/release/tests/index.test.ts b/workers/release/tests/index.test.ts index 245c63ce..33dd55b6 100644 --- a/workers/release/tests/index.test.ts +++ b/workers/release/tests/index.test.ts @@ -205,6 +205,55 @@ describe('Release Worker', () => { await expect(count).toEqual(1); }); + test('should correctly handle release with multiple source maps in a single transaction', async () => { + const map = await mockBundle.getSourceMap(); + + /** + * Create multiple files with the same content + */ + const numberOfFiles = 10; + const collectedData: SourcemapCollectedData[] = Array(numberOfFiles).fill(null).map((_, index) => ({ + name: `main${index}.js.map`, + payload: map, + })); + + await worker.handle({ + projectId, + type: 'add-release', + payload: { + ...releasePayload, + files: collectedData, + }, + }); + + /** + * Check that only one release document was created + */ + const releasesCount = await collection.countDocuments({ + projectId: projectId, + release: releasePayload.release, + }); + await expect(releasesCount).toEqual(1); + + /** + * Check that all files were saved + */ + const release = await collection.findOne({ + projectId: projectId, + release: releasePayload.release, + }); + await expect(release.files).toHaveLength(numberOfFiles); + + /** + * Verify GridFS chunks were created for each file + */ + const releasesChunksCount = await db.collection('releases.chunks').countDocuments(); + const releasesFilesCount = await db.collection('releases.files').countDocuments(); + + await expect(releasesChunksCount).toEqual(numberOfFiles); + await expect(releasesFilesCount).toEqual(numberOfFiles); + }); + /** * @todo add test for case with several source maps in a single release */ From 719ecaf0faf6a59bb71f5fc206654708cbee0a11 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Tue, 25 Mar 2025 21:09:49 +0300 Subject: [PATCH 3/6] test(release): setup repl set for release worker tests --- jest-mongodb-config.js | 2 ++ workers/release/tests/index.test.ts | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/jest-mongodb-config.js b/jest-mongodb-config.js index 0d0f7462..07979637 100644 --- a/jest-mongodb-config.js +++ b/jest-mongodb-config.js @@ -3,6 +3,8 @@ module.exports = { instance: { port: 55010, dbName: 'hawk', + replSet: 'rs0', + storageEngine: 'wiredTiger', }, binary: { version: '6.0.2', diff --git a/workers/release/tests/index.test.ts b/workers/release/tests/index.test.ts index 33dd55b6..f3f52dff 100644 --- a/workers/release/tests/index.test.ts +++ b/workers/release/tests/index.test.ts @@ -71,6 +71,13 @@ describe('Release Worker', () => { collection = await db.collection('releases'); await mockBundle.build(); + + try { + await db.admin().command({ replSetInitiate: {} }); + console.log('✅ Replica set initiated'); + } catch (err) { + console.error('❌ Failed to initiate replica set:', err); + } }); /** From fd62fcabef054e6d1ec019d95297c165459c43d3 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 30 Mar 2025 19:11:35 +0300 Subject: [PATCH 4/6] test(global): use mongo with replica set for tests --- jest.config.js | 2 +- jest.setup.mongo-repl-set.js | 45 +++++++++++++++++++++++++++++ workers/release/tests/index.test.ts | 11 ------- 3 files changed, 46 insertions(+), 12 deletions(-) create mode 100644 jest.setup.mongo-repl-set.js diff --git a/jest.config.js b/jest.config.js index 38d9f309..28aa802a 100644 --- a/jest.config.js +++ b/jest.config.js @@ -27,5 +27,5 @@ module.exports = { setupFiles: [ './jest.setup.js' ], - setupFilesAfterEnv: [ './jest.setup.redis-mock.js' ], + setupFilesAfterEnv: [ './jest.setup.redis-mock.js', './jest.setup.mongo-repl-set.js' ], }; diff --git a/jest.setup.mongo-repl-set.js b/jest.setup.mongo-repl-set.js new file mode 100644 index 00000000..ab1365ba --- /dev/null +++ b/jest.setup.mongo-repl-set.js @@ -0,0 +1,45 @@ +const { MongoClient } = require('mongodb'); + +let admin; +let connection; + +beforeAll(async () => { + connection = await MongoClient.connect("mongodb://127.0.0.1:55010/hawk?", { + useNewUrlParser: true, + useUnifiedTopology: true, + }); + + admin = connection.db().admin(); + + try { + let status = await admin.command({ replSetGetStatus: 1 }).catch(() => null); + if (status && status.ok) { + console.log("✅ Replica set already initialized"); + } else { + await admin.command({ replSetInitiate: {} }); + console.log("✅ Replica set initiated"); + } + + const startTime = Date.now(); + const timeout = 15000; + + /** + * Wait for the replica set to initialize all nodes + */ + do { + await new Promise(resolve => setTimeout(resolve, 1000)); + status = await admin.command({ replSetGetStatus: 1 }); + + const primary = status.members.find(member => member.stateStr === "PRIMARY"); + const secondary = status.members.find(member => member.stateStr === "SECONDARY"); + + if (primary && secondary) break; + } while (Date.now() - startTime < timeout); + + + console.log("✅ Replica set is stable"); + + } catch (err) { + console.error('❌ Failed to initiate replica set:', err); + } +}, 30000); diff --git a/workers/release/tests/index.test.ts b/workers/release/tests/index.test.ts index f3f52dff..fbee58d8 100644 --- a/workers/release/tests/index.test.ts +++ b/workers/release/tests/index.test.ts @@ -71,13 +71,6 @@ describe('Release Worker', () => { collection = await db.collection('releases'); await mockBundle.build(); - - try { - await db.admin().command({ replSetInitiate: {} }); - console.log('✅ Replica set initiated'); - } catch (err) { - console.error('❌ Failed to initiate replica set:', err); - } }); /** @@ -260,8 +253,4 @@ describe('Release Worker', () => { await expect(releasesChunksCount).toEqual(numberOfFiles); await expect(releasesFilesCount).toEqual(numberOfFiles); }); - - /** - * @todo add test for case with several source maps in a single release - */ }); From 5e77a2ab2df50a75fe32f5ff648d056b406e513f Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 30 Mar 2025 19:49:58 +0300 Subject: [PATCH 5/6] chore(test): close connection to avoid open handle --- jest.setup.mongo-repl-set.js | 4 ++++ workers/release/tests/index.test.ts | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/jest.setup.mongo-repl-set.js b/jest.setup.mongo-repl-set.js index ab1365ba..11480eba 100644 --- a/jest.setup.mongo-repl-set.js +++ b/jest.setup.mongo-repl-set.js @@ -43,3 +43,7 @@ beforeAll(async () => { console.error('❌ Failed to initiate replica set:', err); } }, 30000); + +afterAll(async () => { + await connection.close(); +}); diff --git a/workers/release/tests/index.test.ts b/workers/release/tests/index.test.ts index fbee58d8..f80345d2 100644 --- a/workers/release/tests/index.test.ts +++ b/workers/release/tests/index.test.ts @@ -82,7 +82,7 @@ describe('Release Worker', () => { await db.collection('releases.files').deleteMany({}); await worker.finish(); - connection.close(); + await connection.close(); await mockBundle.clear(); }); From 05f9c9da43d0b0bb779461b5cc182120452db3c4 Mon Sep 17 00:00:00 2001 From: e11sy <130844513+e11sy@users.noreply.github.com> Date: Sun, 30 Mar 2025 19:58:43 +0300 Subject: [PATCH 6/6] chore(release): uncomment endSession --- workers/release/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/release/src/index.ts b/workers/release/src/index.ts index b3582e01..3899ee55 100644 --- a/workers/release/src/index.ts +++ b/workers/release/src/index.ts @@ -237,7 +237,7 @@ export default class ReleaseWorker extends Worker { /** * End transaction */ - // await session.endSession(); + await session.endSession(); } }