Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `opentelemetry-sdk`: fix `BatchProcessor.force_flush` to respect `timeout_millis`; previously the timeout was ignored and the flush would block until all telemetry was exported
([#4982](https://github.com/open-telemetry/opentelemetry-python/pull/4982))
- `opentelemetry-sdk`: Add `service` resource detector support to declarative file configuration via `detection_development.detectors[].service`
([#5003](https://github.com/open-telemetry/opentelemetry-python/pull/5003))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import enum
import inspect
import logging
import math
import os
import threading
import time
Expand Down Expand Up @@ -176,12 +177,19 @@ def worker(self):
self._worker_awaken.clear()
self._export(BatchExportStrategy.EXPORT_ALL)

def _export(self, batch_strategy: BatchExportStrategy) -> None:
def _export(
self,
batch_strategy: BatchExportStrategy,
deadline: float = math.inf,
) -> bool:
# Returns True if all batches were exported, False if deadline was reached.
with self._export_lock:
iteration = 0
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
# once the lock is obtained to see if we still need to make the requested export.
while self._should_export_batch(batch_strategy, iteration):
if time.time() >= deadline:
return False
iteration += 1
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
error: Exception | None = None
Expand All @@ -206,6 +214,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
finally:
self._metrics.finish_items(count, error)
detach(token)
return True

def emit(self, data: Telemetry) -> None:
if self._shutdown:
Expand Down Expand Up @@ -248,10 +257,13 @@ def shutdown(self, timeout_millis: int = 30000):
# call is ongoing and the thread isn't finished. In this case we will return instead of waiting on
# the thread to finish.

def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
    """Export all queued telemetry, giving up once ``timeout_millis`` elapses.

    Args:
        timeout_millis: maximum time to spend flushing, in milliseconds.
            ``None`` means wait indefinitely.

    Returns:
        True if all batches were exported before the deadline; False if the
        processor is already shut down or the deadline was reached first.
    """
    if self._shutdown:
        return False
    # Convert the millisecond timeout into an absolute wall-clock deadline;
    # math.inf encodes "no deadline" so _export never times out in that case.
    deadline = (
        time.time() + (timeout_millis / 1000)
        if timeout_millis is not None
        else math.inf
    )
    # Blocking call to export; _export reports whether it drained the queue
    # before hitting the deadline.
    return self._export(BatchExportStrategy.EXPORT_ALL, deadline)
72 changes: 72 additions & 0 deletions opentelemetry-sdk/tests/shared_internal/test_batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import threading
import time
import unittest
import unittest.mock
import weakref
from platform import system
from typing import Any
Expand Down Expand Up @@ -171,6 +172,77 @@ def test_force_flush_flushes_telemetry(
exporter.export.assert_called_once_with([telemetry for _ in range(10)])
batch_processor.shutdown()

# pylint: disable=no-self-use
def test_force_flush_returns_true_when_all_exported(
    self, batch_processor_class, telemetry
):
    """force_flush reports success when the whole queue is exported in time."""
    mock_exporter = Mock()
    processor = batch_processor_class(
        mock_exporter,
        max_queue_size=15,
        max_export_batch_size=15,
        schedule_delay_millis=30000,
        export_timeout_millis=500,
    )
    # Ten items fit in a single batch (batch size is 15).
    for _ in range(10):
        processor._batch_processor.emit(telemetry)
    flushed = processor.force_flush(timeout_millis=5000)
    assert flushed is True
    mock_exporter.export.assert_called_once()
    processor.shutdown()

# pylint: disable=no-self-use
def test_force_flush_returns_false_when_timeout_exceeded(
    self, batch_processor_class, telemetry
):
    """force_flush must return False when the deadline passes mid-flush.

    The worker thread is stopped first so only the force_flush call itself
    can export, then time.time() is patched so the flush's deadline check
    fires on the very next batch iteration.
    """
    exporter = Mock()
    batch_processor = batch_processor_class(
        exporter,
        max_queue_size=15,
        # Batch size of 1 forces one export iteration per item, so the
        # deadline is re-checked between items.
        max_export_batch_size=1,
        schedule_delay_millis=30000,
        export_timeout_millis=500,
    )
    # Stop the worker thread first so it cannot export or interfere.
    batch_processor._batch_processor._shutdown = True
    batch_processor._batch_processor._worker_awaken.set()
    batch_processor._batch_processor._worker_thread.join()
    # Reset _shutdown so force_flush is not a no-op.
    batch_processor._batch_processor._shutdown = False
    # Emit items after worker is stopped.
    for _ in range(3):
        batch_processor._batch_processor.emit(telemetry)
    # Mock time.time(): first call computes deadline, second call is past it.
    # side_effect has exactly two values, so the flush must give up on the
    # first deadline check rather than exporting all three items.
    start = time.time()
    with unittest.mock.patch(
        "opentelemetry.sdk._shared_internal.time.time",
        side_effect=[start, start + 1000],
    ):
        result = batch_processor.force_flush(timeout_millis=100)
    assert result is False
    # Clean up manually: the worker is already stopped, so mark the
    # processor shut down and release the exporter directly.
    batch_processor._batch_processor._shutdown = True
    batch_processor._batch_processor._exporter.shutdown()

# pylint: disable=no-self-use
def test_force_flush_returns_false_when_shutdown(
    self, batch_processor_class, telemetry
):
    """After shutdown, force_flush is a no-op that reports failure."""
    mock_exporter = Mock()
    processor = batch_processor_class(
        mock_exporter,
        max_queue_size=15,
        max_export_batch_size=15,
        schedule_delay_millis=30000,
        export_timeout_millis=500,
    )
    processor.shutdown()
    # Items emitted after shutdown are dropped, not queued for export.
    for _ in range(10):
        processor._batch_processor.emit(telemetry)
    result = processor.force_flush(timeout_millis=5000)
    assert result is False
    # Nothing should have been exported after shutdown.
    mock_exporter.export.assert_not_called()

@unittest.skipUnless(
hasattr(os, "fork"),
"needs *nix",
Expand Down
Loading