Use static shared catalog and remove premature FileIO close #38149
ahmedabu98 merged 9 commits into apache:master
Conversation
Summary of Changes (Gemini Code Assist): This pull request addresses a critical issue where IcebergIO write pipelines fail due to premature connection pool closure. By shifting the responsibility of closing the FileIO from the per-bundle RecordWriterManager to the DoFn's @Teardown lifecycle, we ensure that shared resources remain active across multiple bundles, preventing "connection pool shut down" errors while maintaining proper resource cleanup.
@ahmedabu98 following up on #37782 - that fix correctly moved the FileIO close into RecordWriterManager.close(), but writes still fail across bundles. The root cause: the catalog is long-lived and shares a single FileIO across tables, so closing that FileIO per-bundle shuts down its connection pool. This PR removes the FileIO close from RecordWriterManager.close() and moves cleanup to the DoFn's @Teardown. Would appreciate your review here.
The alternative from Beam's side would create brittle coupling to Iceberg internals. |
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`.
The check failures are not related to the code changes.
assign set of reviewers
Assigning reviewers: R: @chamikaramj for label java. Note: If you would like to opt out of this review, comment `stop reviewer notifications`. Available commands: `stop reviewer notifications` (opt out of the automated review tooling), `remind me after tests pass` (tag the comment author after tests pass), `waiting on author` (shift the attention set back to the author).
The PR bot will only process comments in the main thread (not review comments).
Thanks @dejii, overall this looks good. I'm wondering, though, whether our assumption holds that closing the Catalog will indeed close all underlying FileIOs. Did you check if this is true for different catalog implementations?
stankiewicz left a comment:
I'm a bit lost on the overall lifecycle of the catalog here.
Hey, I understand the need to close the catalog.
Hey @stankiewicz - thanks for the review.
Just to clarify - closing the Catalog is what also closes its underlying FileIO, so no separate FileIO close is needed.
@ahmedabu98 Yes, confirmed - verified against source and included links in the PR description. REST, Glue, JDBC, and Nessie catalogs all properly close their FileIO instances when the catalog itself is closed.
@stankiewicz Thanks for the review - addressed all the feedback from our discussion. Here's what changed:
Thanks @dejii. What is important to note is:

@ahmedabu98 we've discussed having multiple catalogs and some quota issues. Can you share some thoughts? This change doesn't reduce the number of catalogs used over time.
@stankiewicz Thanks for the summary table. One comment on the leak characterization: There shouldn't be a memory leak. |
On catalog quota: On Dataflow Runner v2 (and any runner that deserializes DoFns), the number of catalogs per worker is unchanged. The only case where catalog count increases is on runners that pass DoFns by instance. |
Thanks. Fix the Spotless errors, please.
I understand wanting to prevent a situation where closed thread-level Catalogs lead to dead Tables in a VM cache. I'm a little worried about moving to a per-thread Table cache though. There will be a lot of "get table" calls at the beginning of a job's lifetime, which can max out quota pretty quickly (we've seen that with users already). And it gets exacerbated for some streaming runners like Dataflow, where (# threads) >> (# vCPUs).

What if we move in the opposite direction and have a static cache of Catalogs? So all threads in a VM share one Catalog instance and perform all Table operations through it. Realistically, only one thread will need to create the Catalog. That same thread will likely create/load the Table and also store it in a static cache.

Thoughts on this approach? It would also improve our current Catalog management (which, I'm noticing, is to eagerly create a Catalog per thread whether or not it actually gets used).
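A minimal sketch of what that static cache could look like. `CatalogCache` and its `Key` record are hypothetical names, and Iceberg's `CatalogUtil.buildIcebergCatalog` stands in for however Beam actually materializes a Catalog from its config; this is not the actual implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

// Hypothetical helper, NOT the actual Beam class: one Catalog per
// distinct configuration, shared by every thread on the VM.
final class CatalogCache {
  // Value-equal key standing in for IcebergCatalogConfig's AutoValue
  // equality (a Java record gives the same value semantics).
  record Key(String name, Map<String, String> properties) {}

  private static final ConcurrentHashMap<Key, Catalog> CATALOGS =
      new ConcurrentHashMap<>();

  static Catalog getOrCreate(String name, Map<String, String> properties) {
    // computeIfAbsent guarantees that only one thread builds the Catalog
    // for a given config; all other threads reuse the same instance.
    // Nothing is ever closed here: the cache lives as long as the JVM.
    return CATALOGS.computeIfAbsent(
        new Key(name, Map.copyOf(properties)),
        key -> CatalogUtil.buildIcebergCatalog(
            key.name(), key.properties(), null)); // null Hadoop conf for brevity
  }

  private CatalogCache() {}
}
```

The `computeIfAbsent` call gives the "only one thread creates the Catalog" behavior for free, with no explicit locking.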
I'm wondering if, with this approach, we can let JVM shutdown handle the FileIO cleanup instead of doing it ourselves in @Teardown.
Singleton configuration is a bit more tricky, as you would have to keep a single catalog instance per IcebergCatalogConfig, but it's definitely doable. IcebergCatalogConfig is an AutoValue class, so it comes with value-based equals()/hashCode() and can serve as the cache key.
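For reference, a toy AutoValue class (illustrative only, not IcebergCatalogConfig's real shape) showing why AutoValue types work as cache keys:

```java
import com.google.auto.value.AutoValue;
import java.util.Map;

// Toy example: AutoValue generates value-based equals()/hashCode()
// from the abstract property methods, so two instances built from the
// same values are interchangeable as map keys.
@AutoValue
abstract class ExampleCatalogConfig {
  abstract String catalogName();

  abstract Map<String, String> properties();

  static ExampleCatalogConfig create(
      String catalogName, Map<String, String> properties) {
    // AutoValue_ExampleCatalogConfig is generated at compile time.
    return new AutoValue_ExampleCatalogConfig(catalogName, properties);
  }
}
```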
If we keep the cache per VM, there are two options:
…lifecycle (Conflicts: sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java)
I think we can go with (1). No real downside to not closing for a shared static catalog. Taking the REST catalog as an example, the resources are essentially the HTTP client's connection pool and the shared FileIO.
All are designed to be long-lived. The only "leak" is that these resources aren't explicitly released when the pipeline finishes, but the JVM/container dies shortly after anyway.
@ahmedabu98 @stankiewicz -- reworked the PR based on the discussion so far. Key notes:
/gemini review |
Code Review
This pull request transitions Iceberg Catalog and FileIO management to a shared static cache to allow resources to survive across bundle boundaries. Feedback highlights several critical issues: the static cache lacks an eviction policy, potentially causing memory leaks; shared catalogs might be prematurely closed by one DoFn during teardown while others are still active; and the removal of try-with-resources in AppendFilesToTables has made ManifestWriter closing unsafe in the event of exceptions. Additionally, it is suggested to expand test coverage to verify the full lifecycle including teardown.
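On the ManifestWriter point, the usual Iceberg pattern looks roughly like the sketch below (`spec`, `outputFile`, and `dataFiles` are assumed inputs; this is not the actual AppendFilesToTables code):

```java
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.io.OutputFile;

class ManifestWriteSketch {
  // Close the writer even if add() throws, then read the result:
  // ManifestWriter.toManifestFile() is only valid after close().
  static ManifestFile writeManifest(
      PartitionSpec spec, OutputFile outputFile, List<DataFile> dataFiles)
      throws IOException {
    ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
    try {
      for (DataFile file : dataFiles) {
        writer.add(file);
      }
    } finally {
      writer.close(); // guarantees the underlying stream is released
    }
    return writer.toManifestFile();
  }
}
```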
Update the PR description and title @dejii so they describe the current effort. Looks good to me!
ahmedabu98 left a comment:
LGTM! Thanks for sticking this out @dejii!
I like how this turned out, and am happy with the simplicity.
Can you just trigger the integration tests to make sure things are running smoothly? Just commit a small change to these files:
https://github.com/apache/beam/blob/master/.github/trigger_files/IO_Iceberg_Integration_Tests.json
https://github.com/apache/beam/blob/master/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json
…erg-fileio-lifecycle
There's a known issue for one of the failing workflows. We can merge when the remaining tests pass.
Java PreCommit is also a known issue: #38094 (comment). The Java Examples workflow passed but Test Results failed to publish. We're good to merge!
Problem

IcebergIO write pipelines fail with "connection pool shut down" errors when writing to dynamic destinations across multiple bundles on the same worker. The root cause:

- RecordWriterManager.close() (called per-bundle in @FinishBundle) was closing FileIO instances.
- Catalogs like RESTSessionCatalog share a single FileIO across all tables. Closing it per-bundle permanently kills the catalog's connection pool, so all subsequent bundles on that DoFn instance fail.

Fix

- Remove all premature FileIO close calls: RecordWriterManager, AppendFilesToTables, and ScanTaskReader must not close a resource they don't own.
- Use a static shared catalog per VM, keyed by IcebergCatalogConfig AutoValue equality, so all threads share one Catalog instance per distinct configuration.

What changed

- Removed the FileIO close from RecordWriterManager.close(), and also in AppendFilesToTables and ScanTaskReader.

Follow up on: #37782
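To make the lifecycle concrete, here is a hedged sketch of the write-side DoFn shape this implies, reusing the hypothetical CatalogCache from the discussion above. Names and config values are illustrative, not the actual RecordWriterManager code:

```java
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.iceberg.catalog.Catalog;

// Illustrative DoFn showing where resources open and close under the
// new model. The Catalog (and its FileIO) comes from the static cache
// and is never closed by the DoFn.
class WriteRecordsFn extends DoFn<String, Void> {
  private transient Catalog catalog;

  @Setup
  public void setup() {
    // Placeholder config values; in Beam the key is IcebergCatalogConfig.
    catalog = CatalogCache.getOrCreate("example_catalog", Map.of());
  }

  @ProcessElement
  public void processElement(@Element String element) {
    // ... open per-bundle writers through `catalog` and write records ...
  }

  @FinishBundle
  public void finishBundle() {
    // Close per-bundle writers only. Do NOT close the catalog's FileIO:
    // it is shared across bundles and threads on this VM.
  }

  // Deliberately no FileIO close in @Teardown either: the static
  // catalog's resources live for the JVM's lifetime.
}
```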