Skip to content

Commit 428fab9

Browse files
authored
[python] Filter manifest entry by advance to reduce memory (#6428)
1 parent 2e85151 commit 428fab9

12 files changed

Lines changed: 95 additions & 154 deletions

paimon-python/pypaimon/common/predicate_builder.py

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -101,17 +101,27 @@ def between(self, field: str, included_lower_bound: Any, included_upper_bound: A
101101
"""Create a between predicate."""
102102
return self._build_predicate('between', field, [included_lower_bound, included_upper_bound])
103103

104-
def and_predicates(self, predicates: List[Predicate]) -> Predicate:
104+
@staticmethod
105+
def and_predicates(predicates: List[Predicate]) -> Optional[Predicate]:
105106
"""Create an AND predicate from multiple predicates."""
107+
if len(predicates) == 0:
108+
return None
109+
if len(predicates) == 1:
110+
return predicates[0]
106111
return Predicate(
107112
method='and',
108113
index=None,
109114
field=None,
110115
literals=predicates
111116
)
112117

113-
def or_predicates(self, predicates: List[Predicate]) -> Predicate:
118+
@staticmethod
119+
def or_predicates(predicates: List[Predicate]) -> Optional[Predicate]:
114120
"""Create an OR predicate from multiple predicates."""
121+
if len(predicates) == 0:
122+
return None
123+
if len(predicates) == 1:
124+
return predicates[0]
115125
return Predicate(
116126
method='or',
117127
index=None,

paimon-python/pypaimon/manifest/manifest_file_manager.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@ def __init__(self, table):
4141
self.primary_key_fields = self.table.table_schema.get_primary_key_fields()
4242
self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()
4343

44-
def read(self, manifest_file_name: str, bucket_filter=None) -> List[ManifestEntry]:
44+
def read(self, manifest_file_name: str, manifest_entry_filter=None) -> List[ManifestEntry]:
4545
manifest_file_path = self.manifest_path / manifest_file_name
4646

4747
entries = []
@@ -105,7 +105,7 @@ def read(self, manifest_file_name: str, bucket_filter=None) -> List[ManifestEntr
105105
total_buckets=record['_TOTAL_BUCKETS'],
106106
file=file_meta
107107
)
108-
if bucket_filter is not None and not bucket_filter(entry):
108+
if manifest_entry_filter is not None and not manifest_entry_filter(entry):
109109
continue
110110
entries.append(entry)
111111
return entries

paimon-python/pypaimon/read/push_down_utils.py

Lines changed: 17 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -19,31 +19,29 @@
1919
from typing import Dict, List, Set
2020

2121
from pypaimon.common.predicate import Predicate
22+
from pypaimon.common.predicate_builder import PredicateBuilder
2223

2324

24-
def to_partition_predicate(input_predicate: 'Predicate', all_fields: List[str], partition_keys: List[str]):
25-
if not input_predicate or not partition_keys:
26-
return None
27-
28-
predicates: list['Predicate'] = _split_and(input_predicate)
29-
predicates = [element for element in predicates if _get_all_fields(element).issubset(partition_keys)]
30-
new_predicate = Predicate(
31-
method='and',
32-
index=None,
33-
field=None,
34-
literals=predicates
35-
)
36-
37-
part_to_index = {element: idx for idx, element in enumerate(partition_keys)}
25+
def filter_and_transform_predicate(input_predicate: Predicate, all_fields: List[str], fields: List[str]):
26+
new_predicate = filter_predicate_by_fields(input_predicate, fields)
27+
part_to_index = {element: idx for idx, element in enumerate(fields)}
3828
mapping: Dict[int, int] = {
3929
i: part_to_index.get(all_fields[i], -1)
4030
for i in range(len(all_fields))
4131
}
42-
4332
return _change_index(new_predicate, mapping)
4433

4534

46-
def _split_and(input_predicate: 'Predicate'):
35+
def filter_predicate_by_fields(input_predicate: Predicate, fields: List[str]):
36+
if not input_predicate or not fields:
37+
return None
38+
39+
predicates: list[Predicate] = _split_and(input_predicate)
40+
predicates = [element for element in predicates if _get_all_fields(element).issubset(fields)]
41+
return PredicateBuilder.and_predicates(predicates)
42+
43+
44+
def _split_and(input_predicate: Predicate):
4745
if not input_predicate:
4846
return list()
4947

@@ -53,65 +51,23 @@ def _split_and(input_predicate: 'Predicate'):
5351
return [input_predicate]
5452

5553

56-
def _change_index(input_predicate: 'Predicate', mapping: Dict[int, int]):
54+
def _change_index(input_predicate: Predicate, mapping: Dict[int, int]):
5755
if not input_predicate:
5856
return None
5957

6058
if input_predicate.method == 'and' or input_predicate.method == 'or':
61-
predicates: list['Predicate'] = input_predicate.literals
59+
predicates: list[Predicate] = input_predicate.literals
6260
new_predicates = [_change_index(element, mapping) for element in predicates]
6361
return input_predicate.new_literals(new_predicates)
6462

6563
return input_predicate.new_index(mapping[input_predicate.index])
6664

6765

68-
def extract_predicate_to_list(result: list, input_predicate: 'Predicate', keys: List[str]):
69-
if not input_predicate or not keys:
70-
return
71-
72-
if input_predicate.method == 'and':
73-
for sub_predicate in input_predicate.literals:
74-
extract_predicate_to_list(result, sub_predicate, keys)
75-
return
76-
elif input_predicate.method == 'or':
77-
# condition: involved keys all belong to primary keys
78-
involved_fields = _get_all_fields(input_predicate)
79-
if involved_fields and involved_fields.issubset(keys):
80-
result.append(input_predicate)
81-
return
82-
83-
if input_predicate.field in keys:
84-
result.append(input_predicate)
85-
86-
87-
def _get_all_fields(predicate: 'Predicate') -> Set[str]:
66+
def _get_all_fields(predicate: Predicate) -> Set[str]:
8867
if predicate.field is not None:
8968
return {predicate.field}
9069
involved_fields = set()
9170
if predicate.literals:
9271
for sub_predicate in predicate.literals:
9372
involved_fields.update(_get_all_fields(sub_predicate))
9473
return involved_fields
95-
96-
97-
def extract_predicate_to_dict(result: Dict, input_predicate: 'Predicate', keys: List[str]):
98-
if not input_predicate or not keys:
99-
return
100-
101-
if input_predicate.method == 'and':
102-
for sub_predicate in input_predicate.literals:
103-
extract_predicate_to_dict(result, sub_predicate, keys)
104-
return
105-
elif input_predicate.method == 'or':
106-
# ensure no recursive and/or
107-
if not input_predicate.literals or any(p.field is None for p in input_predicate.literals):
108-
return
109-
# condition: only one key for 'or', and the key belongs to keys
110-
involved_fields = {p.field for p in input_predicate.literals}
111-
field = involved_fields.pop() if len(involved_fields) == 1 else None
112-
if field is not None and field in keys:
113-
result[field].append(input_predicate)
114-
return
115-
116-
if input_predicate.field in keys:
117-
result[input_predicate.field].append(input_predicate)

paimon-python/pypaimon/read/scanner/full_starting_scanner.py

Lines changed: 25 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -20,17 +20,14 @@
2020

2121
from pypaimon.common.core_options import CoreOptions
2222
from pypaimon.common.predicate import Predicate
23-
from pypaimon.common.predicate_builder import PredicateBuilder
2423
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
2524
from pypaimon.manifest.manifest_list_manager import ManifestListManager
2625
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
2726
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
2827
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
2928
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
3029
from pypaimon.read.plan import Plan
31-
from pypaimon.read.push_down_utils import (extract_predicate_to_dict,
32-
extract_predicate_to_list,
33-
to_partition_predicate)
30+
from pypaimon.read.push_down_utils import (filter_and_transform_predicate)
3431
from pypaimon.read.scanner.starting_scanner import StartingScanner
3532
from pypaimon.read.split import Split
3633
from pypaimon.snapshot.snapshot_manager import SnapshotManager
@@ -49,14 +46,11 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
4946
self.manifest_list_manager = ManifestListManager(table)
5047
self.manifest_file_manager = ManifestFileManager(table)
5148

52-
pk_conditions = []
53-
trimmed_pk = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()]
54-
extract_predicate_to_list(pk_conditions, self.predicate, trimmed_pk)
55-
self.primary_key_predicate = PredicateBuilder(self.table.fields).and_predicates(pk_conditions)
49+
self.primary_key_predicate = filter_and_transform_predicate(
50+
self.predicate, self.table.field_names, self.table.table_schema.get_trimmed_primary_keys())
5651

57-
partition_conditions = defaultdict(list)
58-
extract_predicate_to_dict(partition_conditions, self.predicate, self.table.partition_keys)
59-
self.partition_key_predicate = partition_conditions
52+
self.partition_key_predicate = filter_and_transform_predicate(
53+
self.predicate, self.table.field_names, self.table.partition_keys)
6054

6155
self.target_split_size = 128 * 1024 * 1024
6256
self.open_file_cost = 4 * 1024 * 1024
@@ -82,29 +76,29 @@ def scan(self) -> Plan:
8276
splits = self._apply_push_down_limit(splits)
8377
return Plan(splits)
8478

85-
def _read_manifest_files(self) -> List[ManifestFileMeta]:
79+
def plan_files(self) -> List[ManifestEntry]:
8680
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
8781
if not latest_snapshot:
8882
return []
8983
manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
90-
partition_predicate = to_partition_predicate(self.predicate, self.table.field_names, self.table.partition_keys)
84+
return self.read_manifest_entries(manifest_files)
9185

92-
def test_predicate(file: ManifestFileMeta) -> bool:
93-
if not partition_predicate:
86+
def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]:
87+
def filter_manifest_file(file: ManifestFileMeta) -> bool:
88+
if not self.partition_key_predicate:
9489
return True
95-
return partition_predicate.test_by_simple_stats(
90+
return self.partition_key_predicate.test_by_simple_stats(
9691
file.partition_stats,
9792
file.num_added_files + file.num_deleted_files)
9893

99-
return [file for file in manifest_files if test_predicate(file)]
100-
101-
def plan_files(self) -> List[ManifestEntry]:
102-
manifest_files = self._read_manifest_files()
10394
deleted_entries = set()
10495
added_entries = []
10596
for manifest_file in manifest_files:
106-
manifest_entries = self.manifest_file_manager.read(manifest_file.file_name,
107-
lambda row: self._bucket_filter(row))
97+
if not filter_manifest_file(manifest_file):
98+
continue
99+
manifest_entries = self.manifest_file_manager.read(
100+
manifest_file.file_name,
101+
lambda row: self._filter_manifest_entry(row))
108102
for entry in manifest_entries:
109103
if entry.kind == 0:
110104
added_entries.append(entry)
@@ -115,8 +109,6 @@ def plan_files(self) -> List[ManifestEntry]:
115109
entry for entry in added_entries
116110
if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries
117111
]
118-
if self.predicate:
119-
file_entries = self._filter_by_predicate(file_entries)
120112
return file_entries
121113

122114
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
@@ -203,12 +195,6 @@ def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) -> Lis
203195
filtered_entries.append(entry)
204196
return filtered_entries
205197

