Skip to content

Commit 6f8b1e4

Browse files
committed
kafka: emit arrangement key only when a user key encoder exists
The previous refactor conflated two distinct conditions under a single key_is_synthetic flag: whether any key exists (user or relation) for PK violation detection, and whether a user-configured key should be emitted to the Kafka message. The old zip_into_diff_pairs stripped the emitted key whenever user_key_indices.is_none() — covering both the relation-key case and the synthetic-hash case. My refactor only stripped when BOTH were absent, so for sinks with a relation key but no user key, the arrangement key leaked through to encode_collection. The key_encoder is None in that case (no user key_format was configured), and the expect("key present") at line 1537 panicked. Fix: gate the emitted key on key_encoder.is_some() instead of !key_is_synthetic. Matches the old behavior exactly and is a local decision the sink can make from its own config without a trait change.
1 parent a0896b7 commit 6f8b1e4

1 file changed

Lines changed: 4 additions & 1 deletion

File tree

src/storage/src/sink/kafka.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1496,7 +1496,10 @@ fn encode_collection<'scope>(
14961496
if let Some(warner) = pk_warner.as_mut() {
14971497
warner.observe(key, time);
14981498
}
1499-
let key_for_message = if key_is_synthetic { &None } else { key };
1499+
// Only emit the arrangement key when the user configured one; relation-key
1500+
// and synthetic-hash arrangements exist purely for grouping / worker
1501+
// distribution and have no corresponding key encoder.
1502+
let key_for_message = if key_encoder.is_some() { key } else { &None };
15001503

15011504
let mut hash = None;
15021505
let mut headers = vec![];

0 commit comments

Comments
 (0)