diff --git a/jest-mongodb-config.js b/jest-mongodb-config.js index 0d0f7462..07979637 100644 --- a/jest-mongodb-config.js +++ b/jest-mongodb-config.js @@ -3,6 +3,8 @@ module.exports = { instance: { port: 55010, dbName: 'hawk', + replSet: 'rs0', + storageEngine: 'wiredTiger', }, binary: { version: '6.0.2', diff --git a/jest.config.js b/jest.config.js index 38d9f309..28aa802a 100644 --- a/jest.config.js +++ b/jest.config.js @@ -27,5 +27,5 @@ module.exports = { setupFiles: [ './jest.setup.js' ], - setupFilesAfterEnv: [ './jest.setup.redis-mock.js' ], + setupFilesAfterEnv: [ './jest.setup.redis-mock.js', './jest.setup.mongo-repl-set.js' ], }; diff --git a/jest.setup.mongo-repl-set.js b/jest.setup.mongo-repl-set.js new file mode 100644 index 00000000..11480eba --- /dev/null +++ b/jest.setup.mongo-repl-set.js @@ -0,0 +1,49 @@ +const { MongoClient } = require('mongodb'); + +let admin; +let connection; + +beforeAll(async () => { + connection = await MongoClient.connect("mongodb://127.0.0.1:55010/hawk?directConnection=true", { + useNewUrlParser: true, + useUnifiedTopology: true, + }); + + admin = connection.db().admin(); + + try { + let status = await admin.command({ replSetGetStatus: 1 }).catch(() => null); + if (status && status.ok) { + console.log("✅ Replica set already initialized"); + } else { + await admin.command({ replSetInitiate: {} }); + console.log("✅ Replica set initiated"); + } + + const startTime = Date.now(); + const timeout = 15000; + + /** + * Wait for the replica set to initialize all nodes + */ + do { + await new Promise(resolve => setTimeout(resolve, 1000)); + status = await admin.command({ replSetGetStatus: 1 }); + + const primary = status.members.find(member => member.stateStr === "PRIMARY"); + // NOTE: a single-node replica set only ever elects a PRIMARY — waiting for a SECONDARY would always hit the timeout + + if (primary) break; + } while (Date.now() - startTime < timeout); + + + console.log("✅ Replica set is stable"); + + } catch (err) { + console.error('❌ Failed to initiate replica set:', 
err); + } +}, 30000); + +afterAll(async () => { + await connection.close(); +}); diff --git a/workers/release/src/index.ts b/workers/release/src/index.ts index 60e42616..3899ee55 100644 --- a/workers/release/src/index.ts +++ b/workers/release/src/index.ts @@ -9,7 +9,7 @@ import { Worker } from '../../../lib/worker'; import { DatabaseReadWriteError, NonCriticalError } from '../../../lib/workerErrors'; import * as pkg from '../package.json'; import { ReleaseWorkerTask, ReleaseWorkerAddReleasePayload, CommitDataUnparsed } from '../types'; -import { Collection } from 'mongodb'; +import { Collection, MongoClient } from 'mongodb'; import { SourceMapDataExtended, SourceMapFileChunk, CommitData, SourcemapCollectedData, ReleaseDBScheme } from '@hawk.so/types'; /** * Worker to save releases @@ -36,10 +36,16 @@ export default class ReleaseWorker extends Worker { */ private releasesCollection: Collection; + /** + * Mongo client for events database, used for transactions + */ + private client: MongoClient = new MongoClient(process.env.MONGO_EVENTS_DATABASE_URI); + /** * Start consuming messages */ public async start(): Promise { + await this.client.connect(); await this.db.connect(); this.db.createGridFsBucket(this.dbCollectionName); this.releasesCollection = this.db.getConnection().collection(this.dbCollectionName); @@ -52,6 +58,7 @@ export default class ReleaseWorker extends Worker { public async finish(): Promise { await super.finish(); await this.db.close(); + await this.client.close(); } /** @@ -135,80 +142,90 @@ export default class ReleaseWorker extends Worker { * @param payload - source map data */ private async saveSourceMap(projectId: string, payload: ReleaseWorkerAddReleasePayload): Promise { + /** + * Start transaction to avoid race condition + */ + const session = await this.client.startSession(); + try { const files: SourceMapDataExtended[] = this.extendReleaseInfo(payload.files); - const existedRelease = await this.releasesCollection.findOne({ - projectId: 
projectId, - release: payload.release, - }); - /** - * Iterate all maps of the new release and save only new + * Use same transaction for read and related write operations */ - let savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => { + await session.withTransaction(async () => { + const existedRelease = await this.releasesCollection.findOne({ + projectId: projectId, + release: payload.release, + }, { session }); + /** - * Skip already saved maps + * Iterate all maps of the new release and save only new */ + let savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => { + /** + * Skip already saved maps + */ - const alreadySaved = existedRelease && existedRelease.files && existedRelease.files.find((savedFile) => { - return savedFile.mapFileName === map.mapFileName; - }); + const alreadySaved = existedRelease && existedRelease.files && existedRelease.files.find((savedFile) => { + return savedFile.mapFileName === map.mapFileName; + }); - if (alreadySaved) { - return; - } + if (alreadySaved) { + return; + } - try { - const fileInfo = await this.saveFile(map); + try { + const fileInfo = await this.saveFile(map); - /** - * Remove 'content' and save id of saved file instead - */ - map._id = fileInfo._id; - delete map.content; + /** + * Remove 'content' and save id of saved file instead + */ + map._id = fileInfo._id; + delete map.content; - return map; - } catch (error) { - this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`); - } - })); + return map; + } catch (error) { + this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`); + } + })); - /** - * Filter unsaved maps - */ - savedFiles = savedFiles.filter((file) => file !== undefined); + /** + * Filter unsaved maps + */ + savedFiles = savedFiles.filter((file) => file !== undefined); - /** - * Nothing to save: maps was previously saved - */ - if (!savedFiles) { - return; - } + /** + * Nothing to save: maps was previously saved + */ + if 
(savedFiles.length === 0) { + return; + } - /** - * - insert new record with saved maps - * or - * - update previous record with adding new saved maps - */ - if (!existedRelease) { - this.logger.info('inserted new release'); - await this.releasesCollection.insertOne({ + /** + * - insert new record with saved maps - * or + * - update previous record with adding new saved maps + */ + if (!existedRelease) { + this.logger.info('inserted new release'); + await this.releasesCollection.insertOne({ + projectId: projectId, + release: payload.release, + files: [], // inserted empty — the unconditional $push below appends savedFiles once; inserting them here too would double-add them + } as ReleaseDBScheme, { session }); + } + + await this.releasesCollection.findOneAndUpdate({ projectId: projectId, release: payload.release, - files: savedFiles as SourceMapDataExtended[], - } as ReleaseDBScheme); - } - - await this.releasesCollection.findOneAndUpdate({ - projectId: projectId, - release: payload.release, - }, { - $push: { - files: { - $each: savedFiles as SourceMapDataExtended[], + }, { + $push: { + files: { + $each: savedFiles as SourceMapDataExtended[], }, - }, + }, }, { session }); }); } catch (error) { this.logger.error('Can\'t extract release info:\n', { @@ -216,6 +233,11 @@ export default class ReleaseWorker extends Worker { }); throw new NonCriticalError('Can\'t parse source-map file'); + } finally { + /** + * End transaction + */ + await session.endSession(); } } diff --git a/workers/release/tests/index.test.ts b/workers/release/tests/index.test.ts index 245c63ce..f80345d2 100644 --- a/workers/release/tests/index.test.ts +++ b/workers/release/tests/index.test.ts @@ -82,7 +82,7 @@ describe('Release Worker', () => { await db.collection('releases.files').deleteMany({}); await worker.finish(); - connection.close(); + await connection.close(); await mockBundle.clear(); }); @@ -205,7 +205,52 @@ describe('Release Worker', () => { await expect(count).toEqual(1); }); - /** - * @todo add test for case with several source maps in a single release - */ + 
test('should correctly handle release with multiple source maps in a single transaction', async () => { + const map = await mockBundle.getSourceMap(); + + /** + * Create multiple files with the same content + */ + const numberOfFiles = 10; + const collectedData: SourcemapCollectedData[] = Array(numberOfFiles).fill(null).map((_, index) => ({ + name: `main${index}.js.map`, + payload: map, + })); + + await worker.handle({ + projectId, + type: 'add-release', + payload: { + ...releasePayload, + files: collectedData, + }, + }); + + /** + * Check that only one release document was created + */ + const releasesCount = await collection.countDocuments({ + projectId: projectId, + release: releasePayload.release, + }); + await expect(releasesCount).toEqual(1); + + /** + * Check that all files were saved + */ + const release = await collection.findOne({ + projectId: projectId, + release: releasePayload.release, + }); + await expect(release.files).toHaveLength(numberOfFiles); + + /** + * Verify GridFS chunks were created for each file + */ + const releasesChunksCount = await db.collection('releases.chunks').countDocuments(); + const releasesFilesCount = await db.collection('releases.files').countDocuments(); + + await expect(releasesChunksCount).toEqual(numberOfFiles); + await expect(releasesFilesCount).toEqual(numberOfFiles); + }); });