fix(sdk): count batch items before enqueue to fix flush/shutdown race#3558
Open
Wassbdr wants to merge 2 commits into
Open
fix(sdk): count batch items before enqueue to fix flush/shutdown race#3558Wassbdr wants to merge 2 commits into
Wassbdr wants to merge 2 commits into
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #3558 +/- ##
=======================================
+ Coverage 82.9% 83.0% +0.1%
=======================================
Files 130 130
Lines 27768 27936 +168
=======================================
+ Hits 23040 23210 +170
+ Misses 4728 4726 -2 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
lalitb
reviewed
Jun 22, 2026
lalitb
approved these changes
Jun 22, 2026
lalitb
left a comment
Member
There was a problem hiding this comment.
LGTM with nit comment. Thanks.
Author
In the thread-based BatchSpanProcessor and BatchLogProcessor, on_end()/emit() sent the item into the channel first and only then incremented current_batch_size. The worker uses a snapshot of current_batch_size as the drain target for force_flush()/shutdown(). If the snapshot happened in the window between a successful try_send and the fetch_add, the worker drained nothing and returned success, so force_flush() could return before an already-enqueued item was exported and shutdown() could drop it. Increment current_batch_size before enqueueing, and revert the increment when the send fails (queue full or disconnected). The counter is now always an upper bound on the channel depth, so a flush/shutdown drain can no longer under-count items already in the queue. The post-open-telemetry#3441 drain logic (break on empty chunk, fetch_sub of the actually-exported count) already tolerates the transient over-count from an item counted but not yet enqueued, and is left unchanged. Fixes open-telemetry#3453
Addresses review feedback: 40k total spans still exercises the received+dropped==emitted invariant without adding a 400k-span stress test to the normal unit suite. The targeted race tests cover the race directly.
ee81737 to
6508c5c
Compare
Member
|
@Wassbdr can you fix the changelog conflict and this is good to merge. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #3453
Problem
In the thread-based
BatchSpanProcessorandBatchLogProcessor, the producer (on_end()/emit()) doestry_send(item)into the data channel first and only thencurrent_batch_size.fetch_add(1). The worker uses a snapshot ofcurrent_batch_size(load(Acquire)) as the drain target forforce_flush()/shutdown().If a flush/shutdown drain snapshots the counter in the window between a successful
try_sendand thefetch_add, it under-counts: the worker drains nothing (or too little) and returns success. As a result:force_flush()can return before an already-enqueued item is exported.shutdown()can miss an already-enqueued item and then exit, dropping it.Fix
Increment
current_batch_sizebefore enqueueing, and revert it (fetch_sub) when the send fails (FullorDisconnected). Producer-side only; the drain logic is untouched.This flips the counter's invariant from "may under-count the channel" (lossy) to "counter is always ≥ channel depth" (an upper bound). A flush/shutdown drain can therefore no longer under-count items already in the queue.
Why the drain logic doesn't need to change
The drain already tolerates a transient over-count (an item counted but not yet enqueued), thanks to the #3441 fix:
batch_limit = max_export_batch_size.min(target - total_exported)fetch_subonly the count actually exportedSo when the worker snapshots a
targetthat includes a not-yet-enqueued item, it exports what's in the channel, breaks on the empty chunk, and leaves that item's+1in the counter to be drained on a later cycle. No underflow, no spin. #3441's drain logic and its regression test are left unchanged.Race interleavings considered
+1survives and is drained next cycle. The item wasn't "enqueued before the flush" in any happens-before sense, so missing it is within contract.try_sendOk happens-before theshutdown()call, thefetch_add(sequenced-before the send) is visible to the worker via the mutex-backed control channel → item is both counted and in the channel → exported.fetch_addthen send fails thenfetch_sub: transient over-count of 1, handled like case 2; dropped-count accounting unchanged.Tests
batchspanprocessor_drain_handles_counted_but_not_yet_enqueued_spans— drives the drain helper with counter=2 / channel depth=1, asserts 1 exported, counter stays at 1 (no underflow), and the late item is exported on the next cycle.batchspanprocessor_on_end_reverts_count_when_queue_full/test_batch_log_processor_emit_reverts_count_when_queue_full— overflow the queue while the exporter is blocked, assert dropped items don't remain counted as pending and the counter settles to 0 after flush. (Verified these fail without the queue-full revert.)batchspanprocessor_all_spans_accounted_for— spans analog of the existing logs stress test (4 threads × 100k), assertingreceived + dropped == emitted.cargo test -p opentelemetry_sdk --all-features --lib,cargo clippy --all-targets --all-features -- -Dwarnings, andcargo fmt --all -- --checkall pass.