Commit 259a143

nickdelnano, gmdfalk, and claude authored
[cdc] Add metadata column support for Kafka CDC connector (#7315)
This PR supercedes #6353 - I took over this work from my colleague @gmdfalk ## Description Add --metadata_column support to Paimon Kafka CDC connector, similar to the already existing options added for MySQL and Postgres: #2077 Also add optional --metadata_column_prefix to avoid conflicts with existing Paimon fields like topic, timestamp etc. Supported metadata columns are those on org.apache.kafka.clients.consumer.ConsumerRecord i.e.: - topic - partition - offset - timestamp - timestampType: This is the name of the enum i.e. NoTimestampType, CreateTime or LogAppendTime The feature is backwards compatible. It's only active when --metadata_column is supplied resp. SynchronizationActionBase.withMetadataColumns is used. ## Motivation This is a requested feature: #3210 We primarly use this feature for two purposes: 1. Troubleshooting and data lineage (e.g. where in our Kafka infrastructure does this Paimon row come from?) 2. Mapping large Kafka topic partitions 1:1 to Paimon buckets to avoid reshuffling (see this issue it would solve: #3249) ## Tests Unit and Integration Tests ## API and Format No changes to public apis or storage format. The changes here are contained to the flink cdc package but I did have to update CdcSourceRecord since it previously didn't provide a way to surface arbitrary metadata for a record. The metadata attribute on CdcSourceRecord is intentionally a generic Map so that it can potentially be used to add metadata support for other connectors like Pulsar or Mongo that are not yet implemented. ## Documentation Added the new --metadata_column and --metadata_column_prefix parameter to Kafka CDC docs. ## Dev notes For running integration tests on MacOS with Rancher Desktop, i had to properly expose the docker socket to testcontainers e.g. system wide via sudo ln -sf "$HOME/.rd/docker.sock" /var/run/docker.sock. 
---------

Signed-off-by: Max Falk <gfalk@yelp.com>
Co-authored-by: Max Falk <gfalk@yelp.com>
Co-authored-by: Max Falk <279131+gmdfalk@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
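For reference, the new flags compose with an existing `kafka_sync_table` invocation roughly as follows. This is a sketch only: the warehouse path, topic name, and Kafka address are placeholder values, not taken from this PR.

```shell
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action.jar \
    kafka_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --primary_keys id \
    --metadata_column topic \
    --metadata_column partition \
    --metadata_column offset \
    --metadata_column_prefix __kafka_ \
    --kafka_conf properties.bootstrap.servers=127.0.0.1:9092 \
    --kafka_conf topic=order \
    --kafka_conf properties.group.id=123456 \
    --kafka_conf value.format=canal-json \
    --table_conf bucket=4
```

With the prefix set, the synced table would gain `__kafka_topic`, `__kafka_partition`, and `__kafka_offset` columns alongside the payload fields.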
1 parent cac3980 commit 259a143

39 files changed

Lines changed: 1131 additions & 15 deletions

docs/content/cdc-ingestion/kafka-cdc.md

Lines changed: 4 additions & 0 deletions
@@ -103,6 +103,8 @@ To use this feature through `flink run`, run the following shell command.
     [--primary_keys <primary-keys>] \
     [--type_mapping to-string] \
     [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
+    [--metadata_column <metadata-column> [--metadata_column ...]] \
+    [--metadata_column_prefix <metadata-column-prefix>] \
     [--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
     [--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
@@ -215,6 +217,8 @@ To use this feature through `flink run`, run the following shell command.
     [--partition_keys <partition_keys>] \
     [--primary_keys <primary-keys>] \
     [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
+    [--metadata_column <metadata-column> [--metadata_column ...]] \
+    [--metadata_column_prefix <metadata-column-prefix>] \
     [--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
     [--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]

docs/layouts/shortcodes/generated/kafka_sync_database.html

Lines changed: 8 additions & 0 deletions
@@ -94,6 +94,14 @@
         <td><h5>--computed_column</h5></td>
         <td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. NOTICE: It returns null if the referenced column does not exist in the source table.</td>
     </tr>
+    <tr>
+        <td><h5>--metadata_column</h5></td>
+        <td>--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime').</td>
+    </tr>
+    <tr>
+        <td><h5>--metadata_column_prefix</h5></td>
+        <td>--metadata_column_prefix is optionally used to set a prefix for metadata columns in the Paimon table to avoid conflicts with existing attributes. For example, with prefix "__kafka_", the metadata column "topic" will be stored as "__kafka_topic" field.</td>
+    </tr>
     <tr>
         <td><h5>--eager_init</h5></td>
         <td>It is default false. If true, all relevant tables commiter will be initialized eagerly, which means those tables could be forced to create snapshot.</td>

docs/layouts/shortcodes/generated/kafka_sync_table.html

Lines changed: 9 additions & 1 deletion
@@ -70,6 +70,14 @@
         <td><h5>--computed_column</h5></td>
         <td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. </td>
     </tr>
+    <tr>
+        <td><h5>--metadata_column</h5></td>
+        <td>--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime').</td>
+    </tr>
+    <tr>
+        <td><h5>--metadata_column_prefix</h5></td>
+        <td>--metadata_column_prefix is optionally used to set a prefix for metadata columns in the Paimon table to avoid conflicts with existing attributes. For example, with prefix "__kafka_", the metadata column "topic" will be stored as "__kafka_topic" field.</td>
+    </tr>
     <tr>
         <td><h5>--kafka_conf</h5></td>
         <td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic/topic-pattern`, `properties.group.id`, and `value.format` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
@@ -83,4 +91,4 @@
         <td>The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of table configurations.</td>
     </tr>
 </tbody>
-</table>
+</table>

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java

Lines changed: 1 addition & 0 deletions
@@ -68,6 +68,7 @@ public class CdcActionCommonUtils {
     public static final String PRIMARY_KEYS = "primary_keys";
     public static final String COMPUTED_COLUMN = "computed_column";
     public static final String METADATA_COLUMN = "metadata_column";
+    public static final String METADATA_COLUMN_PREFIX = "metadata_column_prefix";
     public static final String MULTIPLE_TABLE_PARTITION_KEYS = "multiple_table_partition_keys";
     public static final String EAGER_INIT = "eager_init";
     public static final String SYNC_PKEYS_FROM_SOURCE_SCHEMA =

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataConverter.java

Lines changed: 14 additions & 2 deletions
@@ -34,13 +34,25 @@
  * A functional interface for converting CDC metadata.
  *
  * <p>This interface provides a mechanism to convert Change Data Capture (CDC) metadata from a given
- * {@link JsonNode} source. Implementations of this interface can be used to process and transform
- * metadata entries from CDC sources.
+ * {@link JsonNode} source or {@link CdcSourceRecord}. Implementations of this interface can be used
+ * to process and transform metadata entries from CDC sources.
  */
 public interface CdcMetadataConverter extends Serializable {

     String read(JsonNode payload);

+    /**
+     * Read metadata from a CDC source record. Default implementation throws
+     * UnsupportedOperationException to maintain backward compatibility.
+     *
+     * @param record the CDC source record
+     * @return the metadata value as a string
+     */
+    default String read(CdcSourceRecord record) {
+        throw new UnsupportedOperationException(
+                "This metadata converter does not support reading from CdcSourceRecord");
+    }
+
     DataType dataType();

     String columnName();

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcMetadataProcessor.java

Lines changed: 10 additions & 1 deletion
@@ -18,6 +18,8 @@

 package org.apache.paimon.flink.action.cdc;

+import org.apache.paimon.flink.action.cdc.kafka.KafkaMetadataConverter;
+
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

 import java.util.Arrays;
@@ -49,7 +51,14 @@ public enum CdcMetadataProcessor {
             new CdcMetadataConverter.DatabaseNameConverter(),
             new CdcMetadataConverter.TableNameConverter(),
             new CdcMetadataConverter.SchemaNameConverter(),
-            new CdcMetadataConverter.OpTsConverter());
+            new CdcMetadataConverter.OpTsConverter()),
+    KAFKA_METADATA_PROCESSOR(
+            SyncJobHandler.SourceType.KAFKA,
+            new KafkaMetadataConverter.TopicConverter(),
+            new KafkaMetadataConverter.PartitionConverter(),
+            new KafkaMetadataConverter.OffsetConverter(),
+            new KafkaMetadataConverter.TimestampConverter(),
+            new KafkaMetadataConverter.TimestampTypeConverter());

     private final SyncJobHandler.SourceType sourceType;

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java

Lines changed: 34 additions & 6 deletions
@@ -21,6 +21,9 @@
 import javax.annotation.Nullable;

 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;

 /** A data change record from the CDC source. */
@@ -35,14 +38,29 @@ public class CdcSourceRecord implements Serializable {
     // TODO Use generics to support more scenarios.
     private final Object value;

+    // Generic metadata map - any source can add metadata
+    private final Map<String, Object> metadata;
+
     public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) {
-        this.topic = topic;
-        this.key = key;
-        this.value = value;
+        this(topic, key, value, null);
     }

     public CdcSourceRecord(Object value) {
-        this(null, null, value);
+        this(null, null, value, null);
+    }
+
+    public CdcSourceRecord(
+            @Nullable String topic,
+            @Nullable Object key,
+            Object value,
+            @Nullable Map<String, Object> metadata) {
+        this.topic = topic;
+        this.key = key;
+        this.value = value;
+        this.metadata =
+                metadata != null
+                        ? Collections.unmodifiableMap(new HashMap<>(metadata))
+                        : Collections.emptyMap();
     }

     @Nullable
@@ -59,6 +77,15 @@ public Object getValue() {
         return value;
     }

+    public Map<String, Object> getMetadata() {
+        return metadata;
+    }
+
+    @Nullable
+    public Object getMetadata(String key) {
+        return metadata.get(key);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof CdcSourceRecord)) {
@@ -68,12 +95,13 @@ public boolean equals(Object o) {
         CdcSourceRecord that = (CdcSourceRecord) o;
         return Objects.equals(topic, that.topic)
                 && Objects.equals(key, that.key)
-                && Objects.equals(value, that.value);
+                && Objects.equals(value, that.value)
+                && Objects.equals(metadata, that.metadata);
     }

     @Override
     public int hashCode() {
-        return Objects.hash(topic, key, value);
+        return Objects.hash(topic, key, value, metadata);
     }

     @Override
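The new four-argument constructor defensively copies the caller's metadata map and wraps it in an unmodifiable view. A small standalone sketch of that same idiom (outside Paimon, using only the JDK) shows why the copy matters:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DefensiveCopyDemo {
    // Same idiom as the new CdcSourceRecord constructor: copy first, then wrap read-only.
    static Map<String, Object> freeze(Map<String, Object> metadata) {
        return metadata != null
                ? Collections.unmodifiableMap(new HashMap<>(metadata))
                : Collections.emptyMap();
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("partition", 3);
        Map<String, Object> frozen = freeze(source);

        // Later mutation of the caller's map is not visible through the record's view...
        source.put("offset", 42L);
        System.out.println(frozen.containsKey("offset")); // false

        // ...and the record's view itself rejects writes.
        try {
            frozen.put("topic", "orders");
        } catch (UnsupportedOperationException e) {
            System.out.println("read-only");
        }
    }
}
```

Without the `new HashMap<>(metadata)` copy, a source that reused and mutated its map between records would silently corrupt earlier records' metadata.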
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/PrefixedMetadataConverter.java

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.types.DataType;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Wraps a {@link CdcMetadataConverter} to add a prefix to its column name.
+ *
+ * <p>This decorator allows adding prefixes like "__kafka_" to metadata column names to avoid
+ * collisions with source data columns, while keeping the underlying converter logic unchanged.
+ */
+public class PrefixedMetadataConverter implements CdcMetadataConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    private final CdcMetadataConverter delegate;
+    private final String prefix;
+
+    public PrefixedMetadataConverter(CdcMetadataConverter delegate, String prefix) {
+        this.delegate = delegate;
+        this.prefix = prefix != null ? prefix : "";
+    }
+
+    @Override
+    public String columnName() {
+        return prefix + delegate.columnName();
+    }
+
+    @Override
+    public String read(JsonNode payload) {
+        return delegate.read(payload);
+    }
+
+    @Override
+    public String read(CdcSourceRecord record) {
+        return delegate.read(record);
+    }
+
+    @Override
+    public DataType dataType() {
+        return delegate.dataType();
+    }
+}
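The decorator pattern used here can be exercised in isolation. The sketch below uses simplified stand-in types (`SimpleConverter` and `Prefixed` are hypothetical, not the Paimon interfaces) to show how prefixing composes with an unchanged delegate:

```java
import java.util.Map;

// Simplified stand-in for CdcMetadataConverter (illustration only, not the Paimon API).
interface SimpleConverter {
    String columnName();

    String read(Map<String, Object> metadata);
}

// Mirrors PrefixedMetadataConverter: delegates everything, prefixes only the column name.
class Prefixed implements SimpleConverter {
    private final SimpleConverter delegate;
    private final String prefix;

    Prefixed(SimpleConverter delegate, String prefix) {
        this.delegate = delegate;
        this.prefix = prefix != null ? prefix : "";
    }

    @Override
    public String columnName() {
        return prefix + delegate.columnName();
    }

    @Override
    public String read(Map<String, Object> metadata) {
        return delegate.read(metadata); // value logic is untouched
    }
}

public class PrefixedDemo {
    public static void main(String[] args) {
        SimpleConverter topic = new SimpleConverter() {
            public String columnName() { return "topic"; }
            public String read(Map<String, Object> m) { return String.valueOf(m.get("topic")); }
        };
        SimpleConverter prefixed = new Prefixed(topic, "__kafka_");
        System.out.println(prefixed.columnName()); // __kafka_topic
        System.out.println(prefixed.read(Map.of("topic", "orders"))); // orders
    }
}
```

Only the schema-facing name changes; read values pass through unchanged, which is why the PR can add prefixing without touching any converter's logic.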

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java

Lines changed: 3 additions & 2 deletions
@@ -207,9 +207,10 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordPar
                 return new PostgresRecordParser(
                         cdcSourceConfig, computedColumns, typeMapping, metadataConverters);
             case KAFKA:
+                return provideDataFormat()
+                        .createParser(typeMapping, computedColumns, metadataConverters);
             case PULSAR:
-                DataFormat dataFormat = provideDataFormat();
-                return dataFormat.createParser(typeMapping, computedColumns);
+                return provideDataFormat().createParser(typeMapping, computedColumns);
             case MONGODB:
                 return new MongoDBRecordParser(computedColumns, cdcSourceConfig);
             default:

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java

Lines changed: 6 additions & 0 deletions
@@ -29,6 +29,7 @@

 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
+import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN_PREFIX;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA;
@@ -64,6 +65,11 @@ protected void withParams(MultipleParameterToolAdapter params, SyncTableActionBa
         }

         if (params.has(METADATA_COLUMN)) {
+            // Parse optional prefix first
+            if (params.has(METADATA_COLUMN_PREFIX)) {
+                action.withMetadataColumnPrefix(params.get(METADATA_COLUMN_PREFIX));
+            }
+
             List<String> metadataColumns =
                     new ArrayList<>(params.getMultiParameter(METADATA_COLUMN));
             if (metadataColumns.size() == 1) {
