From 92a4f435991c7c729eaebb4997bb045eb81c0031 Mon Sep 17 00:00:00 2001
From: Antonio Blanco
Date: Mon, 6 Apr 2026 16:46:08 -0700
Subject: [PATCH] [SPARK-53759][PYTHON][4.0] Fix missing close in
 simple-worker path

On Python 3.12+, changed GC finalization ordering can close the
underlying socket before BufferedRWPair flushes its write buffer,
causing EOFException on the JVM side. This affects the simple-worker
(non-daemon) path used on Windows and when spark.python.use.daemon=false.

Adds explicit sock_file.close() in a finally block to all 12 worker
files' __main__ blocks, matching how PR #54458 solved this on master
via a context manager.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../sql/connect/streaming/worker/foreach_batch_worker.py    | 5 ++++-
 .../pyspark/sql/connect/streaming/worker/listener_worker.py | 5 ++++-
 .../pyspark/sql/streaming/python_streaming_source_runner.py | 5 ++++-
 .../sql/streaming/transform_with_state_driver_worker.py     | 5 ++++-
 python/pyspark/sql/worker/analyze_udtf.py                   | 5 ++++-
 python/pyspark/sql/worker/commit_data_source_write.py       | 5 ++++-
 python/pyspark/sql/worker/create_data_source.py             | 5 ++++-
 python/pyspark/sql/worker/lookup_data_sources.py            | 5 ++++-
 python/pyspark/sql/worker/plan_data_source_read.py          | 5 ++++-
 python/pyspark/sql/worker/python_streaming_sink_runner.py   | 5 ++++-
 python/pyspark/sql/worker/write_into_data_source.py         | 5 ++++-
 python/pyspark/worker.py                                    | 5 ++++-
 12 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
index f144ac49e5bb1..a06cb139f5a63 100644
--- a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
@@ -96,4 +96,7 @@ def process(df_id, batch_id):  # type: ignore[no-untyped-def]
     sock.settimeout(None)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/connect/streaming/worker/listener_worker.py b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
index a7a5066ca0d77..ae4630ed1a1dc 100644
--- a/python/pyspark/sql/connect/streaming/worker/listener_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
@@ -112,4 +112,7 @@ def process(listener_event_str, listener_event_type):  # type: ignore[no-untyped-def]
     sock.settimeout(None)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/streaming/python_streaming_source_runner.py b/python/pyspark/sql/streaming/python_streaming_source_runner.py
index 11aa4e15ab1ee..eb094b8e2ad9d 100644
--- a/python/pyspark/sql/streaming/python_streaming_source_runner.py
+++ b/python/pyspark/sql/streaming/python_streaming_source_runner.py
@@ -211,4 +211,7 @@ def main(infile: IO, outfile: IO) -> None:
     sock.settimeout(None)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/streaming/transform_with_state_driver_worker.py b/python/pyspark/sql/streaming/transform_with_state_driver_worker.py
index 99d386f07b5b6..92b951ab7a80c 100644
--- a/python/pyspark/sql/streaming/transform_with_state_driver_worker.py
+++ b/python/pyspark/sql/streaming/transform_with_state_driver_worker.py
@@ -99,4 +99,7 @@ def process(
     (sock_file, sock) = local_connect_and_auth(java_port, auth_secret)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/worker/analyze_udtf.py b/python/pyspark/sql/worker/analyze_udtf.py
index 9247fde78004f..691b05be13183 100644
--- a/python/pyspark/sql/worker/analyze_udtf.py
+++ b/python/pyspark/sql/worker/analyze_udtf.py
@@ -279,4 +279,7 @@ def invalid_analyze_result_field(field_name: str, expected_field: str) -> PySparkValueError:
     # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/worker/commit_data_source_write.py b/python/pyspark/sql/worker/commit_data_source_write.py
index c891d9f083cb8..46596259359bc 100644
--- a/python/pyspark/sql/worker/commit_data_source_write.py
+++ b/python/pyspark/sql/worker/commit_data_source_write.py
@@ -124,4 +124,7 @@ def main(infile: IO, outfile: IO) -> None:
     (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/worker/create_data_source.py b/python/pyspark/sql/worker/create_data_source.py
index 33957616c4834..3aa5b707b681f 100644
--- a/python/pyspark/sql/worker/create_data_source.py
+++ b/python/pyspark/sql/worker/create_data_source.py
@@ -189,4 +189,7 @@ def main(infile: IO, outfile: IO) -> None:
     (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/worker/lookup_data_sources.py b/python/pyspark/sql/worker/lookup_data_sources.py
index 18737095fa9c6..02bb71c927fdd 100644
--- a/python/pyspark/sql/worker/lookup_data_sources.py
+++ b/python/pyspark/sql/worker/lookup_data_sources.py
@@ -109,4 +109,7 @@ def main(infile: IO, outfile: IO) -> None:
     (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/worker/plan_data_source_read.py b/python/pyspark/sql/worker/plan_data_source_read.py
index 4c6fd4c0a77c3..ba9c630979ac2 100644
--- a/python/pyspark/sql/worker/plan_data_source_read.py
+++ b/python/pyspark/sql/worker/plan_data_source_read.py
@@ -382,4 +382,7 @@ def data_source_read_func(iterator: Iterable[pa.RecordBatch]) -> Iterable[pa.RecordBatch]:
     (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/worker/python_streaming_sink_runner.py b/python/pyspark/sql/worker/python_streaming_sink_runner.py
index 13b8f4d30786c..c0d54b917da0f 100644
--- a/python/pyspark/sql/worker/python_streaming_sink_runner.py
+++ b/python/pyspark/sql/worker/python_streaming_sink_runner.py
@@ -153,4 +153,7 @@ def main(infile: IO, outfile: IO) -> None:
     (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/worker/write_into_data_source.py b/python/pyspark/sql/worker/write_into_data_source.py
index 235e5c249f691..1b96e5cc275e1 100644
--- a/python/pyspark/sql/worker/write_into_data_source.py
+++ b/python/pyspark/sql/worker/write_into_data_source.py
@@ -260,4 +260,7 @@ def batch_to_rows() -> Iterator[Row]:
     (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 6162854799cda..7ff60bd0258b3 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -2081,4 +2081,7 @@ def process():
     # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
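
Note for reviewers: below is a minimal, standalone sketch of the failure
mode and of the try/finally shape applied above. It is not part of the
patch; the socketpair setup and every name in it are illustrative
assumptions rather than PySpark code. Only the try/finally around
main(sock_file, sock_file) mirrors the actual change.

    import socket

    def main(infile, outfile):
        # Stand-in for a worker loop: write a small length-prefixed reply
        # that sits in the buffered writer until a flush or close.
        outfile.write((42).to_bytes(4, "big"))

    if __name__ == "__main__":
        left, right = socket.socketpair()
        sock_file = left.makefile("rwb", 65536)  # an io.BufferedRWPair
        try:
            main(sock_file, sock_file)
        finally:
            # Closing explicitly flushes the write buffer while the socket
            # is still open. Leaving this to GC at interpreter shutdown
            # can, on CPython 3.12+, finalize the raw socket first, so the
            # peer (the JVM, in Spark's case) would see a truncated stream.
            sock_file.close()
        left.close()
        assert right.recv(4) == (42).to_bytes(4, "big")
        right.close()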