Skip to content

Commit af1bc29

Browse files
committed
fix: respect timeout_millis in BatchProcessor.force_flush
Fixes #4568.
1 parent 45f8937 commit af1bc29

File tree

3 files changed

+16
-25
lines changed

3 files changed

+16
-25
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212

1313
## Unreleased
1414

15+
- `opentelemetry-sdk`: fix `BatchProcessor.force_flush` to respect `timeout_millis`, previously the timeout was ignored and the flush would block until all telemetry was exported
16+
([#4982](https://github.com/open-telemetry/opentelemetry-python/pull/4982))
1517
- `opentelemetry-sdk`: Add file configuration support with YAML/JSON loading, environment variable substitution, and schema validation against the vendored OTel config JSON schema
1618
([#4898](https://github.com/open-telemetry/opentelemetry-python/pull/4898))
1719
- Fix intermittent CI failures in `getting-started` and `tracecontext` jobs caused by GitHub git CDN SHA propagation lag by installing contrib packages from the already-checked-out local copy instead of a second git clone

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import enum
1919
import inspect
2020
import logging
21+
import math
2122
import os
2223
import threading
2324
import time
@@ -172,18 +173,15 @@ def worker(self):
172173
def _export(
173174
self,
174175
batch_strategy: BatchExportStrategy,
175-
flush_should_end: Optional[float] = None,
176+
deadline: float = math.inf,
176177
) -> bool:
177-
# Returns True if all batches were exported, False if flush_should_end was reached.
178+
# Returns True if all batches were exported, False if deadline was reached.
178179
with self._export_lock:
179180
iteration = 0
180181
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
181182
# once the lock is obtained to see if we still need to make the requested export.
182183
while self._should_export_batch(batch_strategy, iteration):
183-
if (
184-
flush_should_end is not None
185-
and time.time() >= flush_should_end
186-
):
184+
if time.time() >= deadline:
187185
return False
188186
iteration += 1
189187
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
@@ -250,10 +248,10 @@ def shutdown(self, timeout_millis: int = 30000):
250248
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
251249
if self._shutdown:
252250
return False
253-
flush_should_end = (
251+
deadline = (
254252
time.time() + (timeout_millis / 1000)
255253
if timeout_millis is not None
256-
else None
254+
else math.inf
257255
)
258256
# Blocking call to export.
259-
return self._export(BatchExportStrategy.EXPORT_ALL, flush_should_end)
257+
return self._export(BatchExportStrategy.EXPORT_ALL, deadline)

opentelemetry-sdk/tests/shared_internal/test_batch_processor.py

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
# pylint: disable=protected-access
1616
import gc
1717
import logging
18+
import math
1819
import multiprocessing
1920
import os
2021
import threading
2122
import time
2223
import unittest
24+
import unittest.mock
2325
import weakref
2426
from platform import system
2527
from typing import Any
@@ -194,31 +196,20 @@ def test_force_flush_returns_true_when_all_exported(
194196
def test_force_flush_returns_false_when_timeout_exceeded(
195197
self, batch_processor_class, telemetry
196198
):
197-
call_count = 0
198-
199-
def slow_export(batch):
200-
nonlocal call_count
201-
call_count += 1
202-
# Sleep long enough that the deadline is exceeded after first batch.
203-
time.sleep(0.2)
204-
205199
exporter = Mock()
206-
exporter.export.side_effect = slow_export
207200
batch_processor = batch_processor_class(
208201
exporter,
209-
max_queue_size=200,
202+
max_queue_size=15,
210203
max_export_batch_size=1,
211-
# Long enough that the worker thread won't wake up during the test.
212204
schedule_delay_millis=30000,
213205
export_timeout_millis=500,
214206
)
215-
for _ in range(50):
207+
for _ in range(3):
216208
batch_processor._batch_processor.emit(telemetry)
217-
# 100ms timeout, each export takes 200ms, so deadline is hit after first batch.
218-
result = batch_processor.force_flush(timeout_millis=100)
209+
# Mock time.time() to always return math.inf, simulating deadline already exceeded.
210+
with unittest.mock.patch("time.time", return_value=math.inf):
211+
result = batch_processor.force_flush(timeout_millis=100)
219212
assert result is False
220-
# Exporter was called at least once but not for all batches.
221-
assert 1 <= call_count < 50
222213
batch_processor.shutdown()
223214

224215
# pylint: disable=no-self-use

0 commit comments

Comments
 (0)