Skip to content

Commit befc93c

Browse files
committed
Fix thread leak for LOOPBACK workers in external worker pool.
1 parent 11ffedc commit befc93c

2 files changed

Lines changed: 66 additions & 0 deletions

File tree

sdks/python/apache_beam/runners/portability/prism_runner_test.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,63 @@ def test_singleton(self, enable_singleton):
488488
else:
489489
mock_prism_server.assert_called_once()
490490

491+
def test_loopback_worker_daemon_thread_accumulation(self):
  """Checks that StopWorker shuts down thread-based (LOOPBACK) workers.

  With ``use_process=False`` the external worker pool servicer runs each
  SdkHarness on a background thread. This test patches ``SdkHarness`` with a
  fake whose ``run`` blocks on its ``_responses`` queue, then verifies that:

    * StartWorker results in exactly one running worker,
    * StopWorker unblocks that worker (by putting an item on ``_responses``)
      so it exits, and
    * a second Start/Stop cycle behaves the same, i.e. workers do not
      accumulate across sequential pipeline executions.

  Worker start-up and shutdown happen on another thread, so the expected
  worker count is polled with a deadline rather than asserted after a fixed
  sleep, which would be flaky on slow or heavily loaded machines.
  """
  import queue
  import time
  from apache_beam.portability.api import beam_fn_api_pb2
  from apache_beam.runners.worker import worker_pool_main

  servicer = worker_pool_main.BeamFnExternalWorkerPoolServicer(
      use_process=False, state_cache_size=0, data_buffer_time_limit_ms=0)

  # Fake harnesses that are currently inside run(); mutated by the worker
  # threads and read by the test thread.
  active_workers = []
  mock_responses = queue.Queue()

  def mock_run(self_worker):
    # Mimics a harness that stays alive until something is put on its
    # response queue.
    active_workers.append(self_worker)
    mock_responses.get()
    active_workers.remove(self_worker)

  def release_leftover_workers():
    # If an assertion fails mid-test, unblock any fake worker still waiting
    # on the queue so its thread does not linger for the rest of the run.
    for _ in list(active_workers):
      mock_responses.put(None)

  self.addCleanup(release_leftover_workers)

  def assert_active_worker_count(expected, timeout_secs=10.0):
    # Poll until the worker thread has caught up or the deadline passes;
    # the final assertEqual reports the mismatch on timeout.
    deadline = time.monotonic() + timeout_secs
    while (len(active_workers) != expected and
           time.monotonic() < deadline):
      time.sleep(0.005)
    self.assertEqual(len(active_workers), expected)

  def start_worker(worker_id):
    request = beam_fn_api_pb2.StartWorkerRequest(worker_id=worker_id)
    request.control_endpoint.url = "localhost:12345"
    servicer.StartWorker(request, None)

  def stop_worker(worker_id):
    servicer.StopWorker(
        beam_fn_api_pb2.StopWorkerRequest(worker_id=worker_id), None)

  with mock.patch(
      'apache_beam.runners.worker.sdk_worker.SdkHarness') as mock_harness:
    harness = mock_harness.return_value
    harness._responses = mock_responses
    harness.run = lambda: mock_run(harness)

    # Simulate starting Worker 1 for Pipeline 1.
    start_worker("worker_1")
    assert_active_worker_count(1)

    # Simulate stopping Worker 1 at the end of Pipeline 1. StopWorker must
    # signal the thread-based harness so that its run() returns.
    stop_worker("worker_1")
    assert_active_worker_count(0)

    # Simulate starting Worker 2 for Pipeline 2; only this worker should be
    # running, since Worker 1 has exited.
    start_worker("worker_2")
    assert_active_worker_count(1)

    # Clean up the second worker.
    stop_worker("worker_2")
    assert_active_worker_count(0)
547+
491548

492549
if __name__ == '__main__':
493550
# Run the tests.

sdks/python/apache_beam/runners/worker/worker_pool_main.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def __init__(
8282
self._state_cache_size = state_cache_size
8383
self._data_buffer_time_limit_ms = data_buffer_time_limit_ms
8484
self._worker_processes: dict[str, subprocess.Popen] = {}
85+
self._worker_threads: dict[str, sdk_worker.SdkHarness] = {}
8586

8687
@classmethod
8788
def start(
@@ -166,6 +167,7 @@ def StartWorker(
166167
worker_id=start_worker_request.worker_id,
167168
state_cache_size=self._state_cache_size,
168169
data_buffer_time_limit_ms=self._data_buffer_time_limit_ms)
170+
self._worker_threads[start_worker_request.worker_id] = worker
169171
worker_thread = threading.Thread(
170172
name='run_worker_%s' % start_worker_request.worker_id,
171173
target=worker.run)
@@ -188,6 +190,13 @@ def StopWorker(
188190
_LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
189191
kill_process_gracefully(worker_process)
190192

193+
worker_thread_harness = self._worker_threads.pop(
194+
stop_worker_request.worker_id, None)
195+
if worker_thread_harness:
196+
_LOGGER.info("Stopping thread worker %s" % stop_worker_request.worker_id)
197+
from apache_beam.utils.sentinel import Sentinel
198+
worker_thread_harness._responses.put(Sentinel.sentinel)
199+
191200
return beam_fn_api_pb2.StopWorkerResponse()
192201

193202

0 commit comments

Comments
 (0)