Use static shared catalog and remove premature FileIO close #38149

Merged: ahmedabu98 merged 9 commits into apache:master from dejii:fix/iceberg-fileio-lifecycle on Apr 21, 2026

Conversation

@dejii
Contributor

@dejii dejii commented Apr 13, 2026

Problem

IcebergIO write pipelines fail with "connection pool shut down" errors when writing to dynamic destinations across multiple bundles on the same worker. The root cause:

  • RecordWriterManager.close() (called per-bundle in @FinishBundle) was closing FileIO instances
  • RESTSessionCatalog shares a single FileIO across all tables. Closing it per-bundle permanently kills the catalog's connection pool, so all subsequent bundles on that DoFn instance fail
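
The failure mode above can be reproduced in miniature. The following is only a sketch with hypothetical stand-ins (`SharedPool`, `BundleWriter`, and the `closePoolOnFinish` flag are invented for illustration and are not Beam or Iceberg classes): a per-bundle writer borrows a long-lived pool, and the buggy variant closes it when the bundle finishes.

```java
import java.util.ArrayList;
import java.util.List;

final class PrematureCloseDemo {
  // Hypothetical stand-in for a catalog-owned resource such as a FileIO's connection pool.
  static final class SharedPool implements AutoCloseable {
    private boolean closed = false;

    String request(String path) {
      if (closed) {
        throw new IllegalStateException("Connection pool shut down");
      }
      return "OK " + path;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  // Hypothetical per-bundle writer: it borrows the pool but does not own it.
  static final class BundleWriter {
    private final SharedPool pool;
    private final boolean closePoolOnFinish;

    BundleWriter(SharedPool pool, boolean closePoolOnFinish) {
      this.pool = pool;
      this.closePoolOnFinish = closePoolOnFinish;
    }

    String write(String table) {
      return pool.request(table);
    }

    // Analogous to a per-bundle close(); the buggy variant closes a resource it does not own.
    void finishBundle() {
      if (closePoolOnFinish) {
        pool.close();
      }
    }
  }

  // Runs several bundles against one shared pool and records the outcome of each.
  static List<String> run(int bundles, boolean closePoolOnFinish) {
    SharedPool pool = new SharedPool();
    List<String> results = new ArrayList<>();
    for (int i = 0; i < bundles; i++) {
      BundleWriter writer = new BundleWriter(pool, closePoolOnFinish);
      try {
        results.add(writer.write("table-" + i));
      } catch (IllegalStateException e) {
        results.add("FAILED: " + e.getMessage());
      } finally {
        writer.finishBundle();
      }
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println("close per bundle: " + run(3, true));
    System.out.println("never close:      " + run(3, false));
  }
}
```

With `closePoolOnFinish` set, only the first bundle succeeds; every later bundle hits the already-closed pool, which mirrors the symptom reported here.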

Fix

Remove all premature FileIO close calls — RecordWriterManager, AppendFilesToTables, and ScanTaskReader must not close a resource they don't own.

Use a static shared catalog per VM. This ensures:

  • One catalog instance per unique config per VM (keyed by IcebergCatalogConfig AutoValue equality)
  • All DoFn threads on the same worker share the catalog and its connection pool
  • No @Teardown close needed: catalog resources (HTTP client, FileIO, auth manager) are designed to be long-lived, and JVM/container shutdown handles cleanup
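
As a rough illustration of the "one catalog per unique config per VM" idea, here is a minimal sketch. It assumes a config type with value-based equality; `CatalogConfig`, `FakeCatalog`, and `SharedCatalogs` are hypothetical names, not the classes touched by this PR, and the actual change may differ in detail.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

final class SharedCatalogs {
  // Hypothetical config with value-based equality (the PR relies on AutoValue for this).
  static final class CatalogConfig {
    final String name;
    final String uri;

    CatalogConfig(String name, String uri) {
      this.name = name;
      this.uri = uri;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof CatalogConfig)) {
        return false;
      }
      CatalogConfig other = (CatalogConfig) o;
      return name.equals(other.name) && uri.equals(other.uri);
    }

    @Override
    public int hashCode() {
      return Objects.hash(name, uri);
    }
  }

  // Hypothetical stand-in for an expensive, long-lived catalog client.
  static final class FakeCatalog {
    final CatalogConfig config;

    FakeCatalog(CatalogConfig config) {
      this.config = config;
    }
  }

  // One entry per distinct config for the life of the JVM; entries are never closed or evicted.
  private static final Map<CatalogConfig, FakeCatalog> CATALOGS = new ConcurrentHashMap<>();

  private SharedCatalogs() {}

  static FakeCatalog get(CatalogConfig config) {
    // computeIfAbsent applies the factory atomically, at most once per key.
    return CATALOGS.computeIfAbsent(config, FakeCatalog::new);
  }

  public static void main(String[] args) {
    FakeCatalog a = get(new CatalogConfig("prod", "https://catalog.example.invalid"));
    FakeCatalog b = get(new CatalogConfig("prod", "https://catalog.example.invalid"));
    System.out.println("same instance: " + (a == b));
  }
}
```

Because the map key uses value equality, two DoFn threads that deserialize equal configs resolve to the same catalog instance, which is what keeps the connection pool shared.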

What changed

| What | Before this PR | After this PR |
|---|---|---|
| Catalog instance | One per DoFn thread (via transient cachedCatalog) | One per VM per config (static ConcurrentHashMap) |
| Table cache | Static, shared across VM | Static, shared across VM (unchanged) |
| FileIO close | Per-bundle in RecordWriterManager.close(), also in AppendFilesToTables and ScanTaskReader | Never; JVM/container shutdown handles cleanup |
| Catalog quota impact | N catalogs per VM (one per DoFn thread) | 1 catalog per VM per config |
| loadTable() calls at startup | One per table per VM (static cache) | One per table per VM (unchanged) |

follow up on: #37782


@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical issue where IcebergIO write pipelines fail due to premature connection pool closure. By shifting the responsibility of closing the FileIO from the per-bundle RecordWriterManager to the DoFn's @Teardown lifecycle, we ensure that shared resources remain active across multiple bundles, preventing 'connection pool shut down' errors while maintaining proper resource cleanup.

Highlights

  • Resource Lifecycle Management: Removed FileIO closure from RecordWriterManager.close() to prevent premature connection pool termination.
  • Teardown Implementation: Added @Teardown methods to IcebergIO write DoFns to ensure the catalog is closed exactly once when the DoFn instance is destroyed.
  • Regression Testing: Added comprehensive tests to verify FileIO persistence across bundles and proper cleanup during DoFn teardown.

@dejii
Contributor Author

dejii commented Apr 13, 2026

@ahmedabu98 following up on #37782 - that fix correctly moved FileIO close from RecordWriter to RecordWriterManager, but it turns out there's a deeper issue that only manifests under high write volume to dynamic destinations (many bundles per worker).

The root cause: the catalog is @MonotonicNonNull on the DoFn and reused across all bundles on the same instance. RecordWriterManager.close() is called per bundle (@FinishBundle), so closing FileIO there, even deduplicated, kills the catalog's shared connection pool for all subsequent bundles on that DoFn.

This PR removes FileIO close from RecordWriterManager entirely and adds @Teardown to all four IcebergIO write DoFns, so the catalog (and its FileIO) is closed exactly once when the DoFn instance is destroyed.

Would appreciate your review here.

@dejii
Contributor Author

dejii commented Apr 13, 2026

The alternative from Beam's side would create brittle coupling to Iceberg internals. @Teardown is a clean boundary: Beam manages the catalog lifecycle, the catalog manages everything it owns.

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@dejii
Contributor Author

dejii commented Apr 13, 2026

The check failures are not related to the code changes. e.g:

:sdks:java:maven-archetypes:examples:generateSources: generate-sources.sh exited with code 127 (command not found)

@dejii
Contributor Author

dejii commented Apr 13, 2026

assign set of reviewers

@github-actions
Contributor

Assigning reviewers:

R: @chamikaramj for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@ahmedabu98
Contributor

Thanks @dejii, overall this looks good. I'm wondering, though, whether our assumption holds that closing the Catalog will indeed close all underlying FileIOs. Did you check if this is true for different catalog implementations?

Contributor

@stankiewicz stankiewicz left a comment

I'm a bit lost with overall lifecycle of catalog here.

@stankiewicz
Contributor

hey,

I understand the need to close the catalog.
Doing it per bundle is overkill: if each bundle writes to many tables, the catalog will waste a lot of time loading table credentials.
Not doing it at all is also a problem: with dynamic destinations and vended credentials, fileIOTracker will grow without limit, and we may keep refreshing credentials for tables we no longer need them for.

@dejii
Contributor Author

dejii commented Apr 13, 2026

Hey @stankiewicz - thanks for the review.

> Doing it per bundle is overkill: if each bundle writes to many tables, the catalog will waste a lot of time loading table credentials.

Just to clarify - closing FileIO per bundle is the current behavior, and it's not wasted time but thrown errors when using dynamic destinations: the catalog tries to reuse a dead connection pool on every subsequent bundle, and the pipeline ultimately fails.

@dejii
Contributor Author

dejii commented Apr 13, 2026

> Thanks @dejii, overall this looks good. I'm wondering, though, whether our assumption holds that closing the Catalog will indeed close all underlying FileIOs. Did you check if this is true for different catalog implementations?

@ahmedabu98 Yes, confirmed - verified against source and included links in the PR description. REST, Glue, JDBC, and Nessie catalogs all properly close their FileIO via CloseableGroup in close(). Hadoop and Hive catalogs do not close their FileIO, but this is likely intentional - they default to HadoopFileIO which has no close() implementation since Hadoop's FileSystem manages its own lifecycle.

@dejii
Contributor Author

dejii commented Apr 14, 2026

@stankiewicz Thanks for the review - addressed all the feedback from our discussion. Here's what changed:

  • Catalog lifecycle: Each DoFn now creates its own catalog via catalogConfig.newCatalog() in @Setup and closes it in @Teardown. This removes the reliance on the shared cachedCatalog in IcebergCatalogConfig, so the fix works correctly regardless of whether the DoFn is passed as instance or deserialized. The existing catalog() method is preserved for driver-side operations (pipeline construction, schema inference, etc.).

  • Static table cache: LAST_REFRESHED_TABLE_CACHE is no longer static - each DoFn owns its own cache instance and passes it to RecordWriterManager via the constructor. This prevents a closed catalog's dead Table objects from poisoning other DoFn instances.

@dejii dejii requested a review from stankiewicz April 14, 2026 00:01
@stankiewicz
Contributor

stankiewicz commented Apr 14, 2026

> @stankiewicz Thanks for the review - addressed all the feedback from our discussion. Here's what changed:
>
>   • Catalog lifecycle: Each DoFn now creates its own catalog via catalogConfig.newCatalog() in @Setup and closes it in @Teardown. This removes the reliance on the shared cachedCatalog in IcebergCatalogConfig, so the fix works correctly regardless of whether the DoFn is passed as instance or deserialized. The existing catalog() method is preserved for driver-side operations (pipeline construction, schema inference, etc.).
>   • Static table cache: LAST_REFRESHED_TABLE_CACHE is no longer static - each DoFn owns its own cache instance and passes it to RecordWriterManager via the constructor. This prevents a closed catalog's dead Table objects from poisoning other DoFn instances.

thanks @dejii .

What is important to note is:

| What | Previous | Change |
|---|---|---|
| table cache - table identifier --> storage credentials, io | per VM | per doFn life / harness thread |
| iceberg catalog instance | per harness thread | per doFn life / per harness thread |
| io close | finish bundle | doFn.teardown closes catalog (which closes all IOs that need to be closed) |
| leak | no memory leak | slow memory leak - if doFn runs for long, catalog will grow over time with trackedFileIO with vended credentials |

@ahmedabu98 we've discussed having multiple catalog and some quota issues. Can you share some thoughts? This change doesn't reduce amount of catalogs used over time.

@dejii
Contributor Author

dejii commented Apr 14, 2026

@stankiewicz Thanks for the summary table. One comment on the leak characterization:

There shouldn't be a memory leak. FileIOTracker uses Caffeine's weakKeys() with a removal listener that calls fileIO.close() when a TableOperations key is GC'd (FileIOTracker.java#L37-L47). The strong reference chain is tableCache -> Table -> TableOperations, so once a table drops out of the cache its TableOperations becomes collectable. This cleanup should happen continuously throughout the DoFn's lifetime, not just at teardown, so even a long-running DoFn will naturally clean up per-table FileIOs for tables that go idle. The tracker is bounded by the number of distinct tables written to in the last 10 minutes.
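
To make the weak-key cleanup idea concrete, here is a sketch that deliberately swaps in plain `WeakReference` plus `ReferenceQueue` from the JDK instead of Caffeine, so it is not how FileIOTracker is implemented. `WeakTracker` and its methods are hypothetical names. A value is closed once its key has been collected; the demo calls `enqueue()` by hand to simulate that collection deterministically.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: closes a tracked value after its key becomes unreachable.
final class WeakTracker<K, V extends AutoCloseable> {
  private final ReferenceQueue<K> queue = new ReferenceQueue<>();
  // Keyed by the weak reference itself (identity equality), so the key object is not kept alive.
  private final Map<WeakReference<K>, V> values = new ConcurrentHashMap<>();

  // Registers a value for the key; the reference is returned only to make the demo testable.
  WeakReference<K> track(K key, V value) {
    expungeStaleEntries();
    WeakReference<K> ref = new WeakReference<>(key, queue);
    values.put(ref, value);
    return ref;
  }

  int size() {
    expungeStaleEntries();
    return values.size();
  }

  // Removes and closes values whose keys were collected (or explicitly enqueued).
  void expungeStaleEntries() {
    Reference<? extends K> ref;
    while ((ref = queue.poll()) != null) {
      V value = values.remove(ref);
      if (value != null) {
        try {
          value.close();
        } catch (Exception e) {
          // Best-effort cleanup in this sketch; a real tracker would log the failure.
        }
      }
    }
  }

  public static void main(String[] args) {
    AtomicBoolean closed = new AtomicBoolean();
    WeakTracker<Object, AutoCloseable> tracker = new WeakTracker<>();
    Object tableOps = new Object(); // stands in for a TableOperations key
    WeakReference<Object> ref = tracker.track(tableOps, () -> closed.set(true));
    ref.enqueue(); // simulate the garbage collector clearing the key
    System.out.println("entries=" + tracker.size() + " closed=" + closed.get());
  }
}
```

The important constraint, in this sketch as in any weak-key scheme, is that the tracked value must not hold a strong reference back to its key, otherwise the key never becomes collectable.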

@dejii
Contributor Author

dejii commented Apr 14, 2026

On catalog quota: On Dataflow Runner v2 (and any runner that deserializes DoFns), the number of catalogs per worker is unchanged. The only case where catalog count increases is on runners that pass DoFns by instance.

@stankiewicz
Contributor

thanks, fix spotless errors, please.

@ahmedabu98
Contributor

I understand wanting to prevent a situation where closed thread-level Catalogs lead to dead Tables in a VM cache.

I'm a little worried about moving to a per-thread Table cache though. There will be a lot of "get table" calls in the beginning of a job's lifetime which can max out quota pretty quickly (we’ve seen that with users already). And it gets exacerbated for some streaming runners like Dataflow, where (# threads) >> (# vCPUs).

What if we move in the opposite direction and have a static cache of Catalogs? So all threads in a VM share one Catalog instance and perform all Table operations through it? Realistically, only one thread will need to create the Catalog. That same thread will likely create/load the Table and also store it in a static cache.

Thoughts on this approach? It would also improve our current Catalog management (which I’m noticing, is to eagerly create a Catalog per thread whether or not it actually gets used)

@ahmedabu98
Contributor

I'm wondering if with this approach we can let JVM shutdown handle the FileIO cleanup instead of doing it ourselves in @Teardown

@stankiewicz
Contributor

stankiewicz commented Apr 15, 2026

Singleton configuration is a bit more tricky, as you would have to keep a single catalog instance per IcebergCatalogConfig, but it's definitely doable. IcebergCatalogConfig is an AutoValue with equals, so adding a static Map<IcebergCatalogConfig, Catalog> singletonMap should work.

@stankiewicz
Contributor

if we keep the cache per VM, there are two options:

  1. we shouldn't close the catalog on teardown
  2. if there is a good reason to tear down the catalog, it should also wait until no other DoFns are using the catalog, so it can safely flush the cache and null out the catalog. One solution is to introduce a ReentrantReadWriteLock, where the write lock is taken to recycle the catalog and the read lock is taken to fetch it.
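
For reference, option 2 could look roughly like the sketch below. It is only an illustration of the read/write-lock idea and was not adopted in this PR; `RecyclableHolder`, `use`, and `recycle` are hypothetical names. To keep the locking simple, the resource is created eagerly and replaced on recycle instead of being nulled out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;

// Sketch only: readers share the resource; recycling waits until no reader is using it.
final class RecyclableHolder<T extends AutoCloseable> {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Supplier<T> factory;
  private T current; // guarded by lock

  RecyclableHolder(Supplier<T> factory) {
    this.factory = factory;
    this.current = factory.get();
  }

  // Runs the action under the read lock, so recycle() cannot close the resource mid-use.
  <R> R use(Function<T, R> action) {
    lock.readLock().lock();
    try {
      return action.apply(current);
    } finally {
      lock.readLock().unlock();
    }
  }

  // Takes the write lock (blocking until in-flight readers finish), then swaps and closes.
  void recycle() {
    lock.writeLock().lock();
    try {
      T old = current;
      current = factory.get();
      old.close();
    } catch (Exception e) {
      throw new IllegalStateException("failed to close recycled resource", e);
    } finally {
      lock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    AtomicInteger closed = new AtomicInteger();
    RecyclableHolder<AutoCloseable> holder =
        new RecyclableHolder<AutoCloseable>(() -> () -> closed.incrementAndGet());
    holder.use(resource -> "read while holding the read lock");
    holder.recycle();
    System.out.println("closed instances: " + closed.get());
  }
}
```

The cost of this design is that every catalog access pays for a lock acquisition and callers must never hold on to the resource outside `use`, which is part of why simply not closing (option 1) is the simpler choice.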

…lifecycle

# Conflicts:
#	sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java
@dejii
Contributor Author

dejii commented Apr 15, 2026

I think we can go with (1). No real downside to not closing for a shared static catalog. Taking Rest catalog as an example, the resources are:

All are designed to be long-lived. The only "leak" is that these resources aren't explicitly released when the pipeline finishes, but the JVM/container dies shortly after anyway.

@dejii
Contributor Author

dejii commented Apr 15, 2026

@ahmedabu98 @stankiewicz -- reworked the PR based on the discussion so far. Key notes:

| What | Previous (before PR) | This PR |
|---|---|---|
| Catalog instance | One per DoFn thread (depending on the runner) | One per VM |
| Table cache | Static, shared across VM | Static, shared across VM (unchanged) |
| FileIO close | Per-bundle in RecordWriterManager.close(), also in AppendFilesToTables and ScanTaskReader | Never; JVM/container shutdown handles cleanup |
| Catalog quota impact (init calls) | N per VM | 1 per VM |
| loadTable() calls at startup | One per table per VM (static cache) | One per table per VM (unchanged) |
| Resource cleanup | Premature; killed shared connection pool when using dynamic destinations | JVM lifecycle |

@stankiewicz
Contributor

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request transitions Iceberg Catalog and FileIO management to a shared static cache to allow resources to survive across bundle boundaries. Feedback highlights several critical issues: the static cache lacks an eviction policy, potentially causing memory leaks; shared catalogs might be prematurely closed by one DoFn during teardown while others are still active; and the removal of try-with-resources in AppendFilesToTables has made ManifestWriter closing unsafe in the event of exceptions. Additionally, it is suggested to expand test coverage to verify the full lifecycle including teardown.
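
The point about exception-safe closing can be shown with a generic sketch. `TrackedWriter` below is a hypothetical stand-in for a per-call writer such as a manifest writer, not the Iceberg class; the idea is that short-lived writers owned by the method stay in try-with-resources, while the shared FileIO (not modeled here) is simply left open.

```java
import java.util.ArrayList;
import java.util.List;

final class SafeCloseDemo {
  // Hypothetical writer owned by a single call; it logs its lifecycle for inspection.
  static final class TrackedWriter implements AutoCloseable {
    private final List<String> log;

    TrackedWriter(List<String> log) {
      this.log = log;
      log.add("open");
    }

    void add(String file) {
      if (file.isEmpty()) {
        throw new IllegalArgumentException("empty file path");
      }
      log.add("add " + file);
    }

    @Override
    public void close() {
      log.add("close");
    }
  }

  // The writer is closed even when add() throws, because try-with-resources
  // closes its resources before any catch block runs.
  static List<String> writeAll(List<String> files) {
    List<String> log = new ArrayList<>();
    try (TrackedWriter writer = new TrackedWriter(log)) {
      for (String file : files) {
        writer.add(file);
      }
    } catch (IllegalArgumentException e) {
      log.add("error " + e.getMessage());
    }
    return log;
  }

  public static void main(String[] args) {
    List<String> files = new ArrayList<>();
    files.add("a.parquet");
    files.add("");
    System.out.println(writeAll(files));
  }
}
```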

@stankiewicz
Contributor

Update PR description and title @dejii so it's describing current effort. looks good to me!

Contributor

@ahmedabu98 ahmedabu98 left a comment

LGTM! Thanks for sticking this out @dejii!

I like how this turned out, and am happy with the simplicity.

Can you just trigger integration tests to make sure things are running smooth? Just commit a small change for these files:
https://github.com/apache/beam/blob/master/.github/trigger_files/IO_Iceberg_Integration_Tests.json
https://github.com/apache/beam/blob/master/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json

@dejii dejii changed the title from "Fix IcebergIO conn pool crash by moving FileIO lifecycle to @Teardown" to "Use static shared catalog and remove premature FileIO close" on Apr 16, 2026
@dejii
Contributor Author

dejii commented Apr 16, 2026

The failing tests don't seem related to my PR and are currently flaky on master. 1, 2

@ahmedabu98
Contributor

There's a known issue for IcebergIO Managed Integration Tests on Dataflow (#34809 (comment))

We can merge when the remaining tests pass

@ahmedabu98
Contributor

Java Precommit is also a known issue: #38094 (comment)

Java Examples workflow passed but Test Results failed to publish.

We're good to merge!

@ahmedabu98 ahmedabu98 merged commit 0cdd1b4 into apache:master Apr 21, 2026
17 of 20 checks passed