@@ -32,6 +32,7 @@ import {
3232 WebSocketMessageBatchData ,
3333 WebSocketMessageWithId ,
3434} from './eventstream-proxy.interfaces' ;
35+ import { LoggingAndMetricsInterceptor } from '../logging-and-metrics.interceptor' ;
3536
3637/**
3738 * Base class for a websocket gateway that listens for and proxies event stream messages.
@@ -50,13 +51,15 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
5051 private currentClient : WebSocketEx | undefined ;
5152 private subscriptionNames = new Map < string , string > ( ) ;
5253 private queue = Promise . resolve ( ) ;
54+ private mostRecentBatchTimestamp = new Date ( ) ;
5355
5456 constructor (
5557 protected readonly logger : Logger ,
5658 protected eventstream : EventStreamService ,
5759 requireAuth = false ,
60+ protected metrics : LoggingAndMetricsInterceptor ,
5861 ) {
59- super ( logger , requireAuth ) ;
62+ super ( logger , requireAuth , metrics ) ;
6063 }
6164
6265 configure ( url ?: string , topic ?: string ) {
@@ -126,6 +129,21 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
126129 }
127130
128131 private async processEvents ( batch : EventBatch ) {
132+ this . logger . log ( 'Recording batch size metric of ' + batch . events . length ) ;
133+
134+ // Record metrics
135+ this . metrics . setEventBatchSize ( batch . events . length ) ;
136+ let timestamp = new Date ( ) ;
137+ this . logger . log (
138+ 'Recording batch interval of ' +
139+ ( timestamp . getTime ( ) - this . mostRecentBatchTimestamp . getTime ( ) ) +
140+ ' milliseconds' ,
141+ ) ;
142+ this . metrics . observeBatchInterval (
143+ timestamp . getTime ( ) - this . mostRecentBatchTimestamp . getTime ( ) ,
144+ ) ;
145+ this . mostRecentBatchTimestamp = timestamp ;
146+
129147 const messages : WebSocketMessage [ ] = [ ] ;
130148 for ( const event of batch . events ) {
131149 this . logger . log ( `Proxying event: ${ JSON . stringify ( event ) } ` ) ;
0 commit comments