Skip to content

Commit 61179c7

Browse files
authored
[Python] SimpleStats supports BinaryRow (#6444)
1 parent de0696d commit 61179c7

14 files changed

Lines changed: 805 additions & 90 deletions

paimon-python/pypaimon/common/predicate.py

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from pyarrow import dataset as pyarrow_dataset
2828

2929
from pypaimon.manifest.schema.simple_stats import SimpleStats
30+
from pypaimon.table.row.generic_row import GenericRow
3031
from pypaimon.table.row.internal_row import InternalRow
3132

3233

@@ -67,32 +68,31 @@ def test(self, record: InternalRow) -> bool:
6768
raise ValueError(f"Unsupported predicate method: {self.method}")
6869

6970
def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
70-
return self.test_by_stats({
71-
"min_values": stat.min_values.to_dict(),
72-
"max_values": stat.max_values.to_dict(),
73-
"null_counts": {
74-
stat.min_values.fields[i].name: stat.null_counts[i] for i in range(len(stat.min_values.fields))
75-
},
76-
"row_count": row_count,
77-
})
78-
79-
def test_by_stats(self, stat: Dict) -> bool:
71+
"""Test predicate against BinaryRow stats with denseIndexMapping like Java implementation."""
8072
if self.method == 'and':
81-
return all(p.test_by_stats(stat) for p in self.literals)
73+
return all(p.test_by_simple_stats(stat, row_count) for p in self.literals)
8274
if self.method == 'or':
83-
t = any(p.test_by_stats(stat) for p in self.literals)
84-
return t
75+
return any(p.test_by_simple_stats(stat, row_count) for p in self.literals)
8576

86-
null_count = stat["null_counts"][self.field]
87-
row_count = stat["row_count"]
77+
# Get null count using the mapped index
78+
null_count = stat.null_counts[self.index] if stat.null_counts and self.index < len(
79+
stat.null_counts) else 0
8880

8981
if self.method == 'isNull':
9082
return null_count is not None and null_count > 0
9183
if self.method == 'isNotNull':
9284
return null_count is None or row_count is None or null_count < row_count
9385

94-
min_value = stat["min_values"][self.field]
95-
max_value = stat["max_values"][self.field]
86+
if not isinstance(stat.min_values, GenericRow):
87+
# Parse field values using BinaryRow's direct field access by name
88+
min_value = stat.min_values.get_field(self.index)
89+
max_value = stat.max_values.get_field(self.index)
90+
else:
91+
# TODO transform partition to BinaryRow
92+
min_values = stat.min_values.to_dict()
93+
max_values = stat.max_values.to_dict()
94+
min_value = min_values[self.field]
95+
max_value = max_values[self.field]
9696

9797
if min_value is None or max_value is None or (null_count is not None and null_count == row_count):
9898
# invalid stats, skip validation
@@ -164,7 +164,6 @@ def __init__(cls, name, bases, dct):
164164

165165

166166
class Tester(ABC, metaclass=RegisterMeta):
167-
168167
name = None
169168

170169
@abstractmethod
@@ -187,7 +186,6 @@ def test_by_arrow(self, val, literals) -> bool:
187186

188187

189188
class Equal(Tester):
190-
191189
name = 'equal'
192190

193191
def test_by_value(self, val, literals) -> bool:
@@ -201,7 +199,6 @@ def test_by_arrow(self, val, literals) -> bool:
201199

202200

203201
class NotEqual(Tester):
204-
205202
name = "notEqual"
206203

207204
def test_by_value(self, val, literals) -> bool:
@@ -215,7 +212,6 @@ def test_by_arrow(self, val, literals) -> bool:
215212

216213

217214
class LessThan(Tester):
218-
219215
name = "lessThan"
220216

221217
def test_by_value(self, val, literals) -> bool:
@@ -229,7 +225,6 @@ def test_by_arrow(self, val, literals) -> bool:
229225

230226

231227
class LessOrEqual(Tester):
232-
233228
name = "lessOrEqual"
234229

235230
def test_by_value(self, val, literals) -> bool:
@@ -243,7 +238,6 @@ def test_by_arrow(self, val, literals) -> bool:
243238

244239

245240
class GreaterThan(Tester):
246-
247241
name = "greaterThan"
248242

249243
def test_by_value(self, val, literals) -> bool:
@@ -257,7 +251,6 @@ def test_by_arrow(self, val, literals) -> bool:
257251

258252

259253
class GreaterOrEqual(Tester):
260-
261254
name = "greaterOrEqual"
262255

263256
def test_by_value(self, val, literals) -> bool:
@@ -271,7 +264,6 @@ def test_by_arrow(self, val, literals) -> bool:
271264

272265

273266
class In(Tester):
274-
275267
name = "in"
276268

277269
def test_by_value(self, val, literals) -> bool:
@@ -285,7 +277,6 @@ def test_by_arrow(self, val, literals) -> bool:
285277

286278

287279
class NotIn(Tester):
288-
289280
name = "notIn"
290281

291282
def test_by_value(self, val, literals) -> bool:
@@ -299,7 +290,6 @@ def test_by_arrow(self, val, literals) -> bool:
299290

300291

301292
class Between(Tester):
302-
303293
name = "between"
304294

305295
def test_by_value(self, val, literals) -> bool:
@@ -313,7 +303,6 @@ def test_by_arrow(self, val, literals) -> bool:
313303

314304

315305
class StartsWith(Tester):
316-
317306
name = "startsWith"
318307

319308
def test_by_value(self, val, literals) -> bool:
@@ -329,7 +318,6 @@ def test_by_arrow(self, val, literals) -> bool:
329318

330319

331320
class EndsWith(Tester):
332-
333321
name = "endsWith"
334322

335323
def test_by_value(self, val, literals) -> bool:
@@ -343,7 +331,6 @@ def test_by_arrow(self, val, literals) -> bool:
343331

344332

345333
class Contains(Tester):
346-
347334
name = "contains"
348335

349336
def test_by_value(self, val, literals) -> bool:
@@ -357,7 +344,6 @@ def test_by_arrow(self, val, literals) -> bool:
357344

358345

359346
class IsNull(Tester):
360-
361347
name = "isNull"
362348

363349
def test_by_value(self, val, literals) -> bool:
@@ -371,7 +357,6 @@ def test_by_arrow(self, val, literals) -> bool:
371357

372358

373359
class IsNotNull(Tester):
374-
375360
name = "isNotNull"
376361

377362
def test_by_value(self, val, literals) -> bool:

paimon-python/pypaimon/manifest/manifest_file_manager.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from pypaimon.manifest.schema.simple_stats import SimpleStats
2727
from pypaimon.table.row.generic_row import (GenericRowDeserializer,
2828
GenericRowSerializer)
29+
from pypaimon.table.row.binary_row import BinaryRow
2930

3031

3132
class ManifestFileManager:
@@ -54,12 +55,11 @@ def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=T
5455
file_dict = dict(record['_FILE'])
5556
key_dict = dict(file_dict['_KEY_STATS'])
5657
key_stats = SimpleStats(
57-
min_values=GenericRowDeserializer.from_bytes(key_dict['_MIN_VALUES'],
58-
self.trimmed_primary_key_fields),
59-
max_values=GenericRowDeserializer.from_bytes(key_dict['_MAX_VALUES'],
60-
self.trimmed_primary_key_fields),
58+
min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_key_fields),
59+
max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_key_fields),
6160
null_counts=key_dict['_NULL_COUNTS'],
6261
)
62+
6363
value_dict = dict(file_dict['_VALUE_STATS'])
6464
if file_dict['_VALUE_STATS_COLS'] is None:
6565
if file_dict['_WRITE_COLS'] is None:
@@ -72,8 +72,8 @@ def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=T
7272
else:
7373
fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
7474
value_stats = SimpleStats(
75-
min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields),
76-
max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields),
75+
min_values=BinaryRow(value_dict['_MIN_VALUES'], fields),
76+
max_values=BinaryRow(value_dict['_MAX_VALUES'], fields),
7777
null_counts=value_dict['_NULL_COUNTS'],
7878
)
7979
file_meta = DataFileMeta(

paimon-python/pypaimon/manifest/schema/simple_stats.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@
2121
from typing import ClassVar
2222

2323
from pypaimon.table.row.generic_row import GenericRow
24+
from pypaimon.table.row.internal_row import InternalRow
2425

2526

2627
@dataclass
2728
class SimpleStats:
28-
min_values: GenericRow
29-
max_values: GenericRow
29+
min_values: InternalRow
30+
max_values: InternalRow
3031
null_counts: Optional[List[int]]
3132

3233
_empty_stats: ClassVar[object] = None
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
from typing import List, Optional, Dict, Any
20+
import threading
21+
22+
from pypaimon.schema.data_types import DataField
23+
from pypaimon.manifest.schema.simple_stats import SimpleStats
24+
from pypaimon.table.row.generic_row import GenericRow
25+
from pypaimon.table.row.projected_row import ProjectedRow
26+
27+
28+
class SimpleStatsEvolution:
29+
"""Converter for array of SimpleColStats."""
30+
31+
def __init__(self, data_fields: List[DataField], index_mapping: Optional[List[int]],
32+
cast_field_getters: Optional[List[Any]]):
33+
self.field_names = [field.name for field in data_fields]
34+
self.index_mapping = index_mapping
35+
self.cast_field_getters = cast_field_getters
36+
self.index_mappings: Dict[tuple, List[int]] = {}
37+
self._lock = threading.Lock()
38+
39+
# Create empty values for optimization
40+
self.empty_values = GenericRow([None] * len(self.field_names), data_fields)
41+
self.empty_null_counts = [0] * len(self.field_names)
42+
43+
def evolution(self, stats: SimpleStats, row_count: Optional[int],
44+
stats_fields: Optional[List[str]]) -> 'SimpleStats':
45+
min_values = stats.min_values
46+
max_values = stats.max_values
47+
null_counts = stats.null_counts
48+
49+
if stats_fields is not None and not stats_fields:
50+
# Optimize for empty dense fields
51+
min_values = self.empty_values
52+
max_values = self.empty_values
53+
null_counts = self.empty_null_counts
54+
elif stats_fields is not None:
55+
# Apply dense field mapping
56+
dense_index_mapping = self._get_dense_index_mapping(stats_fields)
57+
min_values = self._project_row(min_values, dense_index_mapping)
58+
max_values = self._project_row(max_values, dense_index_mapping)
59+
null_counts = self._project_array(null_counts, dense_index_mapping)
60+
61+
if self.index_mapping is not None:
62+
# TODO support schema evolution
63+
min_values = self._project_row(min_values, self.index_mapping)
64+
max_values = self._project_row(max_values, self.index_mapping)
65+
66+
if row_count is None:
67+
raise RuntimeError("Schema Evolution for stats needs row count.")
68+
69+
null_counts = self._evolve_null_counts(null_counts, self.index_mapping, row_count)
70+
71+
return SimpleStats(min_values, max_values, null_counts)
72+
73+
def _get_dense_index_mapping(self, dense_fields: List[str]) -> List[int]:
74+
"""
75+
Get dense index mapping similar to Java:
76+
fieldNames.stream().mapToInt(denseFields::indexOf).toArray()
77+
"""
78+
dense_fields_tuple = tuple(dense_fields)
79+
80+
if dense_fields_tuple not in self.index_mappings:
81+
with self._lock:
82+
# Double-check locking
83+
if dense_fields_tuple not in self.index_mappings:
84+
mapping = []
85+
for field_name in self.field_names:
86+
try:
87+
index = dense_fields.index(field_name)
88+
mapping.append(index)
89+
except ValueError:
90+
mapping.append(-1) # Field not found
91+
self.index_mappings[dense_fields_tuple] = mapping
92+
93+
return self.index_mappings[dense_fields_tuple]
94+
95+
def _project_row(self, row: Any, index_mapping: List[int]) -> Any:
96+
"""Project row based on index mapping using ProjectedRow."""
97+
projected_row = ProjectedRow.from_index_mapping(index_mapping)
98+
return projected_row.replace_row(row)
99+
100+
def _project_array(self, array: List[Any], index_mapping: List[int]) -> List[Any]:
101+
"""Project array based on index mapping."""
102+
if not array:
103+
return [0] * len(index_mapping)
104+
105+
projected = []
106+
for mapped_index in index_mapping:
107+
if mapped_index >= 0 and mapped_index < len(array):
108+
projected.append(array[mapped_index])
109+
else:
110+
projected.append(0) # Default value for missing fields
111+
112+
return projected
113+
114+
def _evolve_null_counts(self, null_counts: List[Any], index_mapping: List[int],
115+
not_found_value: int) -> List[Any]:
116+
"""Evolve null counts with schema evolution mapping."""
117+
evolved = []
118+
for mapped_index in index_mapping:
119+
if mapped_index >= 0 and mapped_index < len(null_counts):
120+
evolved.append(null_counts[mapped_index])
121+
else:
122+
evolved.append(not_found_value) # Use row count for missing fields
123+
124+
return evolved

0 commit comments

Comments
 (0)