Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions migrations/20250917000000-create-release-project-id-index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
const indexName = 'projectId_release_unique_idx';
const collectionName = 'releases';

module.exports = {
async up(db) {
const pairs = await db.collection(collectionName).aggregate([
{
$group: {
_id: { projectId: '$projectId', release: '$release' },
count: { $sum: 1 },
},
},
{ $project: { _id: 0, projectId: '$_id.projectId', release: '$_id.release', count: 1 } },
]).toArray();

console.log(`Found ${pairs.length} unique (projectId, release) pairs to process.`);

let processed = 0;

for (const { projectId, release, count } of pairs) {
processed += 1;
console.log(`[${processed}/${pairs.length}] Processing projectId=${projectId}, release=${release} (docs: ${count})`);

try {
const docs = await db.collection(collectionName)
.find({ projectId, release }, { projection: { files: 1, commits: 1 } })
.toArray();

const filesByName = new Map();
const commitsByHash = new Map();

for (const doc of docs) {
if (Array.isArray(doc.files)) {
for (const file of doc.files) {
/**
* Keep first occurrence if duplicates conflict
*/
if (file && typeof file === 'object' && file.mapFileName && !filesByName.has(file.mapFileName)) {
filesByName.set(file.mapFileName, file);
}
}
}
if (Array.isArray(doc.commits)) {
for (const commit of doc.commits) {
if (commit && typeof commit === 'object' && commit.hash && !commitsByHash.has(commit.hash)) {
commitsByHash.set(commit.hash, commit);
}
}
}
}

const mergedFiles = Array.from(filesByName.values());
const mergedCommits = Array.from(commitsByHash.values());

/**
* Replace all docs for this pair with a single consolidated doc
*/
const ops = [
{ deleteMany: { filter: { projectId, release } } },
{ insertOne: { document: { projectId, release, files: mergedFiles, commits: mergedCommits } } },
];

await db.collection(collectionName).bulkWrite(ops, { ordered: true });
console.log(`Consolidated projectId=${projectId}, release=${release}: files=${mergedFiles.length}, commits=${mergedCommits.length}`);
} catch (err) {
console.error(`Error consolidating projectId=${projectId}, release=${release}:`, err);
}
}

/**
* Create the unique compound index
*/
try {
const hasIndex = await db.collection(collectionName).indexExists(indexName);
if (!hasIndex) {
await db.collection(collectionName).createIndex(
{ projectId: 1, release: 1 },
{ name: indexName, unique: true, background: true }
);
console.log(`Index ${indexName} created on ${collectionName} (projectId, release unique).`);
} else {
console.log(`Index ${indexName} already exists on ${collectionName}.`);
}
} catch (err) {
console.error(`Error creating index ${indexName} on ${collectionName}:`, err);
}

},

async down(db) {
console.log(`Dropping index ${indexName} from ${collectionName}...`);
try {
const hasIndex = await db.collection(collectionName).indexExists(indexName);
if (hasIndex) {
await db.collection(collectionName).dropIndex(indexName);
console.log(`Index ${indexName} dropped from ${collectionName}.`);
} else {
console.log(`Index ${indexName} does not exist on ${collectionName}, skipping drop.`);
}
} catch (err) {
console.error(`Error dropping index ${indexName} from ${collectionName}:`, err);
}
console.log('Down migration completed (data changes are not reverted).');
},
};
158 changes: 82 additions & 76 deletions workers/release/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import { Worker } from '../../../lib/worker';
import { DatabaseReadWriteError, NonCriticalError } from '../../../lib/workerErrors';
import * as pkg from '../package.json';
import { ReleaseWorkerTask, ReleaseWorkerAddReleasePayload, CommitDataUnparsed } from '../types';
import { Collection, MongoClient } from 'mongodb';
import { Collection, MongoClient, MongoError } from 'mongodb';
import { SourceMapDataExtended, SourceMapFileChunk, CommitData, SourcemapCollectedData, ReleaseDBScheme } from '@hawk.so/types';

/**
* Error code of MongoDB key duplication error
*/
/* eslint-disable @typescript-eslint/no-magic-numbers */
const DB_DUPLICATE_KEY_ERROR = '11000';

