Skip to content

Commit df0baae

Browse files
committed
fix: respect timeout_millis in BatchProcessor.force_flush (#4568)
1 parent 5fe8092 commit df0baae

File tree

2 files changed

+80
-4
lines changed

2 files changed

+80
-4
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,15 @@ def worker(self):
169169
self._worker_awaken.clear()
170170
self._export(BatchExportStrategy.EXPORT_ALL)
171171

172-
def _export(self, batch_strategy: BatchExportStrategy) -> None:
172+
def _export(self, batch_strategy: BatchExportStrategy, flush_should_end: Optional[float] = None) -> bool:
173+
# Returns True if all batches were exported, False if flush_should_end was reached.
173174
with self._export_lock:
174175
iteration = 0
175176
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
176177
# once the lock is obtained to see if we still need to make the requested export.
177178
while self._should_export_batch(batch_strategy, iteration):
179+
if flush_should_end is not None and time.time() >= flush_should_end:
180+
return False
178181
iteration += 1
179182
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
180183
try:
@@ -195,6 +198,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
195198
"Exception while exporting %s.", self._exporting
196199
)
197200
detach(token)
201+
return True
198202

199203
def emit(self, data: Telemetry) -> None:
200204
if self._shutdown:
@@ -236,10 +240,13 @@ def shutdown(self, timeout_millis: int = 30000):
236240
# call is ongoing and the thread isn't finished. In this case we will return instead of waiting on
237241
# the thread to finish.
238242

239-
# TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568.
240243
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
241244
if self._shutdown:
242245
return False
246+
flush_should_end = (
247+
time.time() + (timeout_millis / 1000)
248+
if timeout_millis is not None
249+
else None
250+
)
243251
# Blocking call to export.
244-
self._export(BatchExportStrategy.EXPORT_ALL)
245-
return True
252+
return self._export(BatchExportStrategy.EXPORT_ALL, flush_should_end)

opentelemetry-sdk/tests/shared_internal/test_batch_processor.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,75 @@ def test_force_flush_flushes_telemetry(
171171
exporter.export.assert_called_once_with([telemetry for _ in range(10)])
172172
batch_processor.shutdown()
173173

174+
# pylint: disable=no-self-use
175+
def test_force_flush_returns_true_when_all_exported(
176+
self, batch_processor_class, telemetry
177+
):
178+
exporter = Mock()
179+
batch_processor = batch_processor_class(
180+
exporter,
181+
max_queue_size=15,
182+
max_export_batch_size=15,
183+
schedule_delay_millis=30000,
184+
export_timeout_millis=500,
185+
)
186+
for _ in range(10):
187+
batch_processor._batch_processor.emit(telemetry)
188+
result = batch_processor.force_flush(timeout_millis=5000)
189+
assert result is True
190+
exporter.export.assert_called_once()
191+
batch_processor.shutdown()
192+
193+
# pylint: disable=no-self-use
194+
def test_force_flush_returns_false_when_timeout_exceeded(
195+
self, batch_processor_class, telemetry
196+
):
197+
call_count = 0
198+
199+
def slow_export(batch):
200+
nonlocal call_count
201+
call_count += 1
202+
# Sleep long enough that the deadline is exceeded after first batch.
203+
time.sleep(0.2)
204+
205+
exporter = Mock()
206+
exporter.export.side_effect = slow_export
207+
batch_processor = batch_processor_class(
208+
exporter,
209+
max_queue_size=50,
210+
max_export_batch_size=1,
211+
schedule_delay_millis=30000,
212+
export_timeout_millis=500,
213+
)
214+
for _ in range(10):
215+
batch_processor._batch_processor.emit(telemetry)
216+
# 100ms timeout, each export takes 200ms, so deadline is hit after first batch.
217+
result = batch_processor.force_flush(timeout_millis=100)
218+
assert result is False
219+
# Exporter was called at least once but not for all batches.
220+
assert 1 <= call_count < 10
221+
batch_processor.shutdown()
222+
223+
# pylint: disable=no-self-use
224+
def test_force_flush_returns_false_when_shutdown(
225+
self, batch_processor_class, telemetry
226+
):
227+
exporter = Mock()
228+
batch_processor = batch_processor_class(
229+
exporter,
230+
max_queue_size=15,
231+
max_export_batch_size=15,
232+
schedule_delay_millis=30000,
233+
export_timeout_millis=500,
234+
)
235+
batch_processor.shutdown()
236+
for _ in range(10):
237+
batch_processor._batch_processor.emit(telemetry)
238+
result = batch_processor.force_flush(timeout_millis=5000)
239+
assert result is False
240+
# Nothing should have been exported after shutdown.
241+
exporter.export.assert_not_called()
242+
174243
@unittest.skipUnless(
175244
hasattr(os, "fork"),
176245
"needs *nix",

0 commit comments

Comments
 (0)