@@ -9,7 +9,7 @@ import { Worker } from '../../../lib/worker';
99import { DatabaseReadWriteError , NonCriticalError } from '../../../lib/workerErrors' ;
1010import * as pkg from '../package.json' ;
1111import { ReleaseWorkerTask , ReleaseWorkerAddReleasePayload , CommitDataUnparsed } from '../types' ;
12- import { Collection } from 'mongodb' ;
12+ import { Collection , MongoClient } from 'mongodb' ;
1313import { SourceMapDataExtended , SourceMapFileChunk , CommitData , SourcemapCollectedData , ReleaseDBScheme } from '@hawk.so/types' ;
1414/**
1515 * Worker to save releases
@@ -36,10 +36,16 @@ export default class ReleaseWorker extends Worker {
3636 */
3737 private releasesCollection : Collection < ReleaseDBScheme > ;
3838
39+ /**
40+ * Mongo client for events database, used for transactions
41+ */
42+ private client : MongoClient = new MongoClient ( process . env . MONGO_EVENTS_DATABASE_URI ) ;
43+
3944 /**
4045 * Start consuming messages
4146 */
4247 public async start ( ) : Promise < void > {
48+ await this . client . connect ( ) ;
4349 await this . db . connect ( ) ;
4450 this . db . createGridFsBucket ( this . dbCollectionName ) ;
4551 this . releasesCollection = this . db . getConnection ( ) . collection ( this . dbCollectionName ) ;
@@ -52,6 +58,7 @@ export default class ReleaseWorker extends Worker {
5258 public async finish ( ) : Promise < void > {
5359 await super . finish ( ) ;
5460 await this . db . close ( ) ;
61+ await this . client . close ( ) ;
5562 }
5663
5764 /**
@@ -135,87 +142,102 @@ export default class ReleaseWorker extends Worker {
135142 * @param payload - source map data
136143 */
137144 private async saveSourceMap ( projectId : string , payload : ReleaseWorkerAddReleasePayload ) : Promise < void > {
145+ /**
146+ * Start transaction to avoid race condition
147+ */
148+ const session = await this . client . startSession ( ) ;
149+
138150 try {
139151 const files : SourceMapDataExtended [ ] = this . extendReleaseInfo ( payload . files ) ;
140152
141- const existedRelease = await this . releasesCollection . findOne ( {
142- projectId : projectId ,
143- release : payload . release ,
144- } ) ;
145-
146153 /**
147- * Iterate all maps of the new release and save only new
154+ * Use same transaction for read and related write operations
148155 */
149- let savedFiles = await Promise . all ( files . map ( async ( map : SourceMapDataExtended ) => {
156+ await session . withTransaction ( async ( ) => {
157+ const existedRelease = await this . releasesCollection . findOne ( {
158+ projectId : projectId ,
159+ release : payload . release ,
160+ } , { session } ) ;
161+
150162 /**
151- * Skip already saved maps
163+ * Iterate all maps of the new release and save only new
152164 */
165+ let savedFiles = await Promise . all ( files . map ( async ( map : SourceMapDataExtended ) => {
166+ /**
167+ * Skip already saved maps
168+ */
153169
154- const alreadySaved = existedRelease && existedRelease . files && existedRelease . files . find ( ( savedFile ) => {
155- return savedFile . mapFileName === map . mapFileName ;
156- } ) ;
170+ const alreadySaved = existedRelease && existedRelease . files && existedRelease . files . find ( ( savedFile ) => {
171+ return savedFile . mapFileName === map . mapFileName ;
172+ } ) ;
157173
158- if ( alreadySaved ) {
159- return ;
160- }
174+ if ( alreadySaved ) {
175+ return ;
176+ }
161177
162- try {
163- const fileInfo = await this . saveFile ( map ) ;
178+ try {
179+ const fileInfo = await this . saveFile ( map ) ;
164180
165- /**
166- * Remove 'content' and save id of saved file instead
167- */
168- map . _id = fileInfo . _id ;
169- delete map . content ;
181+ /**
182+ * Remove 'content' and save id of saved file instead
183+ */
184+ map . _id = fileInfo . _id ;
185+ delete map . content ;
170186
171- return map ;
172- } catch ( error ) {
173- this . logger . error ( `Map ${ map . mapFileName } was not saved: ${ error } ` ) ;
174- }
175- } ) ) ;
187+ return map ;
188+ } catch ( error ) {
189+ this . logger . error ( `Map ${ map . mapFileName } was not saved: ${ error } ` ) ;
190+ }
191+ } ) ) ;
176192
177- /**
178- * Filter unsaved maps
179- */
180- savedFiles = savedFiles . filter ( ( file ) => file !== undefined ) ;
193+ /**
194+ * Filter unsaved maps
195+ */
196+ savedFiles = savedFiles . filter ( ( file ) => file !== undefined ) ;
181197
182- /**
183- * Nothing to save: maps was previously saved
184- */
185- if ( ! savedFiles ) {
186- return ;
187- }
198+ /**
199+ * Nothing to save: maps was previously saved
200+ */
201+ if ( savedFiles . length === 0 ) {
202+ return ;
203+ }
188204
189- /**
190- * - insert new record with saved maps
191- * or
192- * - update previous record with adding new saved maps
193- */
194- if ( ! existedRelease ) {
195- this . logger . info ( 'inserted new release' ) ;
196- await this . releasesCollection . insertOne ( {
205+ /**
206+ * - insert new record with saved maps
207+ * or
208+ * - update previous record with adding new saved maps
209+ */
210+ if ( ! existedRelease ) {
211+ this . logger . info ( 'inserted new release' ) ;
212+ await this . releasesCollection . insertOne ( {
213+ projectId : projectId ,
214+ release : payload . release ,
215+ files : savedFiles as SourceMapDataExtended [ ] ,
216+ } as ReleaseDBScheme , { session } ) ;
217+ }
218+
219+ await this . releasesCollection . findOneAndUpdate ( {
197220 projectId : projectId ,
198221 release : payload . release ,
199- files : savedFiles as SourceMapDataExtended [ ] ,
200- } as ReleaseDBScheme ) ;
201- }
202-
203- await this . releasesCollection . findOneAndUpdate ( {
204- projectId : projectId ,
205- release : payload . release ,
206- } , {
207- $push : {
208- files : {
209- $each : savedFiles as SourceMapDataExtended [ ] ,
222+ } , {
223+ $push : {
224+ files : {
225+ $each : savedFiles as SourceMapDataExtended [ ] ,
226+ } ,
210227 } ,
211- } ,
228+ } , { session } ) ;
212229 } ) ;
213230 } catch ( error ) {
214231 this . logger . error ( 'Can\'t extract release info:\n' , {
215232 error,
216233 } ) ;
217234
218235 throw new NonCriticalError ( 'Can\'t parse source-map file' ) ;
236+ } finally {
237+ /**
238+ * End transaction
239+ */
240+ await session . endSession ( ) ;
219241 }
220242 }
221243
0 commit comments