Commit a214e71

jerry-024 authored and JingsongLi committed

[python] support blob type and blob write and read (#6390)

1 parent 05ad2a0, commit a214e71

12 files changed: 2,124 additions & 3 deletions

paimon-python/pypaimon/common/core_options.py

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ def __str__(self):
     FILE_FORMAT_ORC = "orc"
     FILE_FORMAT_AVRO = "avro"
     FILE_FORMAT_PARQUET = "parquet"
+    FILE_FORMAT_BLOB = "blob"
     FILE_COMPRESSION = "file.compression"
     FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
     FILE_FORMAT_PER_LEVEL = "file.format.per.level"

paimon-python/pypaimon/common/delta_varint_compressor.py

Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import io
from typing import List


class DeltaVarintCompressor:

    @staticmethod
    def compress(data: List[int]) -> bytes:
        if not data:
            return b''

        # Estimate output size (conservative: 5 bytes per varint max)
        out = io.BytesIO()
        out.seek(0)

        # Encode first value directly
        DeltaVarintCompressor._encode_varint(data[0], out)

        # Encode deltas without intermediate list creation
        prev = data[0]
        for i in range(1, len(data)):
            current = data[i]
            delta = current - prev
            DeltaVarintCompressor._encode_varint(delta, out)
            prev = current

        # Return only the used portion of the buffer
        position = out.tell()
        result = out.getvalue()
        out.close()
        return result[:position]

    @staticmethod
    def decompress(compressed: bytes) -> List[int]:
        if not compressed:
            return []

        # Fast path: decode directly into result without intermediate deltas list
        in_stream = io.BytesIO(compressed)
        result = []

        try:
            # Decode first value
            first_value = DeltaVarintCompressor._decode_varint(in_stream)
            result.append(first_value)

            # Decode and reconstruct remaining values in one pass
            current_value = first_value
            while True:
                try:
                    delta = DeltaVarintCompressor._decode_varint(in_stream)
                    current_value += delta
                    result.append(current_value)
                except RuntimeError:
                    # End of stream reached
                    break

        except RuntimeError:
            # Handle empty stream case
            pass
        finally:
            in_stream.close()

        return result

    @staticmethod
    def _encode_varint(value: int, out: io.BytesIO) -> None:
        # ZigZag encoding: maps signed integers to unsigned integers
        if value >= 0:
            zigzag = value << 1
        else:
            zigzag = ((-value) << 1) - 1

        # Varint encoding
        while zigzag >= 0x80:
            out.write(bytes([(zigzag & 0x7F) | 0x80]))
            zigzag >>= 7
        out.write(bytes([zigzag & 0x7F]))

    @staticmethod
    def _decode_varint(in_stream: io.BytesIO) -> int:
        result = 0
        shift = 0
        while True:
            byte_data = in_stream.read(1)
            if not byte_data:
                if shift == 0:
                    # Natural end of stream
                    raise RuntimeError("End of stream")
                else:
                    # Unexpected end in middle of varint
                    raise RuntimeError("Unexpected end of input")

            b = byte_data[0]
            result |= (b & 0x7F) << shift
            if (b & 0x80) == 0:
                break

            shift += 7
            if shift > 63:
                raise RuntimeError("Varint overflow")

        # ZigZag decoding: maps unsigned integers back to signed integers
        if result & 1:
            return -((result + 1) >> 1)
        else:
            return result >> 1
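
A quick round trip with the codec above (a standalone sketch, not part of the commit; the module path follows the import used by the blob reader later in this diff):

from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor

# Blob lengths within one file tend to be close together, so the deltas
# stay small and each one fits in a byte or two after zigzag encoding.
lengths = [1_048_592, 1_048_600, 1_048_588, 2_097_168]
packed = DeltaVarintCompressor.compress(lengths)
assert DeltaVarintCompressor.decompress(packed) == lengths

# Negative deltas are handled by the zigzag step: 5 encodes to 0x0A,
# and the delta -1 maps to 1, i.e. the single byte 0x01.
assert DeltaVarintCompressor.compress([5, 4]) == bytes([0x0A, 0x01])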

paimon-python/pypaimon/common/file_io.py

Lines changed: 55 additions & 0 deletions
@@ -28,6 +28,11 @@
 from pyarrow._fs import FileSystem
 
 from pypaimon.common.config import OssOptions, S3Options
+from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
+from pypaimon.table.row.blob import BlobData
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.row_kind import RowKind
+from pypaimon.write.blob_format_writer import BlobFormatWriter
 
 
 class FileIO:
@@ -364,3 +369,53 @@ def record_generator():
 
         with self.new_output_stream(path) as output_stream:
             fastavro.writer(output_stream, avro_schema, records, **kwargs)
+
+    def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
+        try:
+            # Validate input constraints
+            if data.num_columns != 1:
+                raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns")
+            # Check for null values
+            column = data.column(0)
+            if column.null_count > 0:
+                raise RuntimeError("Blob format does not support null values")
+            # Convert PyArrow schema to Paimon DataFields
+            # For blob files, we expect exactly one blob column
+            field = data.schema[0]
+            if pyarrow.types.is_large_binary(field.type):
+                fields = [DataField(0, field.name, AtomicType("BLOB"))]
+            else:
+                # Convert other types as needed
+                paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable)
+                fields = [DataField(0, field.name, paimon_type)]
+            # Convert PyArrow Table to records
+            records_dict = data.to_pydict()
+            num_rows = data.num_rows
+            field_name = fields[0].name
+            with self.new_output_stream(path) as output_stream:
+                writer = BlobFormatWriter(output_stream)
+                # Write each row
+                for i in range(num_rows):
+                    col_data = records_dict[field_name][i]
+                    # Convert to appropriate type based on field type
+                    if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
+                        if isinstance(col_data, bytes):
+                            blob_data = BlobData(col_data)
+                        else:
+                            # Convert to bytes if needed
+                            if hasattr(col_data, 'as_py'):
+                                col_data = col_data.as_py()
+                            if isinstance(col_data, str):
+                                col_data = col_data.encode('utf-8')
+                            blob_data = BlobData(col_data)
+                        row_values = [blob_data]
+                    else:
+                        row_values = [col_data]
+                    # Create GenericRow and write
+                    row = GenericRow(row_values, fields, RowKind.INSERT)
+                    writer.add_element(row)
+                writer.close()
+
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
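
For orientation, a call to the new write_blob helper could look like the sketch below (not part of the commit; file_io stands for an already constructed pypaimon FileIO and the output path is a placeholder). The table must be a single non-null column, and a large_binary column is the case that write_blob maps to the BLOB type:

import pyarrow as pa
from pathlib import Path


def write_one_blob_file(file_io, path: str) -> None:
    # One column, no nulls: write_blob rejects anything else.
    payloads = pa.array([b"first blob payload", b"second blob payload"],
                        type=pa.large_binary())
    table = pa.table({"content": payloads})
    # A large_binary column is written as a Paimon BLOB via BlobFormatWriter.
    file_io.write_blob(Path(path), table)
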
Lines changed: 199 additions & 0 deletions
@@ -0,0 +1,199 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import struct
from pathlib import Path
from typing import List, Optional, Any, Iterator

import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import RecordBatch

from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
from pypaimon.common.file_io import FileIO
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
from pypaimon.table.row.generic_row import GenericRow


class FormatBlobReader(RecordBatchReader):

    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
                 full_fields: List[DataField], push_down_predicate: Any):
        self._file_io = file_io
        self._file_path = file_path
        self._push_down_predicate = push_down_predicate

        # Get file size
        self._file_size = file_io.get_file_size(file_path)

        # Initialize the low-level blob format reader
        self.file_path = file_path
        self.blob_lengths: List[int] = []
        self.blob_offsets: List[int] = []
        self.returned = False
        self._read_index()

        # Set up fields and schema
        if len(read_fields) > 1:
            raise RuntimeError("Blob reader only supports one field.")
        self._fields = read_fields
        full_fields_map = {field.name: field for field in full_fields}
        projected_data_fields = [full_fields_map[name] for name in read_fields]
        self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields)

        # Initialize iterator
        self._blob_iterator = None
        self._current_batch = None

    def read_arrow_batch(self) -> Optional[RecordBatch]:
        if self._blob_iterator is None:
            if self.returned:
                return None
            self.returned = True
            batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0])
            self._blob_iterator = iter(batch_iterator)

        # Collect records for this batch
        pydict_data = {name: [] for name in self._fields}
        records_in_batch = 0

        try:
            while True:
                # Get next blob record
                blob_row = next(self._blob_iterator)
                # Check if first read returns None, stop immediately
                if blob_row is None:
                    break

                # Extract blob data from the row
                blob = blob_row.values[0]  # Blob files have single blob field

                # Convert blob to appropriate format for each requested field
                for field_name in self._fields:
                    # For blob files, all fields should contain blob data
                    if isinstance(blob, Blob):
                        blob_data = blob.to_data()
                    else:
                        blob_data = bytes(blob) if blob is not None else None
                    pydict_data[field_name].append(blob_data)

                records_in_batch += 1

        except StopIteration:
            # Stop immediately when StopIteration occurs
            pass

        if records_in_batch == 0:
            return None

        # Create RecordBatch
        if self._push_down_predicate is None:
            # Convert to Table first, then to RecordBatch
            table = pa.Table.from_pydict(pydict_data, self._schema)
            if table.num_rows > 0:
                return table.to_batches()[0]
            else:
                return None
        else:
            # Apply predicate filtering
            pa_batch = pa.Table.from_pydict(pydict_data, self._schema)
            dataset = ds.InMemoryDataset(pa_batch)
            scanner = dataset.scanner(filter=self._push_down_predicate)
            combine_chunks = scanner.to_table().combine_chunks()
            if combine_chunks.num_rows > 0:
                return combine_chunks.to_batches()[0]
            else:
                return None

    def close(self):
        self._blob_iterator = None

    def _read_index(self) -> None:
        with self._file_io.new_input_stream(Path(self.file_path)) as f:
            # Seek to header: last 5 bytes
            f.seek(self._file_size - 5)
            header = f.read(5)

            if len(header) != 5:
                raise IOError("Invalid blob file: cannot read header")

            # Parse header
            index_length = struct.unpack('<I', header[:4])[0]  # Little endian
            version = header[4]

            if version != 1:
                raise IOError(f"Unsupported blob file version: {version}")

            # Read index data
            f.seek(self._file_size - 5 - index_length)
            index_bytes = f.read(index_length)

            if len(index_bytes) != index_length:
                raise IOError("Invalid blob file: cannot read index")

            # Decompress blob lengths and compute offsets
            blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
            blob_offsets = []
            offset = 0
            for length in blob_lengths:
                blob_offsets.append(offset)
                offset += length
            self.blob_lengths = blob_lengths
            self.blob_offsets = blob_offsets


class BlobRecordIterator:
    MAGIC_NUMBER_SIZE = 4
    METADATA_OVERHEAD = 16

    def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str):
        self.file_path = file_path
        self.field_name = field_name
        self.blob_lengths = blob_lengths
        self.blob_offsets = blob_offsets
        self.current_position = 0

    def __iter__(self) -> Iterator[GenericRow]:
        return self

    def __next__(self) -> GenericRow:
        if self.current_position >= len(self.blob_lengths):
            raise StopIteration

        # Create blob reference for the current blob
        # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes
        blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE  # Skip magic number
        blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD

        # Create BlobDescriptor for this blob
        descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length)
        blob = BlobRef(descriptor)

        self.current_position += 1

        # Return as GenericRow with single blob field
        from pypaimon.schema.data_types import DataField, AtomicType
        from pypaimon.table.row.row_kind import RowKind

        fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
        return GenericRow([blob], fields, RowKind.INSERT)

    def returned_position(self) -> int:
        """Get current position in the iterator."""
        return self.current_position
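
Taken together, _read_index and BlobRecordIterator imply the v1 on-disk layout: each blob is stored as a 4-byte magic number, the payload, and 12 trailing bytes (an 8-byte length plus a 4-byte CRC, so 16 bytes of overhead per blob); after the last blob comes a delta-varint index of the per-blob total lengths, and the file ends with a 5-byte footer holding the index length (4 bytes, little endian) and the version byte. A minimal sketch that walks this layout from a local file, assuming plain open() access instead of pypaimon's FileIO, might look like:

import struct

from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor


def list_blob_extents(path: str):
    """Yield (offset, length) of each blob payload in a v1 blob file."""
    with open(path, 'rb') as f:
        file_size = f.seek(0, 2)
        # 5-byte footer: 4-byte little-endian index length + 1-byte version.
        f.seek(file_size - 5)
        index_length, version = struct.unpack('<IB', f.read(5))
        if version != 1:
            raise IOError(f"Unsupported blob file version: {version}")
        # The index is a delta-varint-compressed list of total blob lengths.
        f.seek(file_size - 5 - index_length)
        blob_lengths = DeltaVarintCompressor.decompress(f.read(index_length))
        offset = 0
        for length in blob_lengths:
            # Skip the 4-byte magic number and drop the trailing
            # 8-byte length + 4-byte CRC (16 bytes of overhead per blob).
            yield offset + 4, length - 16
            offset += length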
