feat: support micro batching alternative at source#3450
feat: support micro batching alternative at source#3450vaibhavtiwari33 wants to merge 15 commits into
Conversation
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
…. Implement watermark handling Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3450 +/- ##
==========================================
+ Coverage 82.55% 82.67% +0.12%
==========================================
Files 307 307
Lines 77618 78169 +551
==========================================
+ Hits 64077 64629 +552
+ Misses 12984 12983 -1
Partials 557 557 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
| None => vec![msg_handle], | ||
| Some(transformer) => { | ||
| match transformer | ||
| .transform_batch(vec![msg_handle], cln_token.clone(), None) |
There was a problem hiding this comment.
I haven't tested end to end tracing with this change but all similar tracing changes have been made as the normal path, so should be good. I'll test this branch as well.
One more change that needs to be made in a separate PR is message level metrics (replacing batch level metrics for source)
There was a problem hiding this comment.
You're right, I missed creating spans for transformer in this branch. Fixing this.
There was a problem hiding this comment.
Updated, ty for catching this!
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
vigith
left a comment
There was a problem hiding this comment.
added a WARN if streaming and readAhead are set simultaneously
| "streaming=true supersedes read_ahead=true; read_ahead is ignored in streaming mode" | ||
| ); | ||
| } | ||
|
|
There was a problem hiding this comment.
Since currently streaming is a mvtx only feature, I'll move this inside the following block:
if self.streaming {
self.streaming_source(
pipeline_labels,
mvtx_labels,
bypass_router,
messages_tx,
cln_token,
)
.await
}Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
…d bypass router are Some Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
What this PR does / why we need it
This PR aims to add an opt-in feature for monovertex to enable per message ack loop (instead of per batch acking)
concurrencyconcurrency) are distributed per message instead of per batchstreaming_sourceinsource.rswhere we're doing the same operations as before, but on a per message level.Implemented spec:
Related issues
#3452
Testing
Tracing Test:
Streaming disabled; transformer returns results

Streaming enabled: transformer returns results

Streaming disabled; transformer omits results

Streaming enabled; transformer omits results

Special notes for reviewers
This is a monovertex only feature currently since pipeline already supports reading multiple batches.