Skip to content

Commit 4a4907d

Browse files
authored
[core] Fix partial-update sequence-group delete mismatch under projected reads (#7586)
1 parent 409dd85 commit 4a4907d

2 files changed

Lines changed: 48 additions & 1 deletion

File tree

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,7 @@ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
496496
RowType targetType = readType != null ? readType : rowType;
497497
Map<Integer, FieldsComparator> projectedSeqComparators = new HashMap<>();
498498
Map<Integer, FieldAggregator> projectedAggregators = new HashMap<>();
499+
Set<Integer> projectedSequenceGroupPartialDelete = sequenceGroupPartialDelete;
499500

500501
if (readType != null) {
501502
// Build index mapping from table schema to read schema
@@ -547,6 +548,12 @@ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
547548
projectedAggregators.put(newIndex, fieldAggregators.get(oldIndex).get());
548549
}
549550
}
551+
552+
projectedSequenceGroupPartialDelete =
553+
sequenceGroupPartialDelete.stream()
554+
.filter(indexMap::containsKey)
555+
.map(indexMap::get)
556+
.collect(Collectors.toSet());
550557
} else {
551558
// Use original mappings
552559
this.fieldSeqComparators.forEach(
@@ -563,7 +570,7 @@ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
563570
projectedAggregators,
564571
!fieldSeqComparators.isEmpty(),
565572
removeRecordOnDelete,
566-
sequenceGroupPartialDelete,
573+
projectedSequenceGroupPartialDelete,
567574
ArrayUtils.toPrimitiveBoolean(
568575
fieldTypes.stream().map(DataType::isNullable).toArray(Boolean[]::new)));
569576
}

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,46 @@ public void testSequenceGroupPartialDelete() {
132132
validate(func, null, null, null, null, null, null, null);
133133
}
134134

135+
@Test
136+
public void testSequenceGroupPartialDeleteWithProjection() {
137+
Options options = new Options();
138+
options.set("fields.f3.sequence-group", "f1,f2");
139+
options.set("fields.f6.sequence-group", "f4,f5");
140+
options.set("partial-update.remove-record-on-sequence-group", "f3,f6");
141+
RowType rowType =
142+
RowType.of(
143+
DataTypes.INT(),
144+
DataTypes.INT(),
145+
DataTypes.INT(),
146+
DataTypes.INT(),
147+
DataTypes.INT(),
148+
DataTypes.INT(),
149+
DataTypes.INT());
150+
MergeFunctionFactory<KeyValue> factory =
151+
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
152+
153+
// Reordered fields
154+
RowType projectedType =
155+
RowType.of(
156+
new DataType[] {
157+
DataTypes.INT(),
158+
DataTypes.INT(),
159+
DataTypes.INT(),
160+
DataTypes.INT(),
161+
DataTypes.INT(),
162+
DataTypes.INT(),
163+
DataTypes.INT()
164+
},
165+
new String[] {"f3", "f6", "f0", "f1", "f2", "f4", "f5"});
166+
MergeFunction<KeyValue> func = factory.create(projectedType);
167+
168+
func.reset();
169+
add(func, 11, 22, 100, 200, 1, 12, 21);
170+
add(func, RowKind.DELETE, 11, 22, 100, 200, 1, 12, 21);
171+
172+
validate(func, null, null, null, null, null, null, null);
173+
}
174+
135175
@Test
136176
public void testMultiSequenceFields() {
137177
Options options = new Options();

0 commit comments

Comments
 (0)