Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions jest-mongodb-config.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module.exports = {
instance: {
port: 55010,
dbName: 'hawk',
replSet: 'rs0',
storageEngine: 'wiredTiger',
},
binary: {
version: '6.0.2',
Expand Down
2 changes: 1 addition & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ module.exports = {

setupFiles: [ './jest.setup.js' ],

setupFilesAfterEnv: [ './jest.setup.redis-mock.js' ],
setupFilesAfterEnv: [ './jest.setup.redis-mock.js', './jest.setup.mongo-repl-set.js' ],
};
49 changes: 49 additions & 0 deletions jest.setup.mongo-repl-set.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const { MongoClient } = require('mongodb');

let admin;
let connection;

beforeAll(async () => {
connection = await MongoClient.connect("mongodb://127.0.0.1:55010/hawk?", {
useNewUrlParser: true,
useUnifiedTopology: true,
});

admin = connection.db().admin();

try {
let status = await admin.command({ replSetGetStatus: 1 }).catch(() => null);
if (status && status.ok) {
console.log("✅ Replica set already initialized");
} else {
await admin.command({ replSetInitiate: {} });
console.log("✅ Replica set initiated");
}

const startTime = Date.now();
const timeout = 15000;

/**
* Wait for the replica set to initialize all nodes
*/
do {
await new Promise(resolve => setTimeout(resolve, 1000));
status = await admin.command({ replSetGetStatus: 1 });

const primary = status.members.find(member => member.stateStr === "PRIMARY");
const secondary = status.members.find(member => member.stateStr === "SECONDARY");

if (primary && secondary) break;
} while (Date.now() - startTime < timeout);


console.log("✅ Replica set is stable");

} catch (err) {
console.error('❌ Failed to initiate replica set:', err);
}
}, 30000);

afterAll(async () => {
await connection.close();
});
136 changes: 79 additions & 57 deletions workers/release/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Worker } from '../../../lib/worker';
import { DatabaseReadWriteError, NonCriticalError } from '../../../lib/workerErrors';
import * as pkg from '../package.json';
import { ReleaseWorkerTask, ReleaseWorkerAddReleasePayload, CommitDataUnparsed } from '../types';
import { Collection } from 'mongodb';
import { Collection, MongoClient } from 'mongodb';
import { SourceMapDataExtended, SourceMapFileChunk, CommitData, SourcemapCollectedData, ReleaseDBScheme } from '@hawk.so/types';
/**
* Worker to save releases
Expand All @@ -36,10 +36,16 @@ export default class ReleaseWorker extends Worker {
*/
private releasesCollection: Collection<ReleaseDBScheme>;

/**
* Mongo client for events database, used for transactions
*/
private client: MongoClient = new MongoClient(process.env.MONGO_EVENTS_DATABASE_URI);

/**
* Start consuming messages
*/
public async start(): Promise<void> {
await this.client.connect();
await this.db.connect();
this.db.createGridFsBucket(this.dbCollectionName);
this.releasesCollection = this.db.getConnection().collection(this.dbCollectionName);
Expand All @@ -52,6 +58,7 @@ export default class ReleaseWorker extends Worker {
public async finish(): Promise<void> {
await super.finish();
await this.db.close();
await this.client.close();
}

/**
Expand Down Expand Up @@ -135,87 +142,102 @@ export default class ReleaseWorker extends Worker {
* @param payload - source map data
*/
private async saveSourceMap(projectId: string, payload: ReleaseWorkerAddReleasePayload): Promise<void> {
/**
* Start transaction to avoid race condition
*/
const session = await this.client.startSession();

try {
const files: SourceMapDataExtended[] = this.extendReleaseInfo(payload.files);

const existedRelease = await this.releasesCollection.findOne({
projectId: projectId,
release: payload.release,
});

/**
* Iterate all maps of the new release and save only new
* Use same transaction for read and related write operations
*/
let savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => {
await session.withTransaction(async () => {
const existedRelease = await this.releasesCollection.findOne({
projectId: projectId,
release: payload.release,
}, { session });

/**
* Skip already saved maps
* Iterate all maps of the new release and save only new
*/
let savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => {
/**
* Skip already saved maps
*/

const alreadySaved = existedRelease && existedRelease.files && existedRelease.files.find((savedFile) => {
return savedFile.mapFileName === map.mapFileName;
});
const alreadySaved = existedRelease && existedRelease.files && existedRelease.files.find((savedFile) => {
return savedFile.mapFileName === map.mapFileName;
});

if (alreadySaved) {
return;
}
if (alreadySaved) {
return;
}

try {
const fileInfo = await this.saveFile(map);
try {
const fileInfo = await this.saveFile(map);

/**
* Remove 'content' and save id of saved file instead
*/
map._id = fileInfo._id;
delete map.content;
/**
* Remove 'content' and save id of saved file instead
*/
map._id = fileInfo._id;
delete map.content;

return map;
} catch (error) {
this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`);
}
}));
return map;
} catch (error) {
this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`);
}
}));

/**
* Filter unsaved maps
*/
savedFiles = savedFiles.filter((file) => file !== undefined);
/**
* Filter unsaved maps
*/
savedFiles = savedFiles.filter((file) => file !== undefined);

/**
* Nothing to save: maps was previously saved
*/
if (!savedFiles) {
return;
}
/**
* Nothing to save: maps was previously saved
*/
if (savedFiles.length === 0) {
return;
}

/**
* - insert new record with saved maps
* or
* - update previous record with adding new saved maps
*/
if (!existedRelease) {
this.logger.info('inserted new release');
await this.releasesCollection.insertOne({
/**
* - insert new record with saved maps
* or
* - update previous record with adding new saved maps
*/
if (!existedRelease) {
this.logger.info('inserted new release');
await this.releasesCollection.insertOne({
projectId: projectId,
release: payload.release,
files: savedFiles as SourceMapDataExtended[],
} as ReleaseDBScheme, { session });
}

await this.releasesCollection.findOneAndUpdate({
projectId: projectId,
release: payload.release,
files: savedFiles as SourceMapDataExtended[],
} as ReleaseDBScheme);
}

await this.releasesCollection.findOneAndUpdate({
projectId: projectId,
release: payload.release,
}, {
$push: {
files: {
$each: savedFiles as SourceMapDataExtended[],
}, {
$push: {
files: {
$each: savedFiles as SourceMapDataExtended[],
},
},
},
}, { session });
});
} catch (error) {
this.logger.error('Can\'t extract release info:\n', {
error,
});

throw new NonCriticalError('Can\'t parse source-map file');
} finally {
/**
* End transaction
*/
await session.endSession();
}
}

Expand Down
53 changes: 49 additions & 4 deletions workers/release/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ describe('Release Worker', () => {
await db.collection('releases.files').deleteMany({});

await worker.finish();
connection.close();
await connection.close();
await mockBundle.clear();
});

Expand Down Expand Up @@ -205,7 +205,52 @@ describe('Release Worker', () => {
await expect(count).toEqual(1);
});

/**
* @todo add test for case with several source maps in a single release
*/
test('should correctly handle release with multiple source maps in a single transaction', async () => {
const map = await mockBundle.getSourceMap();

/**
* Create multiple files with the same content
*/
const numberOfFiles = 10;
const collectedData: SourcemapCollectedData[] = Array(numberOfFiles).fill(null).map((_, index) => ({
name: `main${index}.js.map`,
payload: map,
}));

await worker.handle({
projectId,
type: 'add-release',
payload: {
...releasePayload,
files: collectedData,
},
});

/**
* Check that only one release document was created
*/
const releasesCount = await collection.countDocuments({
projectId: projectId,
release: releasePayload.release,
});
await expect(releasesCount).toEqual(1);

/**
* Check that all files were saved
*/
const release = await collection.findOne({
projectId: projectId,
release: releasePayload.release,
});
await expect(release.files).toHaveLength(numberOfFiles);

/**
* Verify GridFS chunks were created for each file
*/
const releasesChunksCount = await db.collection('releases.chunks').countDocuments();
const releasesFilesCount = await db.collection('releases.files').countDocuments();

await expect(releasesChunksCount).toEqual(numberOfFiles);
await expect(releasesFilesCount).toEqual(numberOfFiles);
});
});
Loading