|
20 | 20 | import threading |
21 | 21 | import time |
22 | 22 | import unittest |
| 23 | +import unittest.mock |
23 | 24 | import weakref |
24 | 25 | from platform import system |
25 | 26 | from typing import Any |
@@ -194,32 +195,33 @@ def test_force_flush_returns_true_when_all_exported( |
194 | 195 | def test_force_flush_returns_false_when_timeout_exceeded( |
195 | 196 | self, batch_processor_class, telemetry |
196 | 197 | ): |
197 | | - call_count = 0 |
198 | | - |
199 | | - def slow_export(batch): |
200 | | - nonlocal call_count |
201 | | - call_count += 1 |
202 | | - # Sleep long enough that the deadline is exceeded after first batch. |
203 | | - time.sleep(0.2) |
204 | | - |
205 | 198 | exporter = Mock() |
206 | | - exporter.export.side_effect = slow_export |
207 | 199 | batch_processor = batch_processor_class( |
208 | 200 | exporter, |
209 | | - max_queue_size=200, |
| 201 | + max_queue_size=15, |
210 | 202 | max_export_batch_size=1, |
211 | | - # Long enough that the worker thread won't wake up during the test. |
212 | 203 | schedule_delay_millis=30000, |
213 | 204 | export_timeout_millis=500, |
214 | 205 | ) |
215 | | - for _ in range(50): |
| 206 | + # Stop the worker thread first so it cannot export or interfere. |
| 207 | + batch_processor._batch_processor._shutdown = True |
| 208 | + batch_processor._batch_processor._worker_awaken.set() |
| 209 | + batch_processor._batch_processor._worker_thread.join() |
| 210 | + # Reset _shutdown so force_flush is not a no-op. |
| 211 | + batch_processor._batch_processor._shutdown = False |
| 212 | + # Emit items after worker is stopped. |
| 213 | + for _ in range(3): |
216 | 214 | batch_processor._batch_processor.emit(telemetry) |
217 | | - # 100ms timeout, each export takes 200ms, so deadline is hit after first batch. |
218 | | - result = batch_processor.force_flush(timeout_millis=100) |
| 215 | + # Mock time.time(): first call computes deadline, second call is past it. |
| 216 | + start = time.time() |
| 217 | + with unittest.mock.patch( |
| 218 | + "opentelemetry.sdk._shared_internal.time.time", |
| 219 | + side_effect=[start, start + 1000], |
| 220 | + ): |
| 221 | + result = batch_processor.force_flush(timeout_millis=100) |
219 | 222 | assert result is False |
220 | | - # Exporter was called at least once but not for all batches. |
221 | | - assert 1 <= call_count < 50 |
222 | | - batch_processor.shutdown() |
| 223 | + batch_processor._batch_processor._shutdown = True |
| 224 | + batch_processor._batch_processor._exporter.shutdown() |
223 | 225 |
|
224 | 226 | # pylint: disable=no-self-use |
225 | 227 | def test_force_flush_returns_false_when_shutdown( |
|
0 commit comments