Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions destination/iceberg/arrow-writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (w *ArrowWriter) getOrCreateWriter(ctx context.Context, pKey string, values
return writer, nil
}

// extract partitions records and tracks deletes for upsert mode.
// extract partitions records and tracks deletes for upsert mode ("d"/"u"/"i" only; "c"/"r" skip dedup).
func (w *ArrowWriter) extract(ctx context.Context, records []types.RawRecord) error {
for _, rec := range records {
pKey, values, err := w.getRecordPartition(rec, rec.OlakeColumns[constants.OlakeTimestamp].(time.Time))
Expand All @@ -168,8 +168,7 @@ func (w *ArrowWriter) extract(ctx context.Context, records []types.RawRecord) er
writer.data = append(writer.data, rec)
recordOpType := rec.OlakeColumns[constants.OpType].(string)
recordOlakeID := rec.OlakeColumns[constants.OlakeID].(string)
// Track deletes for upsert operations (d, u, c all need delete handling)
if w.upsertMode && (recordOpType == "d" || recordOpType == "u" || recordOpType == "c") {
if w.upsertMode && (recordOpType == "d" || recordOpType == "u" || recordOpType == "i") {
filePosition := writer.dataWriter.currentRowCount + int64(len(writer.data)-1)

if _, exists := writer.olakeIDPosition[recordOlakeID]; !exists {
Expand All @@ -193,6 +192,11 @@ func (w *ArrowWriter) extract(ctx context.Context, records []types.RawRecord) er
}
}
}

// Normalise "i" → "c" in the data file so downstream consumers see a consistent op type.
if recordOpType == "i" {
rec.OlakeColumns[constants.OpType] = "c"
}
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ private RecordWrapper convertRecord(Boolean upsert, Map<String, Integer> fieldNa
}
// check if it is append only or upsert
if(!upsert) {
// TODO: need a discussion previously Operation.Insert was being used
return new RecordWrapper(genericRow, Operation.READ);
}
return new RecordWrapper(genericRow, cdcOpValue(data.getRecordType()));
Expand Down Expand Up @@ -138,7 +137,7 @@ public Operation cdcOpValue(String cdcOpField) {
case "u" -> Operation.UPDATE;
case "d" -> Operation.DELETE;
case "r" -> Operation.READ;
case "c" -> Operation.INSERT;
case "c" -> Operation.CREATE;
case "i" -> Operation.INSERT;
default ->
throw new RuntimeException("Unexpected `" + cdcOpField + "` operation value received, expecting one of ['u','d','r','c', 'i']");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,13 @@ public void write(Record row) throws IOException {
if (rowOperation == Operation.DELETE && !keepDeletes) {
// deletes. doing hard delete. when keepDeletes = FALSE we dont keep deleted record
writer.deleteKey(keyProjection.wrap(row));
} else if (rowOperation == Operation.CREATE) {
// Steady-state CDC insert: no prior committed row exists for this key, skip equality delete.
writer.write(row);
} else {
// We are deleting key even for insert operations to avoid duplicate records for handling inserts happening while full-load
// Phantom read possible: equality-delete before write to evict any prior committed version.
// _op_type normalisation ("i" -> "c") is done upstream in IcebergTableOperator
// for all writer types before reaching here.
writer.deleteKey(keyProjection.wrap(row));
writer.write(row);
Comment thread
hash-data marked this conversation as resolved.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ public IcebergTableOperator(boolean upsert_records) {
cdcSourceTsMsField = "_cdc_timestamp";
}

static final ImmutableMap<Operation, Integer> CDC_OPERATION_PRIORITY = ImmutableMap.of(Operation.INSERT, 1,
static final ImmutableMap<Operation, Integer> CDC_OPERATION_PRIORITY = ImmutableMap.of(
Operation.INSERT, 1, Operation.CREATE, 1,
Operation.READ, 2, Operation.UPDATE, 3, Operation.DELETE, 4);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
private static final ObjectMapper mapper = new ObjectMapper();

private static final String STATE_KEY_2PC = "olake_2pc";
private static final String STATE_FIELD_LATEST_THREAD_ID = "id";
private static final String STATE_FIELD_FULL_REFRESH_COMMITTED_IDS = "full_refresh_committed_ids";
private static final String STATE_FIELD_DEDUP_INSERTS = "dedup_inserts";


@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "_cdc_timestamp")
Expand Down Expand Up @@ -270,6 +272,15 @@ public void addToTablePerSchema(String threadID, Table icebergTable, List<Record
try {
for (RecordWrapper record : events) {
try{
// Normalise _op_type "i" → "c" before routing to any writer.
// - Delta writers (upsert=true): op() == INSERT, field == "i" → both would work
// - Append writers (upsert=false, AppendMode/backfill): op() == READ, field == "i"
// → op()-based check misses these entirely
// op() on RecordWrapper is immutable, so delta writers still see Operation.INSERT
// and correctly fire the equality-delete path in BaseDeltaTaskWriter.
if ("i".equals(record.getField("_op_type"))) {
record.setField("_op_type", "c");
}
writer.write(record);
}catch (Exception ex) {
LOGGER.error("Failed to write data: {}, exception: {}", record,ex);
Expand Down Expand Up @@ -457,13 +468,15 @@ private void updateJsonState(Table table, UpdateProperties updateProperties, Str
}
} else {
// No payload => backfill/snapshot style: append threadId to full_refresh_committed_ids
// and mark that the first CDC sync must use equality deletes (overlap window open).
com.fasterxml.jackson.databind.node.ArrayNode committedIds;
if (rootNode.has(STATE_FIELD_FULL_REFRESH_COMMITTED_IDS) && rootNode.get(STATE_FIELD_FULL_REFRESH_COMMITTED_IDS).isArray()) {
committedIds = (com.fasterxml.jackson.databind.node.ArrayNode) rootNode.get(STATE_FIELD_FULL_REFRESH_COMMITTED_IDS);
} else {
committedIds = rootNode.putArray(STATE_FIELD_FULL_REFRESH_COMMITTED_IDS);
}
committedIds.add(threadId);
rootNode.put(STATE_FIELD_DEDUP_INSERTS, true);
}

updateProperties.set(STATE_KEY_2PC, mapper.writeValueAsString(rootNode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
package io.debezium.server.iceberg.tableoperator;

public enum Operation {
INSERT,
INSERT, // "i" — first-CDC overlap insert; equality-delete before write
CREATE, // "c" — steady-state CDC insert; no prior committed row, skip equality delete
UPDATE,
DELETE,
READ
Expand Down
10 changes: 10 additions & 0 deletions destination/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ func (p *Parquet) Setup(_ context.Context, stream types.StreamInterface, schema
func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {
// TODO: use batch writing feature of pq writer
for _, record := range records {
// Normalise "i" -? "c": Parquet has no equality-delete concept; downstream
// consumers must see a consistent "c" for all CDC inserts.
// OlakeColumns covers the non-normalized path; Data covers the normalized path
// where FlattenAndCleanData has already merged OlakeColumns into Data.
if opType, ok := record.OlakeColumns[constants.OpType].(string); ok && opType == "i" {
record.OlakeColumns[constants.OpType] = "c"
if _, exists := record.Data[constants.OpType]; exists {
record.Data[constants.OpType] = "c"
}
}
partitionedPath := p.getPartitionedFilePath(record.Data, record.OlakeColumns[constants.OlakeTimestamp].(time.Time))
partitionFiles, exists := p.partitionedFiles[partitionedPath]
if !exists {
Expand Down
11 changes: 7 additions & 4 deletions drivers/abstract/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func generateThreadID(streamID, hash string) string {
// The writer parameter can be either:
// - *destination.WriterThread for a single writer
// - map[string]*destination.WriterThread for multiple writers keyed by stream ID
func handleWriterCleanup(ctx context.Context, cancel context.CancelFunc, err *error, writer any, threadID string, mtState *any) {
func handleWriterCleanup(ctx context.Context, cancel context.CancelFunc, err *error, writer any, threadID string, mtState *any, dedupInserts *bool) {
if r := recover(); r != nil {
*err = utils.Ternary(*err == nil, fmt.Errorf("panic recovered: %v", r), fmt.Errorf("%s: panic recovered: %v", *err, r)).(error)
}
Expand All @@ -259,10 +259,13 @@ func handleWriterCleanup(ctx context.Context, cancel context.CancelFunc, err *er
var metadataState any
var closeErr error
if mtState != nil {
metadataState, closeErr = types.SetMetadataState(*mtState, threadID)
if closeErr != nil {
closeErr = fmt.Errorf("failed to set metadata state: %s", closeErr)
ms, setErr := types.SetMetadataState(*mtState, threadID)
if setErr != nil {
closeErr = fmt.Errorf("failed to set metadata state: %s", setErr)
Comment thread
hash-data marked this conversation as resolved.
cancel()
}
types.SetDedupInserts(ms, dedupInserts)
metadataState = ms
}

switch w := writer.(type) {
Expand Down
2 changes: 1 addition & 1 deletion drivers/abstract/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams cha
logger.Infof("finished chunk min[%v] and max[%v] of stream %s", chunk.Min, chunk.Max, stream.ID())
}(backfillCtx)

defer handleWriterCleanup(backfillCtx, backfillCtxCancel, &err, inserter, threadID, nil)
defer handleWriterCleanup(backfillCtx, backfillCtxCancel, &err, inserter, threadID, nil, nil)

if prevMetadataState != nil {
if slices.Contains(prevMetadataState.FullRefreshCommittedIDs, threadID) {
Expand Down
25 changes: 19 additions & 6 deletions drivers/abstract/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ func (a *AbstractDriver) streamChanges(mainCtx context.Context, pool *destinatio
}()

var finalMetadataState any
clearDedup := false
writers := make(map[string]*destination.WriterThread)
metadataStates := make(map[string]any)
// true (default) → overlap window open, inserts emit "i" (equality delete + write).
// false → steady-state, inserts emit "c" (write only).
dedupInserts := make(map[string]bool, len(streams))

for _, stream := range streams {
threadID := generateThreadID(stream.ID(), "")
Expand All @@ -122,19 +126,24 @@ func (a *AbstractDriver) streamChanges(mainCtx context.Context, pool *destinatio
}
writers[stream.ID()] = w
var writerMetaState any

dedupInserts[stream.ID()] = true
if writerMeta != nil {
writerMetaState = writerMeta.State
if writerMeta.DedupInserts != nil {
dedupInserts[stream.ID()] = *writerMeta.DedupInserts
}
}
metadataStates[stream.ID()] = writerMetaState
}

defer handleWriterCleanup(cdcCtx, cdcCtxCancel, &err, writers, "", &finalMetadataState)
defer handleWriterCleanup(cdcCtx, cdcCtxCancel, &err, writers, "", &finalMetadataState, &clearDedup)

finalMetadataState, err = a.driver.StreamChanges(cdcCtx, streamIndex, metadataStates, func(ctx context.Context, change CDCChange) error {
writer := writers[change.Stream.ID()]
olakeColumns := map[string]any{
constants.OlakeID: utils.GetKeysHash(change.Data, change.Stream.GetStream().SourceDefinedPrimaryKey.Array()...),
constants.OpType: mapChangeKindToOperationType(change.Kind),
constants.OpType: mapChangeKindToOperationType(change.Kind, dedupInserts[change.Stream.ID()]),
constants.CdcTimestamp: change.Timestamp,
constants.OlakeTimestamp: time.Now().UTC(),
}
Expand All @@ -149,18 +158,22 @@ func (a *AbstractDriver) streamChanges(mainCtx context.Context, pool *destinatio

return writer.Push(ctx, types.CreateRawRecord(filteredData, olakeColumns))
})

return err
}

// mapChangeKindToOperationType converts CDC change kind to operation type code.
// "delete" -> "d", "update" -> "u", "insert"/"create" -> "c"
func mapChangeKindToOperationType(kind string) string {
// mapInsertOpType returns the _op_type string for a CDC change.
// Inserts emit "i" during the backfill overlap window (dedupInserts=true) and "c" otherwise.
func mapChangeKindToOperationType(kind string, dedupInserts bool) string {
switch kind {
case "delete":
return "d"
case "update":
return "u"
default: // "insert", "create", etc.
default:
if dedupInserts {
return "i"
}
return "c"
}
}
2 changes: 1 addition & 1 deletion drivers/abstract/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (a *AbstractDriver) Incremental(mainCtx context.Context, pool *destination.
}(incrementalCtx)

var mtState any
defer handleWriterCleanup(incrementalCtx, incrementalCtxCancel, &err, inserter, threadID, &mtState)
defer handleWriterCleanup(incrementalCtx, incrementalCtxCancel, &err, inserter, threadID, &mtState, nil)

defer func() {
mtStateMap := map[string]any{primaryCursor: a.FormatCursorValue(maxPrimaryCursorValue)}
Expand Down
10 changes: 10 additions & 0 deletions types/metadata_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@ type MetadataState struct {
ID any `json:"id,omitempty"`
State any `json:"state,omitempty"`
FullRefreshCommittedIDs []string `json:"full_refresh_committed_ids,omitempty"`
// nil/true = overlap window open -> inserts emit "i" (equality delete + write), prevents phantom reads
// false = steady state -> inserts emit "c" (write only).
DedupInserts *bool `json:"dedup_inserts,omitempty"`
}

// SetDedupInserts sets DedupInserts on ms only when both ms and dedupInserts are non-nil.
func SetDedupInserts(ms *MetadataState, dedupInserts *bool) {
if ms != nil && dedupInserts != nil {
ms.DedupInserts = dedupInserts
}
}

// SetMetadataState creates a MetadataState with State always stored as a JSON string.
Expand Down
2 changes: 1 addition & 1 deletion utils/testutils/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (cfg *IntegrationTest) runSyncAndVerify(
schema map[string]interface{},
isCDC bool,
) error {
destDBPrefix := fmt.Sprintf("integration_%s", cfg.TestConfig.Driver)
destDBPrefix := utils.Ternary(cfg.TestConfig.DataFormat != "", fmt.Sprintf("integration_%s_%s", cfg.TestConfig.Driver, cfg.TestConfig.DataFormat), fmt.Sprintf("integration_%s", cfg.TestConfig.Driver)).(string)
cmd := syncCommand(*cfg.TestConfig, useState, destinationType, "--destination-database-prefix", destDBPrefix)

// Execute operation before sync if needed
Expand Down
Loading