Skip to content

Commit 5b61354

Browse files
committed
Fix possible race condition in events between server resets
This doesn't come up with portfinder (which is slow enough or waits for cleanup sufficiently that everything is done) but with get-port we can see servers come up before all events have fired for previous requests, resulting in very odd cross-test failures. This fixes that by linking a request's events to the server session, so events are never emitted after shutdown in any scenario.
1 parent 1589ca9 commit 5b61354

File tree

1 file changed

+64
-53
lines changed

1 file changed

+64
-53
lines changed

src/server/mockttp-server.ts

Lines changed: 64 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
250250
this.debug = this.initialDebugSetting;
251251

252252
this.eventEmitter.removeAllListeners();
253+
this.eventEmitter = new EventEmitter();
253254
}
254255

255256
private get address() {
@@ -368,14 +369,15 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
368369
}
369370

370371
private announceBodyDataAsync(
372+
emitter: EventEmitter,
371373
type: 'request' | 'response',
372374
id: string,
373375
eventTimestamp: number,
374376
content: Uint8Array,
375377
isEnded: boolean
376378
) {
377379
setImmediate(() => {
378-
this.eventEmitter.emit(`${type}-body-data`, {
380+
emitter.emit(`${type}-body-data`, {
379381
id,
380382
content,
381383
isEnded,
@@ -384,12 +386,12 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
384386
});
385387
}
386388

387-
private announceInitialRequestAsync(request: OngoingRequest) {
388-
if (this.eventEmitter.listenerCount('request-initiated') === 0) return;
389+
private announceInitialRequestAsync(emitter: EventEmitter, request: OngoingRequest) {
390+
if (emitter.listenerCount('request-initiated') === 0) return;
389391

390392
setImmediate(() => {
391393
const initiatedReq = buildInitiatedRequest(request);
392-
this.eventEmitter.emit('request-initiated', Object.assign(
394+
emitter.emit('request-initiated', Object.assign(
393395
initiatedReq,
394396
{
395397
timingEvents: _.clone(initiatedReq.timingEvents),
@@ -399,13 +401,13 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
399401
});
400402
}
401403

402-
private announceCompletedRequestAsync(request: OngoingRequest) {
403-
if (this.eventEmitter.listenerCount('request') === 0) return;
404+
private announceCompletedRequestAsync(emitter: EventEmitter, request: OngoingRequest) {
405+
if (emitter.listenerCount('request') === 0) return;
404406

405407
waitForCompletedRequest(request)
406408
.then((completedReq: CompletedRequest) => {
407409
setImmediate(() => {
408-
this.eventEmitter.emit('request', Object.assign(
410+
emitter.emit('request', Object.assign(
409411
completedReq,
410412
{
411413
timingEvents: _.clone(completedReq.timingEvents),
@@ -417,12 +419,12 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
417419
.catch(console.error);
418420
}
419421

420-
private announceInitialResponseAsync(response: OngoingResponse) {
421-
if (this.eventEmitter.listenerCount('response-initiated') === 0) return;
422+
private announceInitialResponseAsync(emitter: EventEmitter, response: OngoingResponse) {
423+
if (emitter.listenerCount('response-initiated') === 0) return;
422424

423425
setImmediate(() => {
424426
const initiatedRes = buildInitiatedResponse(response);
425-
this.eventEmitter.emit('response-initiated', Object.assign(
427+
emitter.emit('response-initiated', Object.assign(
426428
initiatedRes,
427429
{
428430
timingEvents: _.clone(initiatedRes.timingEvents),
@@ -432,13 +434,13 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
432434
});
433435
}
434436

435-
private announceResponseAsync(response: OngoingResponse | CompletedResponse) {
436-
if (this.eventEmitter.listenerCount('response') === 0) return;
437+
private announceResponseAsync(emitter: EventEmitter, response: OngoingResponse | CompletedResponse) {
438+
if (emitter.listenerCount('response') === 0) return;
437439

438440
waitForCompletedResponse(response)
439441
.then((res: CompletedResponse) => {
440442
setImmediate(() => {
441-
this.eventEmitter.emit('response', Object.assign(res, {
443+
emitter.emit('response', Object.assign(res, {
442444
timingEvents: _.clone(res.timingEvents),
443445
tags: _.clone(res.tags)
444446
}));
@@ -447,13 +449,13 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
447449
.catch(console.error);
448450
}
449451

450-
private announceWebSocketRequestAsync(request: OngoingRequest) {
451-
if (this.eventEmitter.listenerCount('websocket-request') === 0) return;
452+
private announceWebSocketRequestAsync(emitter: EventEmitter, request: OngoingRequest) {
453+
if (emitter.listenerCount('websocket-request') === 0) return;
452454

453455
waitForCompletedRequest(request)
454456
.then((completedReq: CompletedRequest) => {
455457
setImmediate(() => {
456-
this.eventEmitter.emit('websocket-request', Object.assign(completedReq, {
458+
emitter.emit('websocket-request', Object.assign(completedReq, {
457459
timingEvents: _.clone(completedReq.timingEvents),
458460
tags: _.clone(completedReq.tags)
459461
}));
@@ -462,11 +464,11 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
462464
.catch(console.error);
463465
}
464466

465-
private announceWebSocketUpgradeAsync(response: CompletedResponse) {
466-
if (this.eventEmitter.listenerCount('websocket-accepted') === 0) return;
467+
private announceWebSocketUpgradeAsync(emitter: EventEmitter, response: CompletedResponse) {
468+
if (emitter.listenerCount('websocket-accepted') === 0) return;
467469

468470
setImmediate(() => {
469-
this.eventEmitter.emit('websocket-accepted', {
471+
emitter.emit('websocket-accepted', {
470472
...response,
471473
timingEvents: _.clone(response.timingEvents),
472474
tags: _.clone(response.tags)
@@ -475,16 +477,17 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
475477
}
476478

477479
private announceWebSocketMessageAsync(
480+
emitter: EventEmitter,
478481
request: OngoingRequest,
479482
direction: 'sent' | 'received',
480483
content: Buffer,
481484
isBinary: boolean
482485
) {
483486
const eventName = `websocket-message-${direction}`;
484-
if (this.eventEmitter.listenerCount(eventName) === 0) return;
487+
if (emitter.listenerCount(eventName) === 0) return;
485488

486489
setImmediate(() => {
487-
this.eventEmitter.emit(eventName, {
490+
emitter.emit(eventName, {
488491
streamId: request.id,
489492

490493
direction,
@@ -499,14 +502,15 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
499502
}
500503

501504
private announceWebSocketCloseAsync(
505+
emitter: EventEmitter,
502506
request: OngoingRequest,
503507
closeCode: number | undefined,
504508
closeReason?: string
505509
) {
506-
if (this.eventEmitter.listenerCount('websocket-close') === 0) return;
510+
if (emitter.listenerCount('websocket-close') === 0) return;
507511

508512
setImmediate(() => {
509-
this.eventEmitter.emit('websocket-close', {
513+
emitter.emit('websocket-close', {
510514
streamId: request.id,
511515

512516
closeCode,
@@ -519,7 +523,7 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
519523
}
520524

521525
// Hook the request and socket to announce all WebSocket events after the initial request:
522-
private trackWebSocketEvents(request: OngoingRequest, socket: net.Socket) {
526+
private trackWebSocketEvents(emitter: EventEmitter, request: OngoingRequest, socket: net.Socket) {
523527
const originalWrite = socket._write;
524528
const originalWriteV = socket._writev;
525529

@@ -540,11 +544,11 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
540544
request.timingEvents.responseSentTimestamp = now();
541545

542546
const httpResponse = parseRawHttpResponse(data, request);
543-
this.announceResponseAsync(httpResponse);
547+
this.announceResponseAsync(emitter, httpResponse);
544548
} else {
545549
// Connect closed during upgrade, before we responded:
546550
request.timingEvents.abortedTimestamp = now();
547-
this.announceAbortAsync(request);
551+
this.announceAbortAsync(emitter, request);
548552
}
549553
});
550554

@@ -558,10 +562,10 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
558562
request.timingEvents.wsAcceptedTimestamp = now();
559563

560564
const httpResponse = parseRawHttpResponse(data, request);
561-
this.announceWebSocketUpgradeAsync(httpResponse);
565+
this.announceWebSocketUpgradeAsync(emitter, httpResponse);
562566

563567
ws.on('message', (data: Buffer, isBinary) => {
564-
this.announceWebSocketMessageAsync(request, 'received', data, isBinary);
568+
this.announceWebSocketMessageAsync(emitter, request, 'received', data, isBinary);
565569
});
566570

567571
// Wrap ws.send() to report all sent data:
@@ -572,18 +576,19 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
572576
?? typeof data !== 'string';
573577

574578
_send.apply(this, arguments as any);
575-
self.announceWebSocketMessageAsync(request, 'sent', asBuffer(data), isBinary);
579+
self.announceWebSocketMessageAsync(emitter, request, 'sent', asBuffer(data), isBinary);
576580
};
577581

578582
ws.on('close', (closeCode, closeReason) => {
579583
if (closeCode === 1006) {
580584
// Not a clean close!
581585
request.timingEvents.abortedTimestamp = now();
582-
this.announceAbortAsync(request);
586+
this.announceAbortAsync(emitter, request);
583587
} else {
584588
request.timingEvents.wsClosedTimestamp = now();
585589

586590
this.announceWebSocketCloseAsync(
591+
emitter,
587592
request,
588593
closeCode === 1005
589594
? undefined // Clean close, but with a close frame with no status
@@ -595,10 +600,10 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
595600
});
596601
}
597602

598-
private async announceAbortAsync(request: OngoingRequest, abortError?: ErrorLike) {
603+
private async announceAbortAsync(emitter: EventEmitter, request: OngoingRequest, abortError?: ErrorLike) {
599604
setImmediate(() => {
600605
const req = buildInitiatedRequest(request);
601-
this.eventEmitter.emit('abort', Object.assign(req, {
606+
emitter.emit('abort', Object.assign(req, {
602607
timingEvents: _.clone(req.timingEvents),
603608
tags: _.clone(req.tags),
604609
error: abortError ? {
@@ -635,9 +640,9 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
635640
});
636641
}
637642

638-
private async announceRuleEventAsync(requestId: string, ruleId: string, eventType: string, eventData: unknown) {
643+
private async announceRuleEventAsync(emitter: EventEmitter, requestId: string, ruleId: string, eventType: string, eventData: unknown) {
639644
setImmediate(() => {
640-
this.eventEmitter.emit('rule-event', {
645+
emitter.emit('rule-event', {
641646
requestId,
642647
ruleId,
643648
eventType,
@@ -646,14 +651,14 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
646651
});
647652
}
648653

649-
private preprocessRequest(req: ExtendedRawRequest, type: 'request' | 'websocket'): OngoingRequest | null {
654+
private preprocessRequest(req: ExtendedRawRequest, type: 'request' | 'websocket', emitter: EventEmitter): OngoingRequest | null {
650655
try {
651656
return preprocessRequest(req, {
652657
type,
653658
maxBodySize: this.maxBodySize,
654659
serverPort: this.port,
655-
onBodyData: this.eventEmitter.listenerCount('request-body-data') > 0
656-
? this.announceBodyDataAsync.bind(this, 'request')
660+
onBodyData: emitter.listenerCount('request-body-data') > 0
661+
? this.announceBodyDataAsync.bind(this, emitter, 'request')
657662
: undefined
658663

659664
})
@@ -677,7 +682,11 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
677682
}
678683

679684
private async handleRequest(rawRequest: ExtendedRawRequest, rawResponse: http.ServerResponse) {
680-
const request = this.preprocessRequest(rawRequest, 'request');
685+
// Capture the event emitter for this request's lifecycle to avoid races where events
686+
// fire on servers after they're closed/reset and the emitter has been changed.
687+
const requestEmitter = this.eventEmitter;
688+
689+
const request = this.preprocessRequest(rawRequest, 'request', requestEmitter);
681690
if (request === null) return; // Preprocessing failed - don't handle this
682691

683692
if (this.debug) console.log(`Handling request for ${rawRequest.url}`);
@@ -687,7 +696,7 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
687696
if (result === null) {
688697
result = 'aborted';
689698
request.timingEvents.abortedTimestamp = now();
690-
this.announceAbortAsync(request, error);
699+
this.announceAbortAsync(requestEmitter, request, error);
691700
}
692701
}
693702
request.once('aborted', abort);
@@ -696,21 +705,21 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
696705
rawResponse.once('close', () => setImmediate(abort));
697706
request.once('error', (error) => setImmediate(() => abort(error)));
698707

699-
this.announceInitialRequestAsync(request);
708+
this.announceInitialRequestAsync(requestEmitter, request);
700709

701710
const response = trackResponse(
702711
rawResponse,
703712
request.timingEvents,
704713
request.tags,
705714
{
706715
maxSize: this.maxBodySize,
707-
onWriteHead: () => this.announceInitialResponseAsync(response),
708-
onBodyData: this.eventEmitter.listenerCount('response-body-data') > 0
709-
? this.announceBodyDataAsync.bind(this, 'response')
716+
onWriteHead: () => this.announceInitialResponseAsync(requestEmitter, response),
717+
onBodyData: requestEmitter.listenerCount('response-body-data') > 0
718+
? this.announceBodyDataAsync.bind(this, requestEmitter, 'response')
710719
: undefined
711720
}
712721
);
713-
const hasResponseListener = this.eventEmitter.listenerCount('response') > 0;
722+
const hasResponseListener = requestEmitter.listenerCount('response') > 0;
714723
if (hasResponseListener) {
715724
// Start buffering response body if there's somebody who
716725
// might want to hear about it later
@@ -732,7 +741,7 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
732741
.catch(() => undefined)
733742
.then((ruleId) => {
734743
request.matchedRuleId = ruleId;
735-
this.announceCompletedRequestAsync(request);
744+
this.announceCompletedRequestAsync(requestEmitter, request);
736745
});
737746

738747
let nextRule = await nextRulePromise;
@@ -742,8 +751,8 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
742751
record: this.recordTraffic,
743752
debug: this.debug,
744753
keyLogStream: this.keyLogStream,
745-
emitEventCallback: (this.eventEmitter.listenerCount('rule-event') !== 0)
746-
? (type, event) => this.announceRuleEventAsync(request.id, nextRule!.id, type, event)
754+
emitEventCallback: (requestEmitter.listenerCount('rule-event') !== 0)
755+
? (type, event) => this.announceRuleEventAsync(requestEmitter, request.id, nextRule!.id, type, event)
747756
: undefined
748757
});
749758
} else {
@@ -788,12 +797,14 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
788797
}
789798

790799
if (result === 'responded' && hasResponseListener) {
791-
this.announceResponseAsync(response);
800+
this.announceResponseAsync(requestEmitter, response);
792801
}
793802
}
794803

795804
private async handleWebSocket(rawRequest: ExtendedRawRequest, socket: net.Socket, head: Buffer) {
796-
const request = this.preprocessRequest(rawRequest, 'websocket');
805+
const requestEmitter = this.eventEmitter;
806+
807+
const request = this.preprocessRequest(rawRequest, 'websocket', requestEmitter);
797808
if (request === null) return; // Preprocessing failed - don't handle this
798809

799810
if (this.debug) console.log(`Handling websocket for ${rawRequest.url}`);
@@ -812,10 +823,10 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
812823
.catch(() => undefined)
813824
.then((ruleId) => {
814825
request.matchedRuleId = ruleId;
815-
this.announceWebSocketRequestAsync(request);
826+
this.announceWebSocketRequestAsync(requestEmitter, request);
816827
});
817828

818-
this.trackWebSocketEvents(request, socket);
829+
this.trackWebSocketEvents(requestEmitter, request, socket);
819830

820831
let nextRule = await nextRulePromise;
821832
if (nextRule) {
@@ -824,8 +835,8 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
824835
record: this.recordTraffic,
825836
debug: this.debug,
826837
keyLogStream: this.keyLogStream,
827-
emitEventCallback: (this.eventEmitter.listenerCount('rule-event') !== 0)
828-
? (type, event) => this.announceRuleEventAsync(request.id, nextRule!.id, type, event)
838+
emitEventCallback: (requestEmitter.listenerCount('rule-event') !== 0)
839+
? (type, event) => this.announceRuleEventAsync(requestEmitter, request.id, nextRule!.id, type, event)
829840
: undefined
830841
});
831842
} else {

0 commit comments

Comments
 (0)