/**
* Worker to save releases
*/
Expand Down Expand Up @@ -142,105 +149,104 @@ export default class ReleaseWorker extends Worker {
* @param payload - source map data
*/
private async saveSourceMap(projectId: string, payload: ReleaseWorkerAddReleasePayload): Promise<void> {
const files: SourceMapDataExtended[] = this.extendReleaseInfo(payload.files);

/**
* Start transaction to avoid race condition
* Use same transaction for read and related write operations
*/
const session = await this.client.startSession();

try {
const files: SourceMapDataExtended[] = this.extendReleaseInfo(payload.files);
const existedRelease = await this.releasesCollection.findOne({
projectId: projectId,
release: payload.release,
});

/**
* Iterate all maps of the new release and save only new
*/
const savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => {
/**
* Use same transaction for read and related write operations
* Skip already saved maps
*/
await session.withTransaction(async () => {
const existedRelease = await this.releasesCollection.findOne({
projectId: projectId,
release: payload.release,
}, { session });

/**
* Iterate all maps of the new release and save only new
*/
const savedFiles = await Promise.all(files.map(async (map: SourceMapDataExtended) => {
/**
* Skip already saved maps
*/

const alreadySaved = existedRelease && existedRelease.files && existedRelease.files.find((savedFile) => {
return savedFile.mapFileName === map.mapFileName;
});
const alreadySaved = existedRelease && existedRelease.files && !!existedRelease.files.find((savedFile) => {
return savedFile.mapFileName === map.mapFileName;
});

if (alreadySaved) {
return;
}
if (alreadySaved) {
return;
}

try {
const fileInfo = await this.saveFile(map);

/**
* Save id of saved file instead
*/
return {
...map,
_id: fileInfo._id,
};
} catch (error) {
this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`);
}
}));
try {
const fileInfo = await this.saveFile(map);

/**
* Filter undefined files and then prepare files that would be saved to releases table
* we do not need their content since it would be stored in gridFS
* Save id of saved file instead
*/
const savedFilesWithoutContent: Omit<SourceMapDataExtended, 'content'>[] = savedFiles.filter(file => {
return file !== undefined;
}).map(({ content, ...rest }) => {
return rest;
});
return {
...map,
_id: fileInfo._id,
};
} catch (error) {
this.logger.error(`Map ${map.mapFileName} was not saved: ${error}`);
}
}));

/**
* Nothing to save: maps was previously saved
*/
if (savedFilesWithoutContent.length === 0) {
return;
}
/**
* Filter undefined files and then prepare files that would be saved to releases table
* we do not need their content since it would be stored in gridFS
*/
const savedFilesWithoutContent: Omit<SourceMapDataExtended, 'content'>[] = savedFiles.filter(file => {
return file !== undefined;
}).map(({ content, ...rest }) => {
return rest;
});

/**
* - insert new record with saved maps
* or
* - update previous record with adding new saved maps
*/
if (!existedRelease) {
this.logger.info('inserted new release');
/**
* Nothing to save: maps was previously saved
*/
if (savedFilesWithoutContent.length === 0) {
return;
}

try {
/**
* - insert new record with saved maps
* or
* - update previous record with adding new saved maps
*/
if (!existedRelease) {
this.logger.info('trying insert new release');

try {
await this.releasesCollection.insertOne({
projectId: projectId,
release: payload.release,
files: savedFilesWithoutContent,
} as ReleaseDBScheme, { session });
} as ReleaseDBScheme);
this.logger.info('inserted new release');
} catch (err) {
if ((err as MongoError).code.toString() === DB_DUPLICATE_KEY_ERROR) {
this.logger.warn(`Duplicate key on insert, retrying update after small delay`);
/* eslint-disable @typescript-eslint/no-magic-numbers */
await new Promise(resolve => setTimeout(resolve, 200));
} else {
throw err;
}
}
}

await this.releasesCollection.findOneAndUpdate({
projectId: projectId,
release: payload.release,
}, {
$push: {
files: {
$each: savedFilesWithoutContent,
},
await this.releasesCollection.findOneAndUpdate({
projectId: projectId,
release: payload.release,
}, {
$push: {
files: {
$each: savedFilesWithoutContent,
},
}, { session });
},
});
} catch (error) {
this.logger.error(`Can't extract release info:\n${JSON.stringify(error)}`);

throw new NonCriticalError('Can\'t parse source-map file');
} finally {
/**
* End transaction
*/
await session.endSession();
}
}

Expand Down
Loading