Skip to content

Commit e35962b

Browse files
authored
[core] Disable force buffer spill for blob type for now (#6427)
1 parent a7b584d commit e35962b

2 files changed

Lines changed: 131 additions & 0 deletions

File tree

paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.function.Function;
6060

6161
import static org.apache.paimon.format.FileFormat.fileFormat;
62+
import static org.apache.paimon.types.DataTypeRoot.BLOB;
6263
import static org.apache.paimon.utils.StatsCollectorFactories.createStatsFactories;
6364

6465
/** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
@@ -78,6 +79,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
7879
private RowType writeType;
7980
private @Nullable List<String> writeCols;
8081
private boolean forceBufferSpill = false;
82+
private boolean withBlob;
8183

8284
public BaseAppendFileStoreWrite(
8385
FileIO fileIO,
@@ -100,6 +102,7 @@ public BaseAppendFileStoreWrite(
100102
this.writeCols = null;
101103
this.fileFormat = fileFormat(options);
102104
this.pathFactory = pathFactory;
105+
this.withBlob = rowType.getFieldTypes().stream().anyMatch(t -> t.is(BLOB));
103106

104107
this.fileIndexOptions = options.indexColumnsOptions();
105108
}
@@ -142,6 +145,7 @@ protected RecordWriter<InternalRow> createWriter(
142145
@Override
143146
public void withWriteType(RowType writeType) {
144147
this.writeType = writeType;
148+
this.withBlob = writeType.getFieldTypes().stream().anyMatch(t -> t.is(BLOB));
145149
int fullCount = rowType.getFieldCount();
146150
List<String> fullNames = rowType.getFieldNames();
147151
this.writeCols = writeType.getFieldNames();
@@ -235,6 +239,9 @@ protected void forceBufferSpill() throws Exception {
235239
if (ioManager == null) {
236240
return;
237241
}
242+
if (withBlob) {
243+
return;
244+
}
238245
if (forceBufferSpill) {
239246
return;
240247
}

paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.compression.CompressOptions;
2323
import org.apache.paimon.data.BinaryRow;
24+
import org.apache.paimon.data.BinaryRowWriter;
2425
import org.apache.paimon.data.BinaryString;
26+
import org.apache.paimon.data.BlobData;
2527
import org.apache.paimon.data.GenericRow;
2628
import org.apache.paimon.data.InternalRow;
2729
import org.apache.paimon.disk.ChannelWithMeta;
@@ -38,9 +40,15 @@
3840
import org.apache.paimon.manifest.FileSource;
3941
import org.apache.paimon.memory.HeapMemorySegmentPool;
4042
import org.apache.paimon.memory.MemoryPoolFactory;
43+
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
4144
import org.apache.paimon.options.MemorySize;
4245
import org.apache.paimon.options.Options;
46+
import org.apache.paimon.schema.Schema;
47+
import org.apache.paimon.schema.TableSchema;
4348
import org.apache.paimon.stats.SimpleStatsConverter;
49+
import org.apache.paimon.table.AppendOnlyFileStoreTable;
50+
import org.apache.paimon.table.FileStoreTableFactory;
51+
import org.apache.paimon.types.BlobType;
4452
import org.apache.paimon.types.DataType;
4553
import org.apache.paimon.types.IntType;
4654
import org.apache.paimon.types.RowType;
@@ -62,6 +70,7 @@
6270
import java.util.ArrayList;
6371
import java.util.Arrays;
6472
import java.util.Collections;
73+
import java.util.HashMap;
6574
import java.util.LinkedList;
6675
import java.util.List;
6776
import java.util.Set;
@@ -394,6 +403,69 @@ public void testSpillWorksAndMoreSmallFilesGenerated() throws Exception {
394403
});
395404
}
396405

406+
@Test
407+
public void testNoSpillWhenMeetBlobType() throws Exception {
408+
// Create a schema with BLOB type
409+
RowType blobSchema =
410+
RowType.builder()
411+
.fields(
412+
new DataType[] {new IntType(), new VarCharType(), new BlobType()},
413+
new String[] {"id", "name", "data"})
414+
.build();
415+
416+
AppendOnlyFileStoreTable table =
417+
(AppendOnlyFileStoreTable)
418+
FileStoreTableFactory.create(
419+
LocalFileIO.create(),
420+
pathFactory.newPath(),
421+
TableSchema.create(
422+
0,
423+
new Schema(
424+
blobSchema.getFields(),
425+
Collections.singletonList("id"),
426+
Collections.emptyList(),
427+
new HashMap<String, String>() {
428+
{
429+
put(
430+
CoreOptions.DATA_EVOLUTION_ENABLED
431+
.key(),
432+
"true");
433+
put(
434+
CoreOptions.ROW_TRACKING_ENABLED
435+
.key(),
436+
"true");
437+
}
438+
},
439+
"")));
440+
BaseAppendFileStoreWrite writer = table.store().newWrite("test");
441+
writer.withIOManager(IOManager.create(tempDir.toString()));
442+
writer.withMemoryPoolFactory(
443+
new MemoryPoolFactory(new HeapMemorySegmentPool(16384L, 1024)));
444+
445+
char[] largeString = new char[990];
446+
Arrays.fill(largeString, 'a');
447+
byte[] largeBlobData = new byte[1024];
448+
Arrays.fill(largeBlobData, (byte) 'b');
449+
450+
BinaryRow binaryRow = new BinaryRow(1);
451+
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
452+
for (int j = 0; j < 100; j++) {
453+
binaryRowWriter.reset();
454+
binaryRowWriter.writeInt(0, j);
455+
binaryRowWriter.complete();
456+
writer.write(
457+
binaryRow, 0, createBlobRow(j, String.valueOf(largeString), largeBlobData));
458+
}
459+
460+
binaryRowWriter.reset();
461+
binaryRowWriter.writeInt(0, 1000);
462+
binaryRowWriter.complete();
463+
AppendOnlyWriter appendOnlyWriter = (AppendOnlyWriter) writer.createWriter(binaryRow, 0);
464+
RowBuffer buffer = appendOnlyWriter.getWriteBuffer();
465+
assertThat(buffer).isNull();
466+
writer.close();
467+
}
468+
397469
@Test
398470
public void testNoBuffer() throws Exception {
399471
AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE);
@@ -686,4 +758,56 @@ private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) throws I
686758
null,
687759
null);
688760
}
761+
762+
private InternalRow createBlobRow(int id, String name, byte[] blobData) {
763+
return GenericRow.of(id, BinaryString.fromString(name), new BlobData(blobData));
764+
}
765+
766+
private AppendOnlyWriter createWriterWithBlobSchema(
767+
RowType schema, long targetFileSize, boolean spillable) {
768+
FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options());
769+
LinkedList<DataFileMeta> toCompact = new LinkedList<>();
770+
BucketedAppendCompactManager compactManager =
771+
new BucketedAppendCompactManager(
772+
Executors.newSingleThreadScheduledExecutor(
773+
new ExecutorThreadFactory("compaction-thread")),
774+
toCompact,
775+
null,
776+
MIN_FILE_NUM,
777+
targetFileSize,
778+
false,
779+
compactBefore -> Collections.emptyList(),
780+
null);
781+
CoreOptions options =
782+
new CoreOptions(Collections.singletonMap("metadata.stats-mode", "truncate(16)"));
783+
AppendOnlyWriter writer =
784+
new AppendOnlyWriter(
785+
LocalFileIO.create(),
786+
IOManager.create(tempDir.toString()),
787+
SCHEMA_ID,
788+
fileFormat,
789+
targetFileSize,
790+
schema,
791+
null,
792+
getMaxSequenceNumber(toCompact),
793+
compactManager,
794+
files -> {
795+
throw new RuntimeException("Can't read back in blob mode");
796+
},
797+
false,
798+
pathFactory,
799+
null,
800+
false,
801+
spillable,
802+
CoreOptions.FILE_COMPRESSION.defaultValue(),
803+
CompressOptions.defaultOptions(),
804+
new StatsCollectorFactories(options),
805+
MemorySize.MAX_VALUE,
806+
new FileIndexOptions(),
807+
true,
808+
false);
809+
writer.setMemoryPool(
810+
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
811+
return writer;
812+
}
689813
}

0 commit comments

Comments
 (0)