Skip to content

Commit fe98a6c

Browse files
authored
[Improvement] Hoist StructProjection.create out of per-record loop in CombinedDeleteFilter (#4145)
* Hoist StructProjection.create out of the per-record loop in initializeBloomFilter. StructProjection is designed to be reused via .wrap(). Previously, StructProjection.create(requiredSchema, deleteSchema) was called inside the per-record forEach, creating N*M objects (records x schemas). Now a Map<Set<Integer>, StructProjection> is pre-built once before iterating records, and .wrap(record) is called on the pre-built projection inside the loop.
1 parent 6c03791 commit fe98a6c

2 files changed

Lines changed: 358 additions & 6 deletions

File tree

amoro-format-iceberg/src/main/java/org/apache/amoro/io/reader/CombinedDeleteFilter.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -288,22 +288,20 @@ private BloomFilter<StructLike> initializeBloomFilter() {
288288
BloomFilter.create(StructLikeFunnel.INSTANCE, dataRecordCnt, 0.001);
289289

290290
Map<Set<Integer>, InternalRecordWrapper> recordWrappers = Maps.newHashMap();
291+
Map<Set<Integer>, StructProjection> structProjections = Maps.newHashMap();
291292
for (Map.Entry<Set<Integer>, Schema> deleteSchemaEntry : deleteSchemaByDeleteIds.entrySet()) {
292293
Set<Integer> ids = deleteSchemaEntry.getKey();
293294
Schema deleteSchema = deleteSchemaEntry.getValue();
294295

295-
InternalRecordWrapper internalRecordWrapper =
296-
new InternalRecordWrapper(deleteSchema.asStruct());
297-
recordWrappers.put(ids, internalRecordWrapper);
296+
recordWrappers.put(ids, new InternalRecordWrapper(deleteSchema.asStruct()));
297+
structProjections.put(ids, StructProjection.create(requiredSchema, deleteSchema));
298298
}
299299

300300
try (CloseableIterable<Record> deletes = readRecords()) {
301301
for (Record record : deletes) {
302302
recordWrappers.forEach(
303303
(ids, internalRecordWrapper) -> {
304-
Schema deleteSchema = deleteSchemaByDeleteIds.get(ids);
305-
StructProjection projection =
306-
StructProjection.create(requiredSchema, deleteSchema).wrap(record);
304+
StructProjection projection = structProjections.get(ids).wrap(record);
307305
StructLike deletePK = internalRecordWrapper.copyFor(projection);
308306
bloomFilter.put(deletePK);
309307
});
Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.io;
20+
21+
import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.TableFormat;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
import org.apache.amoro.catalog.TableTestBase;
import org.apache.amoro.io.reader.CombinedDeleteFilter;
import org.apache.amoro.io.reader.DeleteCache;
import org.apache.amoro.io.reader.GenericCombinedIcebergDataReader;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.shade.guava32.com.google.common.collect.Iterables;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.types.TypeUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
60+
/**
61+
* Tests for StructProjection hoisting in CombinedDeleteFilter.initializeBloomFilter().
62+
*
63+
* <p>Verifies that hoisting StructProjection.create() outside the per-record loop (reusing the
64+
* projection object via .wrap()) produces correct bloom filter initialization and equality-delete
65+
* filtering results, including the multiple-delete-schema scenario.
66+
*/
67+
@RunWith(Parameterized.class)
68+
public class TestCombinedDeleteFilterStructProjection extends TableTestBase {
69+
70+
private final FileFormat fileFormat;
71+
72+
@Parameterized.Parameters(name = "fileFormat = {0}")
73+
public static Object[][] parameters() {
74+
return new Object[][] {{FileFormat.PARQUET}, {FileFormat.AVRO}, {FileFormat.ORC}};
75+
}
76+
77+
public TestCombinedDeleteFilterStructProjection(FileFormat fileFormat) {
78+
super(
79+
new BasicCatalogTestHelper(TableFormat.ICEBERG),
80+
new BasicTableTestHelper(false, false, buildTableProperties(fileFormat)));
81+
this.fileFormat = fileFormat;
82+
System.setProperty(DeleteCache.DELETE_CACHE_ENABLED, "false");
83+
}
84+
85+
private static Map<String, String> buildTableProperties(FileFormat fileFormat) {
86+
Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
87+
props.put(TableProperties.FORMAT_VERSION, "2");
88+
props.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
89+
props.put(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat.name());
90+
return props;
91+
}
92+
93+
/**
94+
* Lower the bloom-filter threshold so the filter is activated with a small dataset (< 3 data
95+
* records) while still having > threshold eq-delete records.
96+
*/
97+
private static final long BLOOM_TRIGGER = 2L;
98+
99+
@Before
100+
public void resetBloomFilterThreshold() {
101+
CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = BLOOM_TRIGGER;
102+
}
103+
104+
// ---------------------------------------------------------------------------
105+
// helpers
106+
// ---------------------------------------------------------------------------
107+
108+
private OutputFileFactory outputFileFactory() {
109+
return OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
110+
.format(fileFormat)
111+
.build();
112+
}
113+
114+
private DataFile writeDataFile(List<Record> records) throws IOException {
115+
return FileHelpers.writeDataFile(
116+
getMixedTable().asUnkeyedTable(),
117+
outputFileFactory().newOutputFile(TestHelpers.Row.of()).encryptingOutputFile(),
118+
TestHelpers.Row.of(),
119+
records);
120+
}
121+
122+
private DeleteFile writeEqDeleteFile(List<Record> records, Schema deleteSchema)
123+
throws IOException {
124+
return FileHelpers.writeDeleteFile(
125+
getMixedTable().asUnkeyedTable(),
126+
outputFileFactory().newOutputFile(TestHelpers.Row.of()).encryptingOutputFile(),
127+
TestHelpers.Row.of(),
128+
records,
129+
deleteSchema);
130+
}
131+
132+
private GenericCombinedIcebergDataReader buildReader(RewriteFilesInput input) {
133+
return new GenericCombinedIcebergDataReader(
134+
getMixedTable().io(),
135+
getMixedTable().schema(),
136+
getMixedTable().spec(),
137+
getMixedTable().asUnkeyedTable().encryption(),
138+
null,
139+
false,
140+
IdentityPartitionConverters::convertConstant,
141+
false,
142+
null,
143+
input,
144+
"");
145+
}
146+
147+
// ---------------------------------------------------------------------------
148+
// Test 1: bloom filter is active and correctly identifies equality-deleted rows
149+
// with a single delete schema (hoisted StructProjection reuse path)
150+
// ---------------------------------------------------------------------------
151+
152+
/**
153+
* Verifies that with the bloom-filter path active, records matched by equality-delete files are
154+
* filtered out and the remainder survives — exercising the hoisted StructProjection code.
155+
*/
156+
@Test
157+
public void testBloomFilterWithHoistedProjection_singleDeleteSchema() throws IOException {
158+
// 3 data rows: id=1,2,3
159+
DataFile dataFile =
160+
writeDataFile(
161+
Arrays.asList(
162+
MixedDataTestHelpers.createRecord(1, "alice", 10L, "1970-01-01T08:00:00"),
163+
MixedDataTestHelpers.createRecord(2, "bob", 20L, "1970-01-01T08:00:00"),
164+
MixedDataTestHelpers.createRecord(3, "carol", 30L, "1970-01-01T08:00:00")));
165+
166+
// eq-delete on id: delete id=1 and id=2 (> BLOOM_TRIGGER records so filter activates)
167+
Schema idSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
168+
GenericRecord idRec = GenericRecord.create(idSchema);
169+
List<Record> deleteRecords = new ArrayList<>();
170+
deleteRecords.add(idRec.copy("id", 1));
171+
deleteRecords.add(idRec.copy("id", 2));
172+
deleteRecords.add(idRec.copy("id", 99)); // extra to push count above threshold
173+
DeleteFile eqDeleteFile = writeEqDeleteFile(deleteRecords, idSchema);
174+
175+
RewriteFilesInput input =
176+
new RewriteFilesInput(
177+
new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 1L)},
178+
new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 1L)},
179+
new DeleteFile[] {},
180+
new DeleteFile[] {MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile, 2L)},
181+
getMixedTable());
182+
183+
GenericCombinedIcebergDataReader reader = buildReader(input);
184+
Assert.assertTrue("Bloom filter should be active", reader.getDeleteFilter().isFilterEqDelete());
185+
186+
try (CloseableIterable<Record> surviving = reader.readData()) {
187+
List<Record> result = Lists.newArrayList(surviving);
188+
Assert.assertEquals("Only id=3 should survive", 1, result.size());
189+
Assert.assertEquals(3, result.get(0).get(0));
190+
}
191+
192+
try (CloseableIterable<Record> deleted = reader.readDeletedData()) {
193+
Assert.assertEquals(
194+
"id=1 and id=2 should be reported as deleted", 2, Iterables.size(deleted));
195+
}
196+
197+
reader.close();
198+
}
199+
200+
// ---------------------------------------------------------------------------
201+
// Test 2: multiple delete schemas — the per-schema StructProjection loop in
202+
// initializeBloomFilter must wrap() each record against each schema
203+
// ---------------------------------------------------------------------------
204+
205+
/**
206+
* Uses two equality-delete files whose schemas differ (id-only vs id+name). Both must be put into
207+
* the bloom filter correctly so that applyEqDeletesForSchema can later verify membership.
208+
*/
209+
@Test
210+
public void testBloomFilterWithHoistedProjection_multipleDeleteSchemas() throws IOException {
211+
DataFile dataFile =
212+
writeDataFile(
213+
Arrays.asList(
214+
MixedDataTestHelpers.createRecord(1, "alice", 10L, "1970-01-01T08:00:00"),
215+
MixedDataTestHelpers.createRecord(2, "bob", 20L, "1970-01-01T08:00:00"),
216+
MixedDataTestHelpers.createRecord(3, "carol", 30L, "1970-01-01T08:00:00")));
217+
218+
// Schema A: delete by id only
219+
Schema idSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
220+
GenericRecord idRec = GenericRecord.create(idSchema);
221+
List<Record> deleteByIdRecords = new ArrayList<>();
222+
IntStream.rangeClosed(1, 3).forEach(id -> deleteByIdRecords.add(idRec.copy("id", id)));
223+
DeleteFile eqDeleteById = writeEqDeleteFile(deleteByIdRecords, idSchema);
224+
225+
// Schema B: delete by id + name (different schema → separate bloom-filter projection)
226+
Schema idNameSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1, 2));
227+
GenericRecord idNameRec = GenericRecord.create(idNameSchema);
228+
List<Record> deleteByIdNameRecords = new ArrayList<>();
229+
IntStream.rangeClosed(1, 3)
230+
.forEach(
231+
id ->
232+
deleteByIdNameRecords.add(
233+
idNameRec.copy("id", id, "name", id == 1 ? "alice" : "other")));
234+
DeleteFile eqDeleteByIdName = writeEqDeleteFile(deleteByIdNameRecords, idNameSchema);
235+
236+
RewriteFilesInput input =
237+
new RewriteFilesInput(
238+
new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 1L)},
239+
new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 1L)},
240+
new DeleteFile[] {},
241+
new DeleteFile[] {
242+
MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteById, 2L),
243+
MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteByIdName, 3L)
244+
},
245+
getMixedTable());
246+
247+
GenericCombinedIcebergDataReader reader = buildReader(input);
248+
Assert.assertTrue("Bloom filter should be active", reader.getDeleteFilter().isFilterEqDelete());
249+
250+
// id=1,2,3 are all deleted by eqDeleteById; none should survive
251+
try (CloseableIterable<Record> surviving = reader.readData()) {
252+
Assert.assertEquals("All records should be deleted", 0, Iterables.size(surviving));
253+
}
254+
255+
try (CloseableIterable<Record> deleted = reader.readDeletedData()) {
256+
Assert.assertEquals("All 3 rows should appear as deleted", 3, Iterables.size(deleted));
257+
}
258+
259+
reader.close();
260+
}
261+
262+
// ---------------------------------------------------------------------------
263+
// Test 3: verify that the bloom filter does NOT wrongly exclude rows that are
264+
// present in the data but absent from the eq-delete files
265+
// ---------------------------------------------------------------------------
266+
267+
/**
268+
* Ensures false-negative freedom: records NOT covered by any equality-delete survive even when
269+
* the bloom filter path is active (i.e. the hoisted StructProjection wraps records faithfully).
270+
*/
271+
@Test
272+
public void testBloomFilterWithHoistedProjection_noFalseNegatives() throws IOException {
273+
DataFile dataFile =
274+
writeDataFile(
275+
Arrays.asList(
276+
MixedDataTestHelpers.createRecord(10, "diana", 100L, "1970-01-01T08:00:00"),
277+
MixedDataTestHelpers.createRecord(20, "eve", 200L, "1970-01-01T08:00:00"),
278+
MixedDataTestHelpers.createRecord(30, "frank", 300L, "1970-01-01T08:00:00")));
279+
280+
// eq-delete on id: only id=10 is deleted; insert extra entries to exceed bloom threshold
281+
Schema idSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
282+
GenericRecord idRec = GenericRecord.create(idSchema);
283+
List<Record> deleteRecords = new ArrayList<>();
284+
deleteRecords.add(idRec.copy("id", 10));
285+
// pad to exceed BLOOM_TRIGGER
286+
deleteRecords.add(idRec.copy("id", 999));
287+
deleteRecords.add(idRec.copy("id", 9999));
288+
DeleteFile eqDeleteFile = writeEqDeleteFile(deleteRecords, idSchema);
289+
290+
RewriteFilesInput input =
291+
new RewriteFilesInput(
292+
new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 1L)},
293+
new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 1L)},
294+
new DeleteFile[] {},
295+
new DeleteFile[] {MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile, 2L)},
296+
getMixedTable());
297+
298+
GenericCombinedIcebergDataReader reader = buildReader(input);
299+
Assert.assertTrue("Bloom filter should be active", reader.getDeleteFilter().isFilterEqDelete());
300+
301+
try (CloseableIterable<Record> surviving = reader.readData()) {
302+
List<Record> result = Lists.newArrayList(surviving);
303+
Assert.assertEquals("id=20 and id=30 should survive", 2, result.size());
304+
}
305+
306+
reader.close();
307+
}
308+
309+
// ---------------------------------------------------------------------------
310+
// Test 4: bloom filter inactive (below threshold) — non-bloom code path
311+
// should also work correctly after the refactor
312+
// ---------------------------------------------------------------------------
313+
314+
/**
315+
* Resets the threshold above the delete-record count so the bloom filter is not activated.
316+
* Confirms the non-bloom code path still correctly applies equality deletes.
317+
*/
318+
@Test
319+
public void testEqualityDeleteWithoutBloomFilter() throws IOException {
320+
// Set threshold high so bloom filter is NOT activated
321+
CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 1_000_000L;
322+
323+
DataFile dataFile =
324+
writeDataFile(
325+
Arrays.asList(
326+
MixedDataTestHelpers.createRecord(1, "alice", 10L, "1970-01-01T08:00:00"),
327+
MixedDataTestHelpers.createRecord(2, "bob", 20L, "1970-01-01T08:00:00"),
328+
MixedDataTestHelpers.createRecord(3, "carol", 30L, "1970-01-01T08:00:00")));
329+
330+
Schema idSchema = TypeUtil.select(BasicTableTestHelper.TABLE_SCHEMA, Sets.newHashSet(1));
331+
GenericRecord idRec = GenericRecord.create(idSchema);
332+
DeleteFile eqDeleteFile =
333+
writeEqDeleteFile(Collections.singletonList(idRec.copy("id", 2)), idSchema);
334+
335+
RewriteFilesInput input =
336+
new RewriteFilesInput(
337+
new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 1L)},
338+
new DataFile[] {MixedDataTestHelpers.wrapIcebergDataFile(dataFile, 1L)},
339+
new DeleteFile[] {},
340+
new DeleteFile[] {MixedDataTestHelpers.wrapIcebergDeleteFile(eqDeleteFile, 2L)},
341+
getMixedTable());
342+
343+
GenericCombinedIcebergDataReader reader = buildReader(input);
344+
Assert.assertFalse(
345+
"Bloom filter should NOT be active", reader.getDeleteFilter().isFilterEqDelete());
346+
347+
try (CloseableIterable<Record> surviving = reader.readData()) {
348+
List<Record> result = Lists.newArrayList(surviving);
349+
Assert.assertEquals("id=1 and id=3 should survive", 2, result.size());
350+
}
351+
352+
reader.close();
353+
}
354+
}

0 commit comments

Comments
 (0)