docs/content/cdc-ingestion/kafka-cdc.md (29 additions, 0 deletions)
@@ -288,5 +288,34 @@ There are some useful options to build Flink Kafka Source, but they are not prov
<td>String</td>
<td>When configuring "value.format=debezium-avro", which requires using the Confluent schema registry model for Apache Avro serialization, you need to provide the schema registry URL.</td>
</tr>
<tr>
<td>cdc.skip-corrupt-record</td>
<td>false</td>
<td>Boolean</td>
<td>Whether to skip corrupt or unparsable CDC records during parsing. When enabled, records that fail parsing will be skipped instead of causing the job to fail. The skipped records will be logged if cdc.log-corrupt-record is enabled.</td>
</tr>
<tr>
<td>cdc.log-corrupt-record</td>
<td>true</td>
<td>Boolean</td>
<td>Whether to log full details about corrupt records when they are encountered. This includes the topic, partition, offset, and record payload. When false, only topic-level metadata is logged without record details to prevent PII leakage. <strong>Security Warning:</strong> When set to true, the full record content will be logged, which may include Personally Identifiable Information (PII) or other sensitive data. Ensure your log storage and access controls comply with your organization's data privacy policies. Set to true only for debugging purposes with appropriate security measures in place.</td>
</tr>
</tbody>
</table>

**Example - Skip corrupt records with PII-safe logging:**

```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
kafka_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \
--kafka_conf topic=orders \
--kafka_conf value.format=debezium-avro \
--kafka_conf schema.registry.url=http://localhost:8081 \
--kafka_conf cdc.skip-corrupt-record=true \
--kafka_conf cdc.log-corrupt-record=false
```
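
**Example - Temporarily log full corrupt records for debugging:**

The variant below is the same command with `cdc.log-corrupt-record` flipped to true, which writes the full payload of each skipped record to the logs to help diagnose parsing failures. As noted in the option table, this may expose PII, so enable it only temporarily and with appropriate access controls on the logs.

```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
kafka_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \
--kafka_conf topic=orders \
--kafka_conf value.format=debezium-avro \
--kafka_conf schema.registry.url=http://localhost:8081 \
--kafka_conf cdc.skip-corrupt-record=true \
--kafka_conf cdc.log-corrupt-record=true
```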
@@ -35,10 +35,25 @@ public class CdcSourceRecord implements Serializable {
// TODO Use generics to support more scenarios.
private final Object value;

public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) {
@Nullable private final Integer partition;

@Nullable private final Long offset;

public CdcSourceRecord(
@Nullable String topic,
@Nullable Object key,
Object value,
@Nullable Integer partition,
@Nullable Long offset) {
this.topic = topic;
this.key = key;
this.value = value;
this.partition = partition;
this.offset = offset;
}

public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) {
this(topic, key, value, null, null);
}

public CdcSourceRecord(Object value) {
@@ -59,6 +74,16 @@ public Object getValue() {
return value;
}

@Nullable
public Integer getPartition() {
return partition;
}

@Nullable
public Long getOffset() {
return offset;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof CdcSourceRecord)) {
@@ -68,16 +93,30 @@ public boolean equals(Object o) {
CdcSourceRecord that = (CdcSourceRecord) o;
return Objects.equals(topic, that.topic)
&& Objects.equals(key, that.key)
&& Objects.equals(value, that.value);
&& Objects.equals(value, that.value)
&& Objects.equals(partition, that.partition)
&& Objects.equals(offset, that.offset);
}

@Override
public int hashCode() {
return Objects.hash(topic, key, value);
return Objects.hash(topic, key, value, partition, offset);
}

@Override
public String toString() {
return topic + ": " + key + " " + value;
StringBuilder sb = new StringBuilder();
if (topic != null) {
sb.append("topic=").append(topic);
}
if (partition != null) {
sb.append(", partition=").append(partition);
}
if (offset != null) {
sb.append(", offset=").append(offset);
}
sb.append("\nkey: ").append(key);
sb.append("\nvalue: ").append(value);
return sb.toString();
}
}
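
A minimal usage sketch of the enriched record follows. The package path in the import and the literal values are assumptions for illustration; only the constructors and getters shown in the diff above are relied on.

```java
import org.apache.paimon.flink.action.cdc.CdcSourceRecord; // package path assumed

public class CdcSourceRecordSketch {
    public static void main(String[] args) {
        // Kafka deserializers can now attach the record's coordinates
        // (topic, partition, offset) alongside the key/value payload.
        CdcSourceRecord withMetadata =
                new CdcSourceRecord("orders", null, "{\"order_id\": 1}", 3, 42L);
        System.out.println(withMetadata.getTopic());     // orders
        System.out.println(withMetadata.getPartition()); // 3
        System.out.println(withMetadata.getOffset());    // 42

        // The original three-argument constructor still works and leaves
        // partition/offset null, e.g. for sources without Kafka coordinates.
        CdcSourceRecord legacy = new CdcSourceRecord("orders", null, "{\"order_id\": 1}");
        System.out.println(legacy.getPartition());       // null
    }
}
```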
@@ -48,6 +48,8 @@
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkOneRequiredOption;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkRequiredOptions;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.LOG_CORRUPT_RECORD;
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Provide different function according to CDC source type. */
@@ -209,7 +211,13 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordPar
case KAFKA:
case PULSAR:
DataFormat dataFormat = provideDataFormat();
return dataFormat.createParser(typeMapping, computedColumns);
return dataFormat.createParser(
typeMapping,
computedColumns,
cdcSourceConfig.getBoolean(
SKIP_CORRUPT_RECORD.key(), SKIP_CORRUPT_RECORD.defaultValue()),
cdcSourceConfig.getBoolean(
LOG_CORRUPT_RECORD.key(), LOG_CORRUPT_RECORD.defaultValue()));
case MONGODB:
return new MongoDBRecordParser(computedColumns, cdcSourceConfig);
default:
@@ -56,6 +56,9 @@ public abstract class AbstractRecordParser
protected final TypeMapping typeMapping;
protected final List<ComputedColumn> computedColumns;

private boolean skipCorruptRecord = false;
private boolean logCorruptRecord = false;

public AbstractRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
@@ -72,8 +75,22 @@ public Schema buildSchema(CdcSourceRecord record) {
Optional<RichCdcMultiplexRecord> recordOpt = extractRecords().stream().findFirst();
return recordOpt.map(RichCdcMultiplexRecord::buildSchema).orElse(null);
} catch (Exception e) {
logInvalidSourceRecord(record);
throw e;
if (skipCorruptRecord) {
logCorruptRecordMetadata(record, "schema build");
if (logCorruptRecord) {
logInvalidSourceRecord(record);
LOG.warn(
"Skipping corrupt or unparsable source record during schema build.", e);
} else {
LOG.warn(
"Skipping corrupt or unparsable source record during schema build (record details not logged due to PII concerns).",
e);
}
return null;
} else {
logInvalidSourceRecord(record);
throw e;
}
}
}

@@ -83,8 +100,20 @@ public void flatMap(CdcSourceRecord value, Collector<RichCdcMultiplexRecord> out
setRoot(value);
extractRecords().forEach(out::collect);
} catch (Exception e) {
logInvalidSourceRecord(value);
throw e;
if (skipCorruptRecord) {
logCorruptRecordMetadata(value, "record processing");
if (logCorruptRecord) {
logInvalidSourceRecord(value);
LOG.warn("Skipping corrupt or unparsable source record.", e);
} else {
LOG.warn(
"Skipping corrupt or unparsable source record (record details not logged due to PII concerns).",
e);
}
} else {
logInvalidSourceRecord(value);
throw e;
}
}
}

@@ -129,8 +158,49 @@ protected RichCdcMultiplexRecord createRecord(
protected abstract String getDatabaseName();

private void logInvalidSourceRecord(CdcSourceRecord record) {
LOG.error("Invalid source record:\n{}", record.toString());
StringBuilder msg = new StringBuilder("Invalid source record");
if (record.getTopic() != null) {
msg.append(" from topic: ").append(record.getTopic());
}
if (record.getPartition() != null) {
msg.append(", partition: ").append(record.getPartition());
}
if (record.getOffset() != null) {
msg.append(", offset: ").append(record.getOffset());
}
LOG.error("{}\n{}", msg.toString(), record.toString());
}

private void logCorruptRecordMetadata(CdcSourceRecord record, String context) {
String topic = record.getTopic();
if (topic != null) {
LOG.warn("Corrupt record detected during {} from topic: {}", context, topic);
} else {
LOG.warn("Corrupt record detected during {} (no topic information available)", context);
}
}

protected abstract String format();

/**
* Configure whether to skip corrupt records that fail parsing.
*
* @param skip if true, corrupt records will be skipped instead of causing job failure
* @return this parser instance for chaining
*/
public AbstractRecordParser withSkipCorruptRecord(boolean skip) {
this.skipCorruptRecord = skip;
return this;
}

/**
* Configure whether to log details about corrupt records.
*
* @param log if true, corrupt records will be logged with details (may contain PII)
* @return this parser instance for chaining
*/
public AbstractRecordParser withLogCorruptRecord(boolean log) {
this.logCorruptRecord = log;
return this;
}
}
@@ -38,12 +38,33 @@ public interface DataFormat {
* Creates a new instance of {@link AbstractRecordParser} for this data format with the
* specified configurations.
*
* @param typeMapping Type mapping options.
* @param computedColumns List of computed columns to be considered by the parser.
* @return A new instance of {@link AbstractRecordParser}.
*/
AbstractRecordParser createParser(
TypeMapping typeMapping, List<ComputedColumn> computedColumns);

/**
* Creates a new instance of {@link AbstractRecordParser} for this data format with the
* specified configurations and corrupt record handling options.
*
* @param typeMapping Type mapping options.
* @param computedColumns List of computed columns to be considered by the parser.
* @param skipCorruptRecord Whether to skip corrupt records instead of throwing exceptions.
* @param logCorruptRecord Whether to log corrupt record details (may contain PII).
* @return A new instance of {@link AbstractRecordParser}.
*/
default AbstractRecordParser createParser(
TypeMapping typeMapping,
List<ComputedColumn> computedColumns,
boolean skipCorruptRecord,
boolean logCorruptRecord) {
return createParser(typeMapping, computedColumns)
.withSkipCorruptRecord(skipCorruptRecord)
.withLogCorruptRecord(logCorruptRecord);
}

KafkaDeserializationSchema<CdcSourceRecord> createKafkaDeserializer(
Configuration cdcSourceConfig);

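For context on the design: the new four-argument `createParser` is a default method, so existing `DataFormat` implementations keep compiling and automatically gain corrupt-record handling. The standalone toy below models only that evolution pattern; the class and method names in it are illustrative and do not exist in Paimon.

```java
/** Standalone toy model of evolving a factory interface with a default method. */
public class DefaultMethodEvolutionSketch {

    /** Stand-in for a parser with fluent, optional configuration. */
    static class ToyParser {
        boolean skipCorrupt = false;
        boolean logCorrupt = true;

        ToyParser withSkipCorrupt(boolean skip) {
            this.skipCorrupt = skip;
            return this;
        }

        ToyParser withLogCorrupt(boolean log) {
            this.logCorrupt = log;
            return this;
        }
    }

    /** Stand-in for the format factory interface. */
    interface ToyFormat {
        // The method every existing implementation already provides.
        ToyParser createParser();

        // The new overload: a default method that delegates to the old one and
        // then applies the new options, so old implementations need no changes.
        default ToyParser createParser(boolean skipCorrupt, boolean logCorrupt) {
            return createParser().withSkipCorrupt(skipCorrupt).withLogCorrupt(logCorrupt);
        }
    }

    public static void main(String[] args) {
        ToyFormat format = ToyParser::new; // implements only the original method
        ToyParser parser = format.createParser(true, false);
        System.out.println("skipCorrupt=" + parser.skipCorrupt
                + ", logCorrupt=" + parser.logCorrupt); // skipCorrupt=true, logCorrupt=false
    }
}
```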
@@ -76,7 +76,7 @@ public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message) throw
key = (GenericRecord) keyContainerWithVersion.container();
}
GenericRecord value = (GenericRecord) valueContainerWithVersion.container();
return new CdcSourceRecord(topic, key, value);
return new CdcSourceRecord(topic, key, value, message.partition(), message.offset());
}

@Override
@@ -76,7 +76,8 @@ public CdcSourceRecord deserialize(ConsumerRecord<byte[], byte[]> message) throw
}

JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class);
return new CdcSourceRecord(null, keyNode, valueNode);
return new CdcSourceRecord(
message.topic(), keyNode, valueNode, message.partition(), message.offset());
} catch (Exception e) {
LOG.error("Invalid Json:\n{}", new String(message.value()));
throw e;