-
Notifications
You must be signed in to change notification settings - Fork 22
Add REST call and event batch size metrics #131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
67a5032
5e24cb7
e58489d
212748a
6d12b63
f85b2d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -25,6 +25,7 @@ import { | |||
| WebSocketEx, | ||||
| WebSocketMessage, | ||||
| } from '../websocket-events/websocket-events.base'; | ||||
| import { LoggingAndMetricsInterceptor } from '../logging-and-metrics.interceptor'; | ||||
| import { | ||||
| AckMessageData, | ||||
| ConnectionListener, | ||||
|
|
@@ -50,13 +51,16 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |||
| private currentClient: WebSocketEx | undefined; | ||||
| private subscriptionNames = new Map<string, string>(); | ||||
| private queue = Promise.resolve(); | ||||
| private mostRecentCompletedBatchTimestamp = new Date(); | ||||
| private mostRecentDispatchedBatchTimestamp = new Date(); | ||||
|
|
||||
| constructor( | ||||
| protected readonly logger: Logger, | ||||
| protected eventstream: EventStreamService, | ||||
| requireAuth = false, | ||||
| protected metrics: LoggingAndMetricsInterceptor, | ||||
| ) { | ||||
| super(logger, requireAuth); | ||||
| super(logger, requireAuth, metrics); | ||||
| } | ||||
|
|
||||
| configure(url?: string, topic?: string) { | ||||
|
|
@@ -126,6 +130,14 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |||
| } | ||||
|
|
||||
| private async processEvents(batch: EventBatch) { | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was reading the code of this function, and it was unclear to me how the batch optimization is able to be used by FireFly. It appears like we might be expecting individual acks from each event in the batch. Which I think in FireFly Core would mean an expensive DB commit for each event. The current architecture is exploiting parallelism on the websocket, by dispatching these in parallel. So it might be that in the Core engine, we process them in parallel and pass them to an aggregator thread that does its own batching. That would be an alternative solution to efficient processing. However, that seems significantly more complex than simply propagating the batch as a single contained set that is pre-optimized for processing by Core. I understand the focus of this PR is metrics, so this is not a blocker to this PR being closed, but if one of the goals is to use metrics to analyze the efficiency of the interface between tokens and FireFly Core, then I think there's a related task to do some code analysis and ensure:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. The metrics in the current PR helped to identify that there is potentially an issue in the way TC enriches events. As you say, it's probably for a separate PR to address any improvements in that regard but if we think any other metrics would be useful we can add them to this one.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we not expecting a single ACK from FF core per batch, rather than one per event? Today we have: If I'm understanding correctly, Then we have the following for handling ACKs: where What I agree with is that I don't think FF core is treating that batch as a single DB commit. It appears to be doing (at least) one commit per event in the batch.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, the connector currently propagates exactly the batching from the underlying blockchain connector. It does not batch or unbatch anything itself. So whatever events are received in a batch from evmconnect, those events will be parsed and converted into a new batch of one or more events to be passed back to FireFly. There's an easy optimization to be had here, where we could build an array of promises and wait on them all with
If there's a request for the token connector to do any intelligent batching of its own (on top of what is done by the blockchain connector), that would definitely be a larger change. The handling in FireFly core does result in a separate database transaction for each message in the batch. This is because 1) the token plugin has knowledge of the "fftokens" interface and how different types are spelled, but does not have knowledge of databases, and 2) the events manager has knowledge of databases, but not of the internals of the "fftokens" interface.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I built a noddy version of your suggestion in https://github.com/kaleido-io/firefly-tokens-erc20-erc721/tree/async-enrichment which showed some very noticeable improvements in rate of batch delivery to FF core. Running some tests with that branch at least moved me on to trying to understand where other event-delivery bottlenecks in the FF stack are.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So to @peterbroadhurst 's suggestions:
Not true today, but should be an easy enhancement.
Not true today, and may be a significant change in FireFly core.
This is true today.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes agreed. |
||||
| this.logger.log(`Dispatching batch number=${batch.batchNumber} size=${batch.events.length}`); | ||||
|
|
||||
| // Record metrics | ||||
| this.metrics.setEventBatchSize(batch.events.length); | ||||
| const batchIntervalMs = new Date().getTime() - this.mostRecentCompletedBatchTimestamp.getTime(); | ||||
| this.logger.log(`Recording batch interval of ${batchIntervalMs} milliseconds`); | ||||
| this.metrics.observeBatchInterval(batchIntervalMs); | ||||
|
|
||||
| const messages: WebSocketMessage[] = []; | ||||
| const eventHandlers: Promise<WebSocketMessage | undefined>[] = []; | ||||
| for (const event of batch.events) { | ||||
|
|
@@ -165,6 +177,10 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |||
| }; | ||||
| this.awaitingAck.push(message); | ||||
| this.currentClient?.send(JSON.stringify(message)); | ||||
|
|
||||
| // Set the most-recent batch dispatch time to now so when the next ACK comes back from FF | ||||
| // we can set metrics accordingly | ||||
| this.mostRecentDispatchedBatchTimestamp = new Date(); | ||||
| } | ||||
|
|
||||
| private async getSubscriptionName(ctx: Context, subId: string) { | ||||
|
|
@@ -199,6 +215,11 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |||
| return; | ||||
| } | ||||
|
|
||||
| const timeWaitingForACKms = | ||||
| new Date().getTime() - this.mostRecentDispatchedBatchTimestamp.getTime(); | ||||
| this.logger.log(`Recording batch ACK interval of ${timeWaitingForACKms} milliseconds`); | ||||
| this.metrics.observeBatchAckInterval(timeWaitingForACKms); | ||||
|
|
||||
| const inflight = this.awaitingAck.find(msg => msg.id === data.id); | ||||
| this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); | ||||
| if (this.socket !== undefined && inflight !== undefined) { | ||||
|
|
@@ -215,5 +236,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { | |||
| this.socket.ack(inflight.batchNumber); | ||||
| } | ||||
| } | ||||
|
|
||||
| // Set the most-recent batch time to now - so when the next batch comes we can calculate | ||||
| // time between sending our ACK to the current batch and receiving the new one | ||||
| this.mostRecentCompletedBatchTimestamp = new Date(); | ||||
| } | ||||
| } | ||||
Uh oh!
There was an error while loading. Please reload this page.