diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md index 7ca3b2728cf9..d2d70e3b3776 100644 --- a/docs/content/cdc-ingestion/kafka-cdc.md +++ b/docs/content/cdc-ingestion/kafka-cdc.md @@ -288,5 +288,34 @@ There are some useful options to build Flink Kafka Source, but they are not prov String When configuring "value.format=debezium-avro" which requires using the Confluent schema registry model for Apache Avro serialization, you need to provide the schema registry URL. + + cdc.skip-corrupt-record + false + Boolean + Whether to skip corrupt or unparsable CDC records during parsing. When enabled, records that fail parsing will be skipped instead of causing the job to fail. The skipped records will be logged if cdc.log-corrupt-record is enabled. + + + cdc.log-corrupt-record + true + Boolean + Whether to log full details about corrupt records when they are encountered. This includes the topic, partition, offset, and record payload. When false, only topic-level metadata is logged without record details to prevent PII leakage. Security Warning: When set to true, the full record content will be logged, which may include Personally Identifiable Information (PII) or other sensitive data. Ensure your log storage and access controls comply with your organization's data privacy policies. Set to true only for debugging purposes with appropriate security measures in place. 
+ + +**Example - Skip corrupt records with PII-safe logging:** + +```bash +/bin/flink run \ + /path/to/paimon-flink-action-{{< version >}}.jar \ + kafka_sync_table \ + --warehouse hdfs:///path/to/warehouse \ + --database test_db \ + --table test_table \ + --kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \ + --kafka_conf topic=orders \ + --kafka_conf value.format=debezium-avro \ + --kafka_conf schema.registry.url=http://localhost:8081 \ + --kafka_conf cdc.skip-corrupt-record=true \ + --kafka_conf cdc.log-corrupt-record=false +``` diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java index 51a14534c4c9..19d46cba6ff7 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java @@ -35,10 +35,25 @@ public class CdcSourceRecord implements Serializable { // TODO Use generics to support more scenarios. 
private final Object value; - public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) { + @Nullable private final Integer partition; + + @Nullable private final Long offset; + + public CdcSourceRecord( + @Nullable String topic, + @Nullable Object key, + Object value, + @Nullable Integer partition, + @Nullable Long offset) { this.topic = topic; this.key = key; this.value = value; + this.partition = partition; + this.offset = offset; + } + + public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) { + this(topic, key, value, null, null); } public CdcSourceRecord(Object value) { @@ -59,6 +74,16 @@ public Object getValue() { return value; } + @Nullable + public Integer getPartition() { + return partition; + } + + @Nullable + public Long getOffset() { + return offset; + } + @Override public boolean equals(Object o) { if (!(o instanceof CdcSourceRecord)) { @@ -68,16 +93,30 @@ public boolean equals(Object o) { CdcSourceRecord that = (CdcSourceRecord) o; return Objects.equals(topic, that.topic) && Objects.equals(key, that.key) - && Objects.equals(value, that.value); + && Objects.equals(value, that.value) + && Objects.equals(partition, that.partition) + && Objects.equals(offset, that.offset); } @Override public int hashCode() { - return Objects.hash(topic, key, value); + return Objects.hash(topic, key, value, partition, offset); } @Override public String toString() { - return topic + ": " + key + " " + value; + StringBuilder sb = new StringBuilder(); + if (topic != null) { + sb.append("topic=").append(topic); + } + if (partition != null) { + sb.append(", partition=").append(partition); + } + if (offset != null) { + sb.append(", offset=").append(offset); + } + sb.append("\nkey: ").append(key); + sb.append("\nvalue: ").append(value); + return sb.toString(); } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java index 323078aef65c..e83ecdcc7645 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java @@ -48,6 +48,8 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PULSAR_CONF; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkOneRequiredOption; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkRequiredOptions; +import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.LOG_CORRUPT_RECORD; +import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Provide different function according to CDC source type. */ @@ -209,7 +211,13 @@ public FlatMapFunction provideRecordPar case KAFKA: case PULSAR: DataFormat dataFormat = provideDataFormat(); - return dataFormat.createParser(typeMapping, computedColumns); + return dataFormat.createParser( + typeMapping, + computedColumns, + cdcSourceConfig.getBoolean( + SKIP_CORRUPT_RECORD.key(), SKIP_CORRUPT_RECORD.defaultValue()), + cdcSourceConfig.getBoolean( + LOG_CORRUPT_RECORD.key(), LOG_CORRUPT_RECORD.defaultValue())); case MONGODB: return new MongoDBRecordParser(computedColumns, cdcSourceConfig); default: diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java index 85442067b981..3a4db4132fb2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java +++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java @@ -56,6 +56,9 @@ public abstract class AbstractRecordParser protected final TypeMapping typeMapping; protected final List computedColumns; + private boolean skipCorruptRecord = false; + private boolean logCorruptRecord = false; + public AbstractRecordParser(TypeMapping typeMapping, List computedColumns) { this.typeMapping = typeMapping; this.computedColumns = computedColumns; @@ -72,8 +75,22 @@ public Schema buildSchema(CdcSourceRecord record) { Optional recordOpt = extractRecords().stream().findFirst(); return recordOpt.map(RichCdcMultiplexRecord::buildSchema).orElse(null); } catch (Exception e) { - logInvalidSourceRecord(record); - throw e; + if (skipCorruptRecord) { + logCorruptRecordMetadata(record, "schema build"); + if (logCorruptRecord) { + logInvalidSourceRecord(record); + LOG.warn( + "Skipping corrupt or unparsable source record during schema build.", e); + } else { + LOG.warn( + "Skipping corrupt or unparsable source record during schema build (record details not logged due to PII concerns).", + e); + } + return null; + } else { + logInvalidSourceRecord(record); + throw e; + } } } @@ -83,8 +100,20 @@ public void flatMap(CdcSourceRecord value, Collector out setRoot(value); extractRecords().forEach(out::collect); } catch (Exception e) { - logInvalidSourceRecord(value); - throw e; + if (skipCorruptRecord) { + logCorruptRecordMetadata(value, "record processing"); + if (logCorruptRecord) { + logInvalidSourceRecord(value); + LOG.warn("Skipping corrupt or unparsable source record.", e); + } else { + LOG.warn( + "Skipping corrupt or unparsable source record (record details not logged due to PII concerns).", + e); + } + } else { + logInvalidSourceRecord(value); + throw e; + } } } @@ -129,8 +158,49 @@ protected RichCdcMultiplexRecord createRecord( protected abstract String getDatabaseName(); private void logInvalidSourceRecord(CdcSourceRecord record) { 
- LOG.error("Invalid source record:\n{}", record.toString()); + StringBuilder msg = new StringBuilder("Invalid source record"); + if (record.getTopic() != null) { + msg.append(" from topic: ").append(record.getTopic()); + } + if (record.getPartition() != null) { + msg.append(", partition: ").append(record.getPartition()); + } + if (record.getOffset() != null) { + msg.append(", offset: ").append(record.getOffset()); + } + LOG.error("{}\n{}", msg.toString(), record.toString()); + } + + private void logCorruptRecordMetadata(CdcSourceRecord record, String context) { + String topic = record.getTopic(); + if (topic != null) { + LOG.warn("Corrupt record detected during {} from topic: {}", context, topic); + } else { + LOG.warn("Corrupt record detected during {} (no topic information available)", context); + } } protected abstract String format(); + + /** + * Configure whether to skip corrupt records that fail parsing. + * + * @param skip if true, corrupt records will be skipped instead of causing job failure + * @return this parser instance for chaining + */ + public AbstractRecordParser withSkipCorruptRecord(boolean skip) { + this.skipCorruptRecord = skip; + return this; + } + + /** + * Configure whether to log details about corrupt records. 
+ * + * @param log if true, corrupt records will be logged with details (may contain PII) + * @return this parser instance for chaining + */ + public AbstractRecordParser withLogCorruptRecord(boolean log) { + this.logCorruptRecord = log; + return this; + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java index 711f596ac545..e9246acabff4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java @@ -38,12 +38,33 @@ public interface DataFormat { * Creates a new instance of {@link AbstractRecordParser} for this data format with the * specified configurations. * + * @param typeMapping Type mapping options. * @param computedColumns List of computed columns to be considered by the parser. * @return A new instance of {@link AbstractRecordParser}. */ AbstractRecordParser createParser( TypeMapping typeMapping, List computedColumns); + /** + * Creates a new instance of {@link AbstractRecordParser} for this data format with the + * specified configurations and corrupt record handling options. + * + * @param typeMapping Type mapping options. + * @param computedColumns List of computed columns to be considered by the parser. + * @param skipCorruptRecord Whether to skip corrupt records instead of throwing exceptions. + * @param logCorruptRecord Whether to log corrupt record details (may contain PII). + * @return A new instance of {@link AbstractRecordParser}. 
+ */ + default AbstractRecordParser createParser( + TypeMapping typeMapping, + List computedColumns, + boolean skipCorruptRecord, + boolean logCorruptRecord) { + return createParser(typeMapping, computedColumns) + .withSkipCorruptRecord(skipCorruptRecord) + .withLogCorruptRecord(logCorruptRecord); + } + KafkaDeserializationSchema createKafkaDeserializer( Configuration cdcSourceConfig); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java index eea364d460de..d5ed33697752 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java @@ -76,7 +76,7 @@ public CdcSourceRecord deserialize(ConsumerRecord message) throw key = (GenericRecord) keyContainerWithVersion.container(); } GenericRecord value = (GenericRecord) valueContainerWithVersion.container(); - return new CdcSourceRecord(topic, key, value); + return new CdcSourceRecord(topic, key, value, message.partition(), message.offset()); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java index 887af5f6060a..73d3f4d269eb 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java @@ -76,7 +76,8 @@ public CdcSourceRecord deserialize(ConsumerRecord 
message) throw } JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class); - return new CdcSourceRecord(null, keyNode, valueNode); + return new CdcSourceRecord( + message.topic(), keyNode, valueNode, message.partition(), message.offset()); } catch (Exception e) { LOG.error("Invalid Json:\n{}", new String(message.value())); throw e;