Commit cfbfbeb

[python] Support blob read && write (#6420)
1 parent 87bdda4 commit cfbfbeb

14 files changed: 1862 additions & 26 deletions

paimon-python/pypaimon/common/core_options.py

Lines changed: 2 additions & 0 deletions
@@ -49,3 +49,5 @@ def __str__(self):
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
     # Commit options
     COMMIT_USER_PREFIX = "commit.user-prefix"
+    ROW_TRACKING_ENABLED = "row-tracking.enabled"
+    DATA_EVOLUTION_ENABLED = "data-evolution.enabled"
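The two new keys follow the existing CoreOptions pattern of exposing table option names as string constants. A minimal sketch of how a table that opts into both features might carry them in its options map (the plain dict below is an illustrative stand-in, not a real catalog call):

from pypaimon.common.core_options import CoreOptions

# Hypothetical options map for a table with row tracking and data evolution enabled.
options = {
    CoreOptions.ROW_TRACKING_ENABLED: 'true',
    CoreOptions.DATA_EVOLUTION_ENABLED: 'true',
}

# The scanner and reader code in this commit check the flag the same way:
data_evolution = options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'
assert data_evolution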

paimon-python/pypaimon/common/file_io.py

Lines changed: 0 additions & 1 deletion
@@ -25,7 +25,6 @@
 import pyarrow
 from packaging.version import parse
 from pyarrow._fs import FileSystem
-
 from pypaimon.common.config import OssOptions, S3Options
 from pypaimon.common.uri_reader import UriReaderFactory
 from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser

paimon-python/pypaimon/read/reader/concat_batch_reader.py

Lines changed: 68 additions & 0 deletions
@@ -19,6 +19,7 @@
 import collections
 from typing import Callable, List, Optional

+import pyarrow as pa
 from pyarrow import RecordBatch

 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
@@ -76,3 +77,70 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
             return batch.slice(0, self.split_end_row - cur_begin)
         else:
             return batch
+
+
+class MergeAllBatchReader(RecordBatchReader):
+    """
+    A reader that accepts multiple reader suppliers and concatenates all their arrow batches
+    into one big batch. This is useful when you want to merge all data from multiple sources
+    into a single batch for processing.
+    """
+
+    def __init__(self, reader_suppliers: List[Callable]):
+        self.reader_suppliers = reader_suppliers
+        self.merged_batch: Optional[RecordBatch] = None
+        self.batch_created = False
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        if self.batch_created:
+            return None
+
+        all_batches = []
+
+        # Read all batches from all reader suppliers
+        for supplier in self.reader_suppliers:
+            reader = supplier()
+            try:
+                while True:
+                    batch = reader.read_arrow_batch()
+                    if batch is None:
+                        break
+                    all_batches.append(batch)
+            finally:
+                reader.close()
+
+        # Concatenate all batches into one big batch
+        if all_batches:
+            # For PyArrow < 17.0.0, use Table.concat_tables approach:
+            # convert batches to tables and concatenate
+            tables = [pa.Table.from_batches([batch]) for batch in all_batches]
+            if len(tables) == 1:
+                # Single table, just get the first batch
+                self.merged_batch = tables[0].to_batches()[0]
+            else:
+                # Multiple tables, concatenate them
+                concatenated_table = pa.concat_tables(tables)
+                # Convert back to a single batch by taking all batches and combining
+                all_concatenated_batches = concatenated_table.to_batches()
+                if len(all_concatenated_batches) == 1:
+                    self.merged_batch = all_concatenated_batches[0]
+                else:
+                    # If still multiple batches, we need to manually combine them.
+                    # This shouldn't happen with concat_tables, but just in case.
+                    combined_arrays = []
+                    for i in range(len(all_concatenated_batches[0].columns)):
+                        column_arrays = [batch.column(i) for batch in all_concatenated_batches]
+                        combined_arrays.append(pa.concat_arrays(column_arrays))
+                    self.merged_batch = pa.RecordBatch.from_arrays(
+                        combined_arrays,
+                        names=all_concatenated_batches[0].schema.names
+                    )
+        else:
+            self.merged_batch = None
+
+        self.batch_created = True
+        return self.merged_batch
+
+    def close(self) -> None:
+        self.merged_batch = None
+        self.batch_created = False
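The behavior of the new MergeAllBatchReader can be seen with a tiny in-memory reader. The _ListBatchReader stub below is hypothetical and exists only to drive the example; in the real code the suppliers come from file_reader_supplier:

import pyarrow as pa

from pypaimon.read.reader.concat_batch_reader import MergeAllBatchReader


class _ListBatchReader:
    """Hypothetical in-memory reader used only for this illustration."""

    def __init__(self, batches):
        self._batches = list(batches)

    def read_arrow_batch(self):
        return self._batches.pop(0) if self._batches else None

    def close(self):
        pass


b1 = pa.RecordBatch.from_pydict({'id': [1, 2], 'v': ['a', 'b']})
b2 = pa.RecordBatch.from_pydict({'id': [3], 'v': ['c']})

reader = MergeAllBatchReader([lambda: _ListBatchReader([b1]),
                              lambda: _ListBatchReader([b2])])
merged = reader.read_arrow_batch()
assert merged.num_rows == 3               # all rows land in a single batch
assert reader.read_arrow_batch() is None  # the merged batch is produced only once
reader.close()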

paimon-python/pypaimon/read/scanner/full_starting_scanner.py

Lines changed: 116 additions & 1 deletion
@@ -18,6 +18,7 @@
 from collections import defaultdict
 from typing import Callable, List, Optional

+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
@@ -65,13 +66,16 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],

         self.only_read_real_buckets = True if int(
             self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
+        self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'

     def scan(self) -> Plan:
         file_entries = self.plan_files()
         if not file_entries:
             return Plan([])
         if self.table.is_primary_key_table:
             splits = self._create_primary_key_splits(file_entries)
+        elif self.data_evolution:
+            splits = self._create_data_evolution_splits(file_entries)
         else:
             splits = self._create_append_only_splits(file_entries)

@@ -104,7 +108,7 @@ def plan_files(self) -> List[ManifestEntry]:
         file_entries = self._filter_by_predicate(file_entries)
         return file_entries

-    def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
+    def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
         if idx_of_this_subtask >= number_of_para_subtasks:
             raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
         self.idx_of_this_subtask = idx_of_this_subtask
@@ -357,3 +361,114 @@ def _pack_for_ordered(items: List, weight_func: Callable, target_weight: int) ->
             packed.append(bin_items)

         return packed
+
+    def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
+        partitioned_files = defaultdict(list)
+        for entry in file_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+        if self.idx_of_this_subtask is not None:
+            partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
+
+        def weight_func(file_list: List[DataFileMeta]) -> int:
+            return max(sum(f.file_size for f in file_list), self.open_file_cost)
+
+        splits = []
+        for key, file_entries in partitioned_files.items():
+            if not file_entries:
+                continue

+            data_files: List[DataFileMeta] = [e.file for e in file_entries]
+
+            # Split files by firstRowId for data evolution
+            split_by_row_id = self._split_by_row_id(data_files)
+
+            # Pack the split groups for optimal split sizes
+            packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func,
+                                                                                  self.target_split_size)
+
+            # Flatten the packed files and build splits
+            flatten_packed_files: List[List[DataFileMeta]] = [
+                [file for sub_pack in pack for file in sub_pack]
+                for pack in packed_files
+            ]
+
+            splits += self._build_split_from_pack(flatten_packed_files, file_entries, False)
+
+        if self.idx_of_this_subtask is not None:
+            self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
+        return splits
+
+    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
+        split_by_row_id = []
+
+        def sort_key(file: DataFileMeta) -> tuple:
+            first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf')
+            is_blob = 1 if self._is_blob_file(file.file_name) else 0
+            # For files with the same firstRowId, sort by maxSequenceNumber in descending order
+            # (a larger sequence number means more recent data)
+            max_seq = file.max_sequence_number
+            return (first_row_id, is_blob, -max_seq)
+
+        sorted_files = sorted(files, key=sort_key)
+
+        # Filter blob files to only include those within the row ID range of non-blob files
+        sorted_files = self._filter_blob(sorted_files)
+
+        # Split files by firstRowId
+        last_row_id = -1
+        check_row_id_start = 0
+        current_split = []
+
+        for file in sorted_files:
+            first_row_id = file.first_row_id
+            if first_row_id is None:
+                # Files without firstRowId are treated as individual splits
+                split_by_row_id.append([file])
+                continue
+
+            if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
+                if current_split:
+                    split_by_row_id.append(current_split)
+
+                # Validate that files don't overlap
+                if first_row_id < check_row_id_start:
+                    file_names = [f.file_name for f in sorted_files]
+                    raise ValueError(
+                        f"There are overlapping files in the split: {file_names}, "
+                        f"the wrong file is: {file.file_name}"
+                    )
+
+                current_split = []
+                last_row_id = first_row_id
+                check_row_id_start = first_row_id + file.row_count
+
+            current_split.append(file)
+
+        if current_split:
+            split_by_row_id.append(current_split)
+
+        return split_by_row_id
+
+    @staticmethod
+    def _is_blob_file(file_name: str) -> bool:
+        return file_name.endswith('.blob')
+
+    @staticmethod
+    def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]:
+        result = []
+        row_id_start = -1
+        row_id_end = -1
+
+        for file in files:
+            if not FullStartingScanner._is_blob_file(file.file_name):
+                if file.first_row_id is not None:
+                    row_id_start = file.first_row_id
+                    row_id_end = file.first_row_id + file.row_count
+                result.append(file)
+            else:
+                if file.first_row_id is not None and row_id_start != -1:
+                    if row_id_start <= file.first_row_id < row_id_end:
+                        result.append(file)
+
+        return result
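The net effect of _split_by_row_id and _filter_blob is that each data file and the blob files covering the same firstRowId range end up in one group. A simplified, self-contained sketch of that grouping rule follows; the namedtuple is a hypothetical stand-in for DataFileMeta, not the real class:

from collections import namedtuple

# Hypothetical, simplified stand-in for DataFileMeta (the real metadata has many more fields).
F = namedtuple('F', 'file_name first_row_id row_count')

files = [
    F('data-0.parquet', 0, 100),
    F('blob-0.blob', 0, 100),    # blob rows covering the same row-id range as data-0
    F('data-1.parquet', 100, 50),
    F('blob-1.blob', 100, 50),
]

# Group by first_row_id the way _split_by_row_id does: a non-blob file with a new
# firstRowId opens a group, and blob files for the same range join that group.
groups, current, last = [], [], None
for f in sorted(files, key=lambda f: (f.first_row_id, f.file_name.endswith('.blob'))):
    if not f.file_name.endswith('.blob') and f.first_row_id != last:
        if current:
            groups.append(current)
        current, last = [], f.first_row_id
    current.append(f)
if current:
    groups.append(current)

assert [[f.file_name for f in g] for g in groups] == [
    ['data-0.parquet', 'blob-0.blob'],
    ['data-1.parquet', 'blob-1.blob'],
]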

paimon-python/pypaimon/read/split_read.py

Lines changed: 28 additions & 14 deletions
@@ -26,7 +26,7 @@
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
-from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader
+from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader, MergeAllBatchReader
 from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
 from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
 from pypaimon.read.reader.data_evolution_merge_reader import DataEvolutionMergeReader
@@ -73,21 +73,21 @@ def __init__(self, table, predicate: Optional[Predicate], push_down_predicate,
     def create_reader(self) -> RecordReader:
         """Create a record reader for the given split."""

-    def file_reader_supplier(self, file_path: str, for_merge_read: bool):
+    def file_reader_supplier(self, file_path: str, for_merge_read: bool, read_fields: List[str]):
         _, extension = os.path.splitext(file_path)
         file_format = extension[1:]

         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
-            format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
+            format_reader = FormatAvroReader(self.table.file_io, file_path, read_fields,
                                              self.read_fields, self.push_down_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
             blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
-            format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
+            format_reader = FormatBlobReader(self.table.file_io, file_path, read_fields,
                                              self.read_fields, self.push_down_predicate, blob_as_descriptor)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
             format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
-                                                self._get_final_read_data_fields(), self.push_down_predicate)
+                                                read_fields, self.push_down_predicate)
         else:
             raise ValueError(f"Unexpected file format: {file_format}")

@@ -253,7 +253,12 @@ class RawFileSplitRead(SplitRead):
     def create_reader(self) -> RecordReader:
         data_readers = []
         for file_path in self.split.file_paths:
-            supplier = partial(self.file_reader_supplier, file_path=file_path, for_merge_read=False)
+            supplier = partial(
+                self.file_reader_supplier,
+                file_path=file_path,
+                for_merge_read=False,
+                read_fields=self._get_final_read_data_fields(),
+            )
             data_readers.append(supplier)

         if not data_readers:
@@ -274,7 +279,12 @@ def _get_all_data_fields(self):

 class MergeFileSplitRead(SplitRead):
     def kv_reader_supplier(self, file_path):
-        reader_supplier = partial(self.file_reader_supplier, file_path=file_path, for_merge_read=True)
+        reader_supplier = partial(
+            self.file_reader_supplier,
+            file_path=file_path,
+            for_merge_read=True,
+            read_fields=self._get_final_read_data_fields()
+        )
         return KeyValueWrapReader(reader_supplier(), len(self.trimmed_primary_key), self.value_arity)

     def section_reader_supplier(self, section: List[SortedRun]):
@@ -317,7 +327,7 @@ def create_reader(self) -> RecordReader:
            if len(need_merge_files) == 1 or not self.read_fields:
                # No need to merge fields, just create a single file reader
                suppliers.append(
-                    lambda f=need_merge_files[0]: self._create_file_reader(f)
+                    lambda f=need_merge_files[0]: self._create_file_reader(f, self._get_final_read_data_fields())
                )
            else:
                suppliers.append(
@@ -424,26 +434,30 @@ def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordRe
            self.read_fields = read_fields  # create reader based on read_fields
            # Create reader for this bunch
            if len(bunch.files()) == 1:
-                file_record_readers[i] = self._create_file_reader(bunch.files()[0])
+                file_record_readers[i] = self._create_file_reader(
+                    bunch.files()[0], [field.name for field in read_fields]
+                )
            else:
                # Create concatenated reader for multiple files
                suppliers = [
-                    lambda f=file: self._create_file_reader(f) for file in bunch.files()
+                    lambda f=file: self._create_file_reader(
+                        f, [field.name for field in read_fields]
+                    ) for file in bunch.files()
                ]
-                file_record_readers[i] = ConcatRecordReader(suppliers)
+                file_record_readers[i] = MergeAllBatchReader(suppliers)
            self.read_fields = table_fields

        # Validate that all required fields are found
        for i, field in enumerate(all_read_fields):
            if row_offsets[i] == -1:
-                if not field.type.is_nullable():
+                if not field.type.nullable:
                    raise ValueError(f"Field {field} is not null but can't find any file contains it.")

        return DataEvolutionMergeReader(row_offsets, field_offsets, file_record_readers)

-    def _create_file_reader(self, file: DataFileMeta) -> RecordReader:
+    def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> RecordReader:
        """Create a file reader for a single file."""
-        return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False)
+        return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False, read_fields=read_fields)

    def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
        """Split files into field bunches."""

paimon-python/pypaimon/read/table_read.py

Lines changed: 2 additions & 1 deletion
@@ -20,6 +20,7 @@
 import pandas
 import pyarrow

+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.read.push_down_utils import extract_predicate_to_list
@@ -132,7 +133,7 @@ def _create_split_read(self, split: Split) -> SplitRead:
                 read_type=self.read_type,
                 split=split
             )
-        elif self.table.options.get('data-evolution.enabled', 'false').lower() == 'true':
+        elif self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true':
             return DataEvolutionSplitRead(
                 table=self.table,
                 predicate=self.predicate,

paimon-python/pypaimon/schema/data_types.py

Lines changed: 1 addition & 0 deletions
@@ -548,6 +548,7 @@ def to_paimon_field(field_idx: int, pa_field: pyarrow.Field) -> DataField:

     @staticmethod
     def to_paimon_schema(pa_schema: pyarrow.Schema) -> List[DataField]:
+        # Convert PyArrow schema to Paimon fields
         fields = []
         for i, pa_field in enumerate(pa_schema):
             pa_field: pyarrow.Field
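For context, to_paimon_schema converts a PyArrow schema into Paimon DataField objects. Assuming the static method lives on PyarrowFieldParser (the class imported alongside DataField in file_io.py above; that attribution is an assumption, not shown in this hunk), a usage sketch might look like:

import pyarrow as pa

from pypaimon.schema.data_types import PyarrowFieldParser

pa_schema = pa.schema([
    pa.field('id', pa.int64(), nullable=False),
    pa.field('name', pa.string()),
])

# Each PyArrow field becomes a Paimon DataField with a field id, name, and data type.
fields = PyarrowFieldParser.to_paimon_schema(pa_schema)
for f in fields:
    print(f.name, f.type)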
