Commit 0e05b7e

discivigour authored and JingsongLi committed
[Python] Enable field merge read in row-tracking table (#6399)
1 parent a214e71 commit 0e05b7e

20 files changed: 1,708 additions and 29 deletions

paimon-python/pypaimon/manifest/manifest_file_manager.py

Lines changed: 13 additions & 3 deletions
@@ -60,9 +60,13 @@ def read(self, manifest_file_name: str, bucket_filter=None) -> List[ManifestEntry]:
                 null_counts=key_dict['_NULL_COUNTS'],
             )
             value_dict = dict(file_dict['_VALUE_STATS'])
-            if file_dict.get('_VALUE_STATS_COLS') is None:
-                fields = self.table.table_schema.fields
-            elif not file_dict.get('_VALUE_STATS_COLS'):
+            if file_dict['_VALUE_STATS_COLS'] is None:
+                if file_dict['_WRITE_COLS'] is None:
+                    fields = self.table.table_schema.fields
+                else:
+                    read_fields = file_dict['_WRITE_COLS']
+                    fields = [self.table.field_dict[col] for col in read_fields]
+            elif not file_dict['_VALUE_STATS_COLS']:
                 fields = []
             else:
                 fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
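The reordered branch lets _WRITE_COLS drive stats resolution when _VALUE_STATS_COLS is absent: an explicit column list still wins, an empty list still means no stats, and the full table schema is now only the last resort. A minimal sketch of that order, using a hypothetical decoded manifest record in place of file_dict:

def resolve_stats_fields(file_dict, schema_fields, field_dict):
    # Mirrors the branch above; file_dict is a decoded Avro manifest record.
    value_stats_cols = file_dict['_VALUE_STATS_COLS']
    if value_stats_cols is None:
        write_cols = file_dict['_WRITE_COLS']
        if write_cols is None:
            return schema_fields                           # stats span the whole schema
        return [field_dict[col] for col in write_cols]     # stats span the written columns
    if not value_stats_cols:
        return []                                          # stats were written for no columns
    return [field_dict[col] for col in value_stats_cols]

# e.g. a file written with only ('id', 'name') and no _VALUE_STATS_COLS:
fields = resolve_stats_fields(
    {'_VALUE_STATS_COLS': None, '_WRITE_COLS': ['id', 'name']},
    schema_fields=['id', 'name', 'age'],
    field_dict={'id': 'id', 'name': 'name', 'age': 'age'},
)
print(fields)  # ['id', 'name']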
@@ -89,6 +93,9 @@ def read(self, manifest_file_name: str, bucket_filter=None) -> List[ManifestEntry]:
                 embedded_index=file_dict['_EMBEDDED_FILE_INDEX'],
                 file_source=file_dict['_FILE_SOURCE'],
                 value_stats_cols=file_dict.get('_VALUE_STATS_COLS'),
+                external_path=file_dict.get('_EXTERNAL_PATH'),
+                first_row_id=file_dict['_FIRST_ROW_ID'],
+                write_cols=file_dict['_WRITE_COLS'],
             )
             entry = ManifestEntry(
                 kind=record['_KIND'],
@@ -137,6 +144,9 @@ def write(self, file_name, entries: List[ManifestEntry]):
                    "_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
                    "_FILE_SOURCE": entry.file.file_source,
                    "_VALUE_STATS_COLS": entry.file.value_stats_cols,
+                   "_EXTERNAL_PATH": entry.file.external_path,
+                   "_FIRST_ROW_ID": entry.file.first_row_id,
+                   "_WRITE_COLS": entry.file.write_cols,
                }
            }
            avro_records.append(avro_record)

paimon-python/pypaimon/manifest/schema/data_file_meta.py

Lines changed: 60 additions & 1 deletion
@@ -47,6 +47,8 @@ class DataFileMeta:
     file_source: Optional[str] = None
     value_stats_cols: Optional[List[str]] = None
     external_path: Optional[str] = None
+    first_row_id: Optional[int] = None
+    write_cols: Optional[List[str]] = None
 
     # not a schema field, just for internal usage
     file_path: str = None
@@ -59,6 +61,58 @@ def set_file_path(self, table_path: Path, partition: GenericRow, bucket: int):
         path_builder = path_builder / ("bucket-" + str(bucket)) / self.file_name
         self.file_path = str(path_builder)
 
+    def assign_first_row_id(self, first_row_id: int) -> 'DataFileMeta':
+        """Create a new DataFileMeta with the assigned first_row_id."""
+        return DataFileMeta(
+            file_name=self.file_name,
+            file_size=self.file_size,
+            row_count=self.row_count,
+            min_key=self.min_key,
+            max_key=self.max_key,
+            key_stats=self.key_stats,
+            value_stats=self.value_stats,
+            min_sequence_number=self.min_sequence_number,
+            max_sequence_number=self.max_sequence_number,
+            schema_id=self.schema_id,
+            level=self.level,
+            extra_files=self.extra_files,
+            creation_time=self.creation_time,
+            delete_row_count=self.delete_row_count,
+            embedded_index=self.embedded_index,
+            file_source=self.file_source,
+            value_stats_cols=self.value_stats_cols,
+            external_path=self.external_path,
+            first_row_id=first_row_id,
+            write_cols=self.write_cols,
+            file_path=self.file_path
+        )
+
+    def assign_sequence_number(self, min_sequence_number: int, max_sequence_number: int) -> 'DataFileMeta':
+        """Create a new DataFileMeta with the assigned sequence numbers."""
+        return DataFileMeta(
+            file_name=self.file_name,
+            file_size=self.file_size,
+            row_count=self.row_count,
+            min_key=self.min_key,
+            max_key=self.max_key,
+            key_stats=self.key_stats,
+            value_stats=self.value_stats,
+            min_sequence_number=min_sequence_number,
+            max_sequence_number=max_sequence_number,
+            schema_id=self.schema_id,
+            level=self.level,
+            extra_files=self.extra_files,
+            creation_time=self.creation_time,
+            delete_row_count=self.delete_row_count,
+            embedded_index=self.embedded_index,
+            file_source=self.file_source,
+            value_stats_cols=self.value_stats_cols,
+            external_path=self.external_path,
+            first_row_id=self.first_row_id,
+            write_cols=self.write_cols,
+            file_path=self.file_path
+        )
+
 
 DATA_FILE_META_SCHEMA = {
     "type": "record",
@@ -83,9 +137,14 @@ def set_file_path(self, table_path: Path, partition: GenericRow, bucket: int):
          "default": None},
         {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None},
         {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None},
-        {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None},
+        {"name": "_FILE_SOURCE", "type": ["null", "string"], "default": None},
         {"name": "_VALUE_STATS_COLS",
          "type": ["null", {"type": "array", "items": "string"}],
          "default": None},
+        {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": None},
+        {"name": "_FIRST_ROW_ID", "type": ["null", "long"], "default": None},
+        {"name": "_WRITE_COLS",
+         "type": ["null", {"type": "array", "items": "string"}],
+         "default": None},
     ]
 }
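All three new schema entries are ["null", ...] unions with "default": None, which keeps old manifests readable: a reader using the new schema fills the missing fields with None. A toy demonstration of that Avro behavior with fastavro, using a stand-in two-field schema rather than the real DATA_FILE_META_SCHEMA:

import io

import fastavro

old_schema = fastavro.parse_schema({
    "type": "record", "name": "DataFileMeta", "fields": [
        {"name": "_FILE_NAME", "type": "string"},
    ]})
new_schema = fastavro.parse_schema({
    "type": "record", "name": "DataFileMeta", "fields": [
        {"name": "_FILE_NAME", "type": "string"},
        {"name": "_FIRST_ROW_ID", "type": ["null", "long"], "default": None},
    ]})

# Write a record with the old schema, then read it back with the new one.
buf = io.BytesIO()
fastavro.writer(buf, old_schema, [{"_FILE_NAME": "data-0.parquet"}])
buf.seek(0)
for record in fastavro.reader(buf, reader_schema=new_schema):
    print(record)  # {'_FILE_NAME': 'data-0.parquet', '_FIRST_ROW_ID': None}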

paimon-python/pypaimon/manifest/schema/manifest_entry.py

Lines changed: 20 additions & 0 deletions
@@ -31,6 +31,26 @@ class ManifestEntry:
     total_buckets: int
     file: DataFileMeta
 
+    def assign_first_row_id(self, first_row_id: int) -> 'ManifestEntry':
+        """Create a new ManifestEntry with the assigned first_row_id."""
+        return ManifestEntry(
+            kind=self.kind,
+            partition=self.partition,
+            bucket=self.bucket,
+            total_buckets=self.total_buckets,
+            file=self.file.assign_first_row_id(first_row_id)
+        )
+
+    def assign_sequence_number(self, min_sequence_number: int, max_sequence_number: int) -> 'ManifestEntry':
+        """Create a new ManifestEntry with the assigned sequence numbers."""
+        return ManifestEntry(
+            kind=self.kind,
+            partition=self.partition,
+            bucket=self.bucket,
+            total_buckets=self.total_buckets,
+            file=self.file.assign_sequence_number(min_sequence_number, max_sequence_number)
+        )
+
 
 MANIFEST_ENTRY_SCHEMA = {
     "type": "record",
paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py

Lines changed: 85 additions & 0 deletions (new file)

################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import List, Optional

import pyarrow as pa
from pyarrow import RecordBatch

from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader


class DataEvolutionMergeReader(RecordBatchReader):
    """
    A union reader that wraps multiple inner readers; each reader is responsible for reading one file.

    This reader assembles the aligned batches from all inner readers and merges them into one batch.

    For example, if row_offsets is {0, 2, 0, 1, 2, 1} and field_offsets is {0, 0, 1, 1, 1, 0}, it means:
    - The first field comes from batch0, at offset 0 in batch0.
    - The second field comes from batch2, at offset 0 in batch2.
    - The third field comes from batch0, at offset 1 in batch0.
    - The fourth field comes from batch1, at offset 1 in batch1.
    - The fifth field comes from batch2, at offset 1 in batch2.
    - The sixth field comes from batch1, at offset 0 in batch1.
    """

    def __init__(self, row_offsets: List[int], field_offsets: List[int], readers: List[Optional[RecordBatchReader]]):
        if row_offsets is None:
            raise ValueError("Row offsets must not be null")
        if field_offsets is None:
            raise ValueError("Field offsets must not be null")
        if len(row_offsets) != len(field_offsets):
            raise ValueError("Row offsets and field offsets must have the same length")
        if not row_offsets:
            raise ValueError("Row offsets must not be empty")
        if not readers or len(readers) < 1:
            raise ValueError("Readers should be more than 0")
        self.row_offsets = row_offsets
        self.field_offsets = field_offsets
        self.readers = readers

    def read_arrow_batch(self) -> Optional[RecordBatch]:
        batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
        for i, reader in enumerate(self.readers):
            if reader is not None:
                batch = reader.read_arrow_batch()
                if batch is None:
                    # All readers are aligned; as soon as one returns None, the others have no data either.
                    return None
                batches[i] = batch
        # Assemble the output columns from the batches based on row_offsets and field_offsets.
        columns = []
        names = []
        for i in range(len(self.row_offsets)):
            batch_index = self.row_offsets[i]
            field_index = self.field_offsets[i]
            if batches[batch_index] is not None:
                column = batches[batch_index].column(field_index)
                columns.append(column)
                names.append(batches[batch_index].schema.names[field_index])
        if columns:
            return pa.RecordBatch.from_arrays(columns, names)
        return None

    def close(self) -> None:
        try:
            for reader in self.readers:
                if reader is not None:
                    reader.close()
        except Exception as e:
            raise IOError("Failed to close inner readers") from e
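To see the offset bookkeeping in action, here is a self-contained sketch that merges two row-aligned single-file batches into one three-column batch. StubReader is a hypothetical duck-typed stand-in for the project's RecordBatchReader interface, so only pyarrow is needed, with the DataEvolutionMergeReader class above in scope:

import pyarrow as pa

class StubReader:
    """Hypothetical stand-in for RecordBatchReader: yields one fixed batch, then None."""

    def __init__(self, batch):
        self._batch = batch

    def read_arrow_batch(self):
        batch, self._batch = self._batch, None
        return batch

    def close(self):
        pass

# batch0 carries columns (a, c); batch1 carries column (b); both have two rows.
batch0 = pa.RecordBatch.from_arrays([pa.array([1, 2]), pa.array([True, False])], ['a', 'c'])
batch1 = pa.RecordBatch.from_arrays([pa.array(['x', 'y'])], ['b'])

# Output field i comes from batches[row_offsets[i]] at column field_offsets[i]:
# field 0 -> batch0 col 0 ('a'), field 1 -> batch1 col 0 ('b'), field 2 -> batch0 col 1 ('c').
reader = DataEvolutionMergeReader(
    row_offsets=[0, 1, 0],
    field_offsets=[0, 0, 1],
    readers=[StubReader(batch0), StubReader(batch1)],
)
merged = reader.read_arrow_batch()
print(merged.schema.names)        # ['a', 'b', 'c']
print(reader.read_arrow_batch())  # None -- the stub readers are exhausted
reader.close()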

paimon-python/pypaimon/read/reader/data_file_record_reader.py renamed to paimon-python/pypaimon/read/reader/data_file_batch_reader.py

File renamed without changes.
paimon-python/pypaimon/read/reader/field_bunch.py

Lines changed: 120 additions & 0 deletions (new file)

"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
FieldBunch classes for organizing files by field in data evolution.

These classes help organize DataFileMeta objects into groups based on their field content,
supporting both regular data files and blob files.
"""
from abc import ABC
from typing import List

from pypaimon.manifest.schema.data_file_meta import DataFileMeta


class FieldBunch(ABC):
    """Interface for files organized by field."""

    def row_count(self) -> int:
        """Return the total row count for this bunch."""
        ...

    def files(self) -> List[DataFileMeta]:
        """Return the list of files in this bunch."""
        ...


class DataBunch(FieldBunch):
    """Files for a single data file."""

    def __init__(self, data_file: DataFileMeta):
        self.data_file = data_file

    def row_count(self) -> int:
        return self.data_file.row_count

    def files(self) -> List[DataFileMeta]:
        return [self.data_file]


class BlobBunch(FieldBunch):
    """Files for a partial field (blob files)."""

    def __init__(self, expected_row_count: int):
        self._files: List[DataFileMeta] = []
        self.expected_row_count = expected_row_count
        self.latest_first_row_id = -1
        self.expected_next_first_row_id = -1
        self.latest_max_sequence_number = -1
        self._row_count = 0

    def add(self, file: DataFileMeta) -> None:
        """Add a blob file to this bunch."""
        if not self._is_blob_file(file.file_name):
            raise ValueError("Only blob file can be added to a blob bunch.")

        if file.first_row_id == self.latest_first_row_id:
            if file.max_sequence_number >= self.latest_max_sequence_number:
                raise ValueError(
                    "Blob file with same first row id should have decreasing sequence number."
                )
            return

        if self._files:
            first_row_id = file.first_row_id
            if first_row_id < self.expected_next_first_row_id:
                if file.max_sequence_number >= self.latest_max_sequence_number:
                    raise ValueError(
                        "Blob file with overlapping row id should have decreasing sequence number."
                    )
                return
            elif first_row_id > self.expected_next_first_row_id:
                raise ValueError(
                    f"Blob file first row id should be continuous, expect "
                    f"{self.expected_next_first_row_id} but got {first_row_id}"
                )

            if file.schema_id != self._files[0].schema_id:
                raise ValueError(
                    "All files in a blob bunch should have the same schema id."
                )
            if file.write_cols != self._files[0].write_cols:
                raise ValueError(
                    "All files in a blob bunch should have the same write columns."
                )

        self._files.append(file)
        self._row_count += file.row_count
        if self._row_count > self.expected_row_count:
            raise ValueError(
                f"Blob files row count exceeds the expected {self.expected_row_count}"
            )

        self.latest_max_sequence_number = file.max_sequence_number
        self.latest_first_row_id = file.first_row_id
        self.expected_next_first_row_id = self.latest_first_row_id + file.row_count

    def row_count(self) -> int:
        return self._row_count

    def files(self) -> List[DataFileMeta]:
        return self._files

    @staticmethod
    def _is_blob_file(file_name: str) -> bool:
        """Check if a file is a blob file based on its extension."""
        return file_name.endswith('.blob')
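A short sketch of the invariants BlobBunch.add enforces: names must end in .blob, each file's first_row_id must continue exactly where the previous one stopped, and the running row count may not exceed the expected total. The metas are hypothetical stand-ins carrying only the attributes that add inspects, with BlobBunch from the file above in scope:

from types import SimpleNamespace

def blob_meta(first_row_id, row_count, seq):
    # Hypothetical stand-in for DataFileMeta with just the fields add() reads.
    return SimpleNamespace(file_name=f"blob-{first_row_id}.blob", first_row_id=first_row_id,
                           row_count=row_count, max_sequence_number=seq,
                           schema_id=0, write_cols=['v'])

bunch = BlobBunch(expected_row_count=300)
for start in (0, 100, 200):            # contiguous ranges: [0,100), [100,200), [200,300)
    bunch.add(blob_meta(start, 100, seq=start))
print(bunch.row_count(), len(bunch.files()))  # 300 3

bunch.add(blob_meta(300, 1, seq=400))  # raises ValueError: row count would exceed 300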
