Skip to content

Commit bb19a53

Browse files
authored
[core] Disable blob manifest filter for now (#6443)
1 parent e35962b commit bb19a53

3 files changed

Lines changed: 105 additions & 0 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
2828
import org.apache.paimon.operation.BucketSelectConverter;
2929
import org.apache.paimon.operation.BucketedAppendFileStoreWrite;
30+
import org.apache.paimon.operation.DataEvolutionFileStoreScan;
3031
import org.apache.paimon.operation.DataEvolutionSplitRead;
3132
import org.apache.paimon.operation.RawFileSplitRead;
3233
import org.apache.paimon.predicate.Predicate;
@@ -168,6 +169,17 @@ public AppendOnlyFileStoreScan newScan() {
168169
return Optional.empty();
169170
};
170171

172+
if (options().dataEvolutionEnabled()) {
173+
return new DataEvolutionFileStoreScan(
174+
newManifestsReader(),
175+
bucketSelectConverter,
176+
snapshotManager(),
177+
schemaManager,
178+
schema,
179+
manifestFileFactory(),
180+
options.scanManifestParallelism());
181+
}
182+
171183
return new AppendOnlyFileStoreScan(
172184
newManifestsReader(),
173185
bucketSelectConverter,
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation;
20+
21+
import org.apache.paimon.manifest.ManifestEntry;
22+
import org.apache.paimon.manifest.ManifestFile;
23+
import org.apache.paimon.predicate.Predicate;
24+
import org.apache.paimon.schema.SchemaManager;
25+
import org.apache.paimon.schema.TableSchema;
26+
import org.apache.paimon.utils.SnapshotManager;
27+
28+
/** {@link FileStoreScan} for data-evolution enabled table. */
29+
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
30+
31+
public DataEvolutionFileStoreScan(
32+
ManifestsReader manifestsReader,
33+
BucketSelectConverter bucketSelectConverter,
34+
SnapshotManager snapshotManager,
35+
SchemaManager schemaManager,
36+
TableSchema schema,
37+
ManifestFile.Factory manifestFileFactory,
38+
Integer scanManifestParallelism) {
39+
super(
40+
manifestsReader,
41+
bucketSelectConverter,
42+
snapshotManager,
43+
schemaManager,
44+
schema,
45+
manifestFileFactory,
46+
scanManifestParallelism,
47+
false);
48+
}
49+
50+
public DataEvolutionFileStoreScan withFilter(Predicate predicate) {
51+
return this;
52+
}
53+
54+
/** Note: Keep this thread-safe. */
55+
@Override
56+
protected boolean filterByStats(ManifestEntry entry) {
57+
return true;
58+
}
59+
}

paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.paimon.data.GenericRow;
2424
import org.apache.paimon.data.InternalRow;
2525
import org.apache.paimon.io.DataFileMeta;
26+
import org.apache.paimon.predicate.Predicate;
27+
import org.apache.paimon.predicate.PredicateBuilder;
2628
import org.apache.paimon.reader.DataEvolutionFileReader;
2729
import org.apache.paimon.reader.RecordReader;
2830
import org.apache.paimon.schema.Schema;
@@ -31,6 +33,7 @@
3133
import org.apache.paimon.table.sink.BatchWriteBuilder;
3234
import org.apache.paimon.table.sink.CommitMessage;
3335
import org.apache.paimon.table.sink.CommitMessageImpl;
36+
import org.apache.paimon.table.source.DataSplit;
3437
import org.apache.paimon.table.source.ReadBuilder;
3538
import org.apache.paimon.types.DataTypes;
3639
import org.apache.paimon.types.RowType;
@@ -385,6 +388,37 @@ public void testMoreData() throws Exception {
385388
});
386389
}
387390

391+
@Test
392+
public void testPredicate() throws Exception {
393+
createTableDefault();
394+
Schema schema = schemaDefault();
395+
BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
396+
try (BatchTableWrite write = builder.newWrite().withWriteType(schema.rowType())) {
397+
write.write(
398+
GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b")));
399+
BatchTableCommit commit = builder.newCommit();
400+
List<CommitMessage> commitables = write.prepareCommit();
401+
commit.commit(commitables);
402+
}
403+
404+
RowType writeType1 = schema.rowType().project(Collections.singletonList("f2"));
405+
try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
406+
write1.write(GenericRow.of(BinaryString.fromString("c")));
407+
408+
BatchTableCommit commit = builder.newCommit();
409+
List<CommitMessage> commitables = write1.prepareCommit();
410+
setFirstRowId(commitables, 0L);
411+
commit.commit(commitables);
412+
}
413+
414+
ReadBuilder readBuilder = getTableDefault().newReadBuilder();
415+
PredicateBuilder predicateBuilder = new PredicateBuilder(schema.rowType());
416+
Predicate predicate = predicateBuilder.notEqual(2, BinaryString.fromString("b"));
417+
readBuilder.withFilter(predicate);
418+
assertThat(((DataSplit) readBuilder.newScan().plan().splits().get(0)).dataFiles().size())
419+
.isEqualTo(2);
420+
}
421+
388422
protected Schema schemaDefault() {
389423
Schema.Builder schemaBuilder = Schema.newBuilder();
390424
schemaBuilder.column("f0", DataTypes.INT());

0 commit comments

Comments
 (0)