diff --git a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
index cf61463cd6870..dc43ff938267b 100644
--- a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
@@ -84,4 +84,7 @@ def process(df_id, batch_id):  # type: ignore[no-untyped-def]
     sock.settimeout(None)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/sql/connect/streaming/worker/listener_worker.py b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
index e1f4678e42f16..38278af732a8d 100644
--- a/python/pyspark/sql/connect/streaming/worker/listener_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
@@ -97,4 +97,7 @@ def process(listener_event_str, listener_event_type):  # type: ignore[no-untyped
     sock.settimeout(None)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 90b11d0623166..d1c3e508d8b46 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1304,4 +1304,7 @@ def process():
     # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()