
Commit 7be6cff

fix: finalize PyArrow S3 threads after Iceberg read in instructor_onboarding (#2221)
PyArrowFileIO initializes the C++ AWS SDK, which spawns non-daemon threads. These threads block subprocess exit, causing the Dagster step subprocess to hang after STEP_SUCCESS with the multiprocess executor never logging 'parent process exiting'.

Calling pa_fs.finalize_s3() after collect(), the last PyArrow S3 read in this asset, shuts down the C++ thread pool and allows clean subprocess exit.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 639d305 commit 7be6cff

1 file changed

Lines changed: 13 additions & 1 deletion


dg_projects/lakehouse/lakehouse/assets/instructor_onboarding.py

@@ -10,6 +10,7 @@
 from typing import Any
 
 import polars as pl
+import pyarrow.fs as pa_fs
 from dagster import (
     AssetExecutionContext,
     AssetIn,
@@ -77,8 +78,19 @@ def generate_instructor_onboarding_user_list(
     # Reorder columns: email, role, sent_invite
     user_data = user_data.select(["email", "role", "sent_invite"])
 
-    # Collect the LazyFrame before operations that need materialization
+    # Collect the LazyFrame to materialize the Iceberg scan.
+    # After collect() returns, this asset performs no further PyArrow S3 reads;
+    # the subsequent CSV generation uses StringIO and the IO manager output is a
+    # plain string (not Iceberg-backed). finalize_s3() shuts down the C++ S3
+    # thread pool so the step subprocess can exit cleanly — PyArrow non-daemon
+    # threads otherwise block Python's shutdown sequence.
+    # NOTE: finalize_s3() is irreversible in PyArrow 24+; no PyArrow S3 usage
+    # must follow this call in the same subprocess.
     user_data_collected = user_data.collect()
+    try:
+        pa_fs.finalize_s3()
+    except Exception:  # noqa: BLE001
+        context.log.warning("Failed to finalize PyArrow S3", exc_info=True)
 
     # Convert to CSV string using StringIO (no file I/O)
     csv_buffer = StringIO()
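
The try/except added in this hunk could also be factored into a small reusable helper if other assets need the same cleanup. The following is only a sketch under assumptions, not part of this commit: the name `finalize_once` and the module-level `_finalized` flag are hypothetical, and the finalizer is passed in as a callable so the helper does not import PyArrow itself. The guard reflects the NOTE in the diff that finalization should be treated as one-shot within a subprocess.

```python
import logging
from typing import Callable

# Hypothetical process-wide flag: finalization is treated as a one-shot action.
_finalized = False


def finalize_once(finalizer: Callable[[], None], log: logging.Logger) -> bool:
    """Run `finalizer` at most once per process; return True if it ran cleanly.

    Failures are logged and swallowed so that cleanup never fails the step.
    """
    global _finalized
    if _finalized:
        return False  # already attempted; do not call the finalizer again
    _finalized = True  # set before the call so a failed attempt is not retried
    try:
        finalizer()
    except Exception:  # deliberately broad, mirroring the diff above
        log.warning("Failed to finalize PyArrow S3", exc_info=True)
        return False
    return True
```

In the asset, this would presumably be invoked as `finalize_once(pa_fs.finalize_s3, context.log)` immediately after `user_data.collect()`. As in the commit, no PyArrow S3 access should follow it in the same subprocess.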
