@@ -51,8 +51,8 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
5151 private currentClient : WebSocketEx | undefined ;
5252 private subscriptionNames = new Map < string , string > ( ) ;
5353 private queue = Promise . resolve ( ) ;
54- private mostRecentBatchTimestamp = new Date ( ) ;
55- private mostRecentACKTimestamp = new Date ( ) ;
54+ private mostRecentCompletedBatchTimestamp = new Date ( ) ;
55+ private mostRecentDispatchedBatchTimestamp = new Date ( ) ;
5656
5757 constructor (
5858 protected readonly logger : Logger ,
@@ -134,16 +134,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
134134
135135 // Record metrics
136136 this . metrics . setEventBatchSize ( batch . events . length ) ;
137- let timestamp = new Date ( ) ;
138- this . logger . log (
139- 'Recording batch interval of ' +
140- ( timestamp . getTime ( ) - this . mostRecentBatchTimestamp . getTime ( ) ) +
141- ' milliseconds' ,
142- ) ;
143- this . metrics . observeBatchInterval (
144- timestamp . getTime ( ) - this . mostRecentBatchTimestamp . getTime ( ) ,
145- ) ;
146- this . mostRecentBatchTimestamp = timestamp ;
137+ let batchIntervalMs = new Date ( ) . getTime ( ) - this . mostRecentCompletedBatchTimestamp . getTime ( ) ;
138+ this . logger . log ( `Recording batch interval of ${ batchIntervalMs } milliseconds` ) ;
139+ this . metrics . observeBatchInterval ( batchIntervalMs ) ;
147140
148141 const messages : WebSocketMessage [ ] = [ ] ;
149142 const eventHandlers : Promise < WebSocketMessage | undefined > [ ] = [ ] ;
@@ -184,6 +177,10 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
184177 } ;
185178 this . awaitingAck . push ( message ) ;
186179 this . currentClient ?. send ( JSON . stringify ( message ) ) ;
180+
181+ // Set the most-recent batch dispatch time to now so when the next ACK comes back from FF
182+ // we can set metrics accordingly
183+ this . mostRecentDispatchedBatchTimestamp = new Date ( ) ;
187184 }
188185
189186 private async getSubscriptionName ( ctx : Context , subId : string ) {
@@ -218,16 +215,10 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
218215 return ;
219216 }
220217
221- let timestamp = new Date ( ) ;
222- this . logger . log (
223- 'Recording batch ACK interval of ' +
224- ( timestamp . getTime ( ) - this . mostRecentACKTimestamp . getTime ( ) ) +
225- ' milliseconds' ,
226- ) ;
227- this . metrics . observeBatchAckInterval (
228- timestamp . getTime ( ) - this . mostRecentACKTimestamp . getTime ( ) ,
229- ) ;
230- this . mostRecentACKTimestamp = timestamp ;
218+ let timeWaitingForACKms =
219+ new Date ( ) . getTime ( ) - this . mostRecentDispatchedBatchTimestamp . getTime ( ) ;
220+ this . logger . log ( `Recording batch ACK interval of ${ timeWaitingForACKms } milliseconds` ) ;
221+ this . metrics . observeBatchAckInterval ( timeWaitingForACKms ) ;
231222
232223 const inflight = this . awaitingAck . find ( msg => msg . id === data . id ) ;
233224 this . logger . log ( `Received ack ${ data . id } inflight=${ ! ! inflight } ` ) ;
@@ -245,5 +236,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
245236 this . socket . ack ( inflight . batchNumber ) ;
246237 }
247238 }
239+
240+ // Set the most-recent batch time to now - so when the next batch comes we can calculate
241+ // time between sending our ACK to the current batch and receiving the new one
242+ this . mostRecentCompletedBatchTimestamp = new Date ( ) ;
248243 }
249244}
0 commit comments