fix(connectors): Add unwrap_envelope transform and envelope detection to fix source-sink format mismatch #3197
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #3197 +/- ##
=============================================
- Coverage 74.17% 55.39% -18.79%
Complexity 943 943
=============================================
Files 1237 1238 +1
Lines 112641 99864 -12777
Branches 89200 76424 -12776
=============================================
- Hits 83549 55317 -28232
- Misses 26299 41956 +15657
+ Partials 2793 2591 -202
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution!
@atharvalade please rebase this PR
/author
8defa26 to 7c92361
done
/ready
hubcio left a comment
review notes (not posted as line comments because targets are out-of-diff or already tracked):
- the FFI (consume) return code is still discarded at core/connectors/runtime/src/sink.rs:642-650 (ConsumeCallback returns i32, sdk/src/sink.rs:28-36) and AutoCommit::When(AutoCommitWhen::PollingMessages) at core/connectors/runtime/src/sink.rs:467 commits offsets before the sink runs. this PR widens the blast radius: any envelope-detected batch now returns Err(Error::InvalidRecord) from the sink and gets logged, but the offsets have already advanced. tracked in #2927 and #2928, but this PR adds a brand new fail path triggerable purely by source/sink misconfiguration, so they should land together.
- core/connectors/sinks/delta_sink/src/sink.rs:106 has the same envelope-vs-flat-JSON mismatch and no detection or unwrap guidance. fixing one sink and leaving the other broken means #3174 is still reachable through the postgres → delta path. either replicate the detection (with the tighter shape suggested below) or, better, document unwrap_envelope as the single fix and drop sink-side sniffing entirely.
- neither core/connectors/sdk/README.md nor core/connectors/sinks/iceberg_sink/README.md mentions the new unwrap_envelope transform or the source-compatibility requirement for postgres_source → iceberg. discoverability is the whole point of this fix; please add at least a short entry to both.
- architecturally, sink-side envelope sniffing is the wrong layer. transform discipline + docs is the real fix. if sink-side detection stays, it should be opt-in via config, not a hardcoded postgres-shaped heuristic.
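The opt-in detection suggested in the last note could look roughly like the sketch below. It is not code from this PR: `EnvelopeDetection`, `postgres_like`, and `matches` are hypothetical names, and the key list is taken from the envelope shape quoted in the PR description. Detection is disabled by default and only fires when every configured key is present at the top level of a record.

```rust
use std::collections::BTreeSet;

/// Hypothetical sink-side config: envelope detection is off unless enabled.
#[derive(Debug, Clone, Default)]
pub struct EnvelopeDetection {
    pub enabled: bool,
    /// Top-level keys that must ALL be present for a record to count as an envelope.
    pub required_keys: BTreeSet<String>,
}

impl EnvelopeDetection {
    /// Assumed preset mirroring the Postgres envelope shape from the PR description.
    pub fn postgres_like() -> Self {
        let required_keys = ["table_name", "operation_type", "timestamp", "data"]
            .iter()
            .map(|k| k.to_string())
            .collect();
        Self { enabled: true, required_keys }
    }

    /// Returns true only if detection is enabled and every required key is present.
    pub fn matches<'a, I>(&self, top_level_keys: I) -> bool
    where
        I: IntoIterator<Item = &'a str>,
    {
        if !self.enabled || self.required_keys.is_empty() {
            return false;
        }
        let present: BTreeSet<&str> = top_level_keys.into_iter().collect();
        self.required_keys.iter().all(|k| present.contains(k.as_str()))
    }
}
```

Requiring the full key set rather than any single key keeps a flat row that happens to have a `data` or `timestamp` column from being flagged.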
overall, i'm not a fan of this PR.
/ready
9114935 to b6d6b47
/ready
Which issue does this PR close?
Closes #3174
Rationale
Sources (e.g. Postgres) wrap row data in a DatabaseRecord envelope, while sinks (e.g. Iceberg) expect flat JSON matching the target table schema. No shared contract exists, so the mismatch produces silent null failures.
What changed?
The Postgres source emits {table_name, operation_type, timestamp, data: {...}, old_data} envelopes, but the Iceberg sink's Arrow JSON reader reads the table's columns from the top level of each record, so the nested values come through as null, silently violating non-nullable constraints.
This adds a reusable unwrap_envelope transform to the connector SDK that extracts a nested field (e.g. data) and promotes it to the top-level payload, plus explicit envelope detection in the Iceberg sink that errors with an actionable message instead of failing silently.
Local Execution
AI Usage
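For readers unfamiliar with the unwrap step described under "What changed?", here is a minimal, dependency-free sketch of the idea. It is not the SDK implementation: the `Json` enum is a stand-in for whatever payload type the connector SDK actually uses, and the signature of `unwrap_envelope` here is an assumption made for illustration.

```rust
use std::collections::BTreeMap;

/// Minimal stand-in for a JSON value (assumption; the real SDK has its own payload type).
#[derive(Debug, Clone, PartialEq)]
pub enum Json {
    Null,
    Text(String),
    Number(f64),
    Object(BTreeMap<String, Json>),
}

/// Hypothetical unwrap: if `payload` is an object whose `field` holds a nested
/// object, return that nested object as the new top-level payload.
/// Otherwise hand the original payload back unchanged inside `Err`.
pub fn unwrap_envelope(payload: Json, field: &str) -> Result<Json, Json> {
    match payload {
        Json::Object(mut map) => match map.remove(field) {
            // The nested object is promoted to the top level.
            Some(inner @ Json::Object(_)) => Ok(inner),
            // The field exists but is not an object: restore it and reject.
            Some(other) => {
                map.insert(field.to_string(), other);
                Err(Json::Object(map))
            }
            // No such field: the record is not an envelope.
            None => Err(Json::Object(map)),
        },
        other => Err(other),
    }
}
```

Returning the untouched payload in `Err` lets a caller decide whether to pass flat records through or reject them; which of those the real transform does is not stated in this PR description.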