206-
def _bucket_filter(self, entry: Optional[ManifestEntry]) -> bool:
207-
bucket = entry.bucket
208-
if self.only_read_real_buckets and bucket < 0:
209-
return False
210-
return True
211-
212198
def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
213199
if self.limit is None:
214200
return splits
@@ -224,45 +210,26 @@ def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
224210

225211
return limited_splits
226212

227-
def _filter_by_predicate(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
228-
if not self.predicate:
229-
return file_entries
230-
231-
filtered_files = []
232-
for file_entry in file_entries:
233-
if self.partition_key_predicate and not self._filter_by_partition(file_entry):
234-
continue
235-
if not self._filter_by_stats(file_entry):
236-
continue
237-
filtered_files.append(file_entry)
238-
239-
return filtered_files
240-
241-
def _filter_by_partition(self, file_entry: ManifestEntry) -> bool:
242-
partition_dict = file_entry.partition.to_dict()
243-
for field_name, conditions in self.partition_key_predicate.items():
244-
partition_value = partition_dict[field_name]
245-
for predicate in conditions:
246-
if not predicate.test_by_value(partition_value):
247-
return False
248-
return True
249-
250-
def _filter_by_stats(self, file_entry: ManifestEntry) -> bool:
251-
if file_entry.kind != 0:
213+
def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
214+
if self.only_read_real_buckets and entry.bucket < 0:
215+
return False
216+
if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
252217
return False
253218
if self.table.is_primary_key_table:
254219
predicate = self.primary_key_predicate
255-
stats = file_entry.file.key_stats
220+
stats = entry.file.key_stats
256221
else:
257222
predicate = self.predicate
258-
stats = file_entry.file.value_stats
223+
stats = entry.file.value_stats
224+
if not predicate:
225+
return True
259226
return predicate.test_by_stats({
260227
"min_values": stats.min_values.to_dict(),
261228
"max_values": stats.max_values.to_dict(),
262229
"null_counts": {
263230
stats.min_values.fields[i].name: stats.null_counts[i] for i in range(len(stats.min_values.fields))
264231
},
265-
"row_count": file_entry.file.row_count,
232+
"row_count": entry.file.row_count
266233
})
267234

268235
def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:

paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -43,13 +43,7 @@ def plan_files(self) -> List[ManifestEntry]:
4343
for snapshot in snapshots_in_range:
4444
# Get manifest files for this snapshot
4545
manifest_files = self.manifest_list_manager.read_delta(snapshot)
46-
47-
# Read all entries from manifest files
48-
for manifest_file in manifest_files:
49-
entries = self.manifest_file_manager.read(manifest_file.file_name)
50-
file_entries.extend(entries)
51-
if self.predicate:
52-
file_entries = self._filter_by_predicate(file_entries)
46+
file_entries.extend(self.read_manifest_entries(manifest_files))
5347
return file_entries
5448

5549
@staticmethod

paimon-python/pypaimon/read/table_read.py

Lines changed: 4 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -22,8 +22,7 @@
2222

2323
from pypaimon.common.core_options import CoreOptions
2424
from pypaimon.common.predicate import Predicate
25-
from pypaimon.common.predicate_builder import PredicateBuilder
26-
from pypaimon.read.push_down_utils import extract_predicate_to_list
25+
from pypaimon.read.push_down_utils import filter_predicate_by_fields
2726
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
2827
from pypaimon.read.split import Split
2928
from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
@@ -113,14 +112,10 @@ def _push_down_predicate(self) -> Any:
113112
if self.predicate is None:
114113
return None
115114
elif self.table.is_primary_key_table:
116-
result = []
117-
extract_predicate_to_list(result, self.predicate, self.table.primary_keys)
118-
if result:
119-
# the field index is unused for arrow field
120-
pk_predicates = (PredicateBuilder(self.table.fields).and_predicates(result)).to_arrow()
121-
return pk_predicates
122-
else:
115+
pk_predicate = filter_predicate_by_fields(self.predicate, self.table.primary_keys)
116+
if not pk_predicate:
123117
return None
118+
return pk_predicate.to_arrow()
124119
else:
125120
return self.predicate.to_arrow()
126121

paimon-python/pypaimon/schema/table_schema.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -170,3 +170,6 @@ def get_trimmed_primary_key_fields(self) -> List[DataField]:
170170
"this will result in only one record in a partition")
171171
field_map = {field.name: field for field in self.fields}
172172
return [field_map[name] for name in adjusted if name in field_map]
173+
174+
def get_trimmed_primary_keys(self) -> List[str]:
175+
return [field.name for field in self.get_trimmed_primary_key_fields()]

0 commit comments

Comments
 (0)