################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import struct
from pathlib import Path
from typing import List, Optional, Any, Iterator

import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import RecordBatch

from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
from pypaimon.common.file_io import FileIO
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import AtomicType, DataField, PyarrowFieldParser
from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.row_kind import RowKind


class FormatBlobReader(RecordBatchReader):
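    """Reads a Paimon blob format file and exposes it as Arrow record batches.

    Layout as inferred from the parsing code below (treat the exact on-disk
    format as an assumption of this reader): the file ends with a 5-byte footer
    holding the index length (4 bytes, little endian) and a version byte; the
    index itself is a delta-varint-compressed list of per-record lengths, from
    which record offsets are derived as a running sum. Each record wraps a
    single blob value, so only one projected field is supported.
    """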

    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
                 full_fields: List[DataField], push_down_predicate: Any):
        self._file_io = file_io
        self._push_down_predicate = push_down_predicate

        # Get the total file size (needed to locate the footer and index)
        self._file_size = file_io.get_file_size(file_path)

        # Initialize the low-level blob format reader state and parse the index
        self.file_path = file_path
        self.blob_lengths: List[int] = []
        self.blob_offsets: List[int] = []
        self.returned = False
        self._read_index()

        # Set up fields and schema: a blob file holds exactly one blob column
        if len(read_fields) != 1:
            raise RuntimeError("Blob reader supports exactly one field.")
        self._fields = read_fields
        full_fields_map = {field.name: field for field in full_fields}
        projected_data_fields = [full_fields_map[name] for name in read_fields]
        self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields)

        # Lazily-initialized record iterator and current batch
        self._blob_iterator = None
        self._current_batch = None

    def read_arrow_batch(self) -> Optional[RecordBatch]:
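        """Read all remaining blob records into a single Arrow batch.

        Returns None once the file has been fully consumed. When a push-down
        predicate is set, it is applied via pyarrow.dataset before the batch
        is returned.
        """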
        if self._blob_iterator is None:
            if self.returned:
                return None
            self.returned = True
            batch_iterator = BlobRecordIterator(
                self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0])
            self._blob_iterator = iter(batch_iterator)

        # Collect records for this batch
        pydict_data = {name: [] for name in self._fields}
        records_in_batch = 0

        try:
            while True:
                # Get the next blob record; the iterator raises StopIteration when exhausted
                blob_row = next(self._blob_iterator)
                if blob_row is None:
                    # Defensive: stop if the iterator yields nothing
                    break

                # Extract the blob value from the row (blob files have a single blob field)
                blob = blob_row.values[0]

                # Convert the blob to bytes for each requested field
                for field_name in self._fields:
                    if isinstance(blob, Blob):
                        blob_data = blob.to_data()
                    else:
                        blob_data = bytes(blob) if blob is not None else None
                    pydict_data[field_name].append(blob_data)

                records_in_batch += 1

        except StopIteration:
            # End of file reached
            pass

        if records_in_batch == 0:
            return None
        # Create a RecordBatch from the collected rows
        if self._push_down_predicate is None:
            # Convert to a Table first, then take its single batch
            table = pa.Table.from_pydict(pydict_data, self._schema)
            if table.num_rows > 0:
                return table.to_batches()[0]
            else:
                return None
        else:
            # Apply predicate filtering via an in-memory dataset scan
            table = pa.Table.from_pydict(pydict_data, self._schema)
            dataset = ds.InMemoryDataset(table)
            scanner = dataset.scanner(filter=self._push_down_predicate)
            filtered = scanner.to_table().combine_chunks()
            if filtered.num_rows > 0:
                return filtered.to_batches()[0]
            else:
                return None

    def close(self):
        self._blob_iterator = None

    def _read_index(self) -> None:
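        """Parse the index from the end of the blob file.

        The layout assumed here mirrors the parsing below: the last 5 bytes are
        a footer with the index length (4-byte little-endian unsigned int)
        followed by a 1-byte version (currently 1); the index_length bytes
        immediately before the footer are the delta-varint-compressed blob
        record lengths. Offsets are reconstructed as the running sum of those
        lengths, e.g. lengths [100, 250, 80] yield offsets [0, 100, 350].
        """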
        with self._file_io.new_input_stream(Path(self.file_path)) as f:
            # Read the 5-byte footer at the end of the file
            f.seek(self._file_size - 5)
            header = f.read(5)

            if len(header) != 5:
                raise IOError("Invalid blob file: cannot read header")

            # Parse the footer: 4-byte little-endian index length + 1-byte version
            index_length = struct.unpack('<I', header[:4])[0]
            version = header[4]

            if version != 1:
                raise IOError(f"Unsupported blob file version: {version}")

            # Read the compressed index that precedes the footer
            f.seek(self._file_size - 5 - index_length)
            index_bytes = f.read(index_length)

            if len(index_bytes) != index_length:
                raise IOError("Invalid blob file: cannot read index")

            # Decompress blob lengths and compute offsets as a running sum
            blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
            blob_offsets = []
            offset = 0
            for length in blob_lengths:
                blob_offsets.append(offset)
                offset += length
            self.blob_lengths = blob_lengths
            self.blob_offsets = blob_offsets


class BlobRecordIterator:
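    """Iterates over the blob records of a single blob file as GenericRow objects.

    Each record is assumed (per the constants below) to be framed as a 4-byte
    magic number, the blob payload, an 8-byte length and a 4-byte CRC, so
    16 bytes of metadata are excluded when describing the payload.
    """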
    MAGIC_NUMBER_SIZE = 4
    METADATA_OVERHEAD = 16

    def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str):
        self.file_path = file_path
        self.field_name = field_name
        self.blob_lengths = blob_lengths
        self.blob_offsets = blob_offsets
        self.current_position = 0

    def __iter__(self) -> Iterator[GenericRow]:
        return self

    def __next__(self) -> GenericRow:
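        """Return the next blob as a single-field GenericRow.

        The magic number at the start of the record is skipped and the trailing
        length/CRC bytes are excluded, so the BlobDescriptor covers only the
        blob payload, which is returned as a BlobRef.
        """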
        if self.current_position >= len(self.blob_lengths):
            raise StopIteration

        # Point the descriptor at the payload only: skip the 4-byte magic number at the
        # front and drop the 8-byte length + 4-byte CRC trailer (16 bytes of metadata total)
        blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE
        blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD

        # Create a lazy blob reference for this record
        descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length)
        blob = BlobRef(descriptor)

        self.current_position += 1

        # Return as a GenericRow with a single blob field
        fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
        return GenericRow([blob], fields, RowKind.INSERT)

    def returned_position(self) -> int:
        """Get current position in the iterator."""
        return self.current_position
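

# Usage sketch (illustrative only; ``file_io``, ``full_fields`` and the ``"content"``
# field name are assumptions, not part of this module):
#
#   reader = FormatBlobReader(file_io, "/path/to/file.blob", ["content"],
#                             full_fields, push_down_predicate=None)
#   while (batch := reader.read_arrow_batch()) is not None:
#       ...  # a pyarrow.RecordBatch with a single binary column of blob payloads
#   reader.close()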