-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathstreaming_table_scan.py
More file actions
435 lines (371 loc) · 18.6 KB
/
streaming_table_scan.py
File metadata and controls
435 lines (371 loc) · 18.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
AsyncStreamingTableScan for continuous streaming reads from Paimon tables.
This module provides async-based streaming reads that continuously poll for
new snapshots and yield Plans as new data arrives. It is the Python equivalent
of Java's DataTableStreamScan.
"""
import asyncio
import logging
import os
from concurrent.futures import Future, ThreadPoolExecutor
from typing import AsyncGenerator, Callable, Iterator, List, Optional
from pypaimon.common.options.core_options import ChangelogProducer
from pypaimon.common.predicate import Predicate
from pypaimon.consumer.consumer import Consumer
from pypaimon.consumer.consumer_manager import ConsumerManager
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.read.plan import Plan
from pypaimon.read.scanner.append_table_split_generator import \
AppendTableSplitGenerator
from pypaimon.read.scanner.changelog_follow_up_scanner import \
ChangelogFollowUpScanner
from pypaimon.read.scanner.delta_follow_up_scanner import DeltaFollowUpScanner
from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.read.scanner.follow_up_scanner import FollowUpScanner
from pypaimon.read.scanner.incremental_diff_scanner import \
IncrementalDiffScanner
from pypaimon.read.scanner.primary_key_table_split_generator import \
PrimaryKeyTableSplitGenerator
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_manager import SnapshotManager
class AsyncStreamingTableScan:
"""
Async streaming table scan for continuous reads from Paimon tables.
This class provides an async iterator that continuously polls for new
snapshots and yields Plans containing splits for new data.
Usage:
scan = AsyncStreamingTableScan(table)
async for plan in scan.stream():
for split in plan.splits():
# Process the data
pass
For synchronous usage:
for plan in scan.stream_sync():
process(plan)
"""
def __init__(
self,
table,
predicate: Optional[Predicate] = None,
poll_interval_ms: int = 1000,
follow_up_scanner: Optional[FollowUpScanner] = None,
bucket_filter: Optional[Callable[[int], bool]] = None,
prefetch_enabled: bool = True,
diff_threshold: int = 10,
consumer_id: Optional[str] = None,
scan_from=None
):
"""Initialize the streaming table scan."""
self.table = table
self.predicate = predicate
self.poll_interval = poll_interval_ms / 1000.0
# Bucket filter for parallel consumption
self._bucket_filter = bucket_filter
# Diff-based catch-up configuration
self._diff_threshold = diff_threshold
self._catch_up_in_progress = False
# Prefetching configuration
self._prefetch_enabled = prefetch_enabled
self._prefetch_future: Optional[Future] = None
self._prefetch_snapshot_id: Optional[int] = None
self._lookahead_skips = 0
self._prefetch_executor = ThreadPoolExecutor(max_workers=1) if prefetch_enabled else None
self._lookahead_size = 10 # How many snapshots to look ahead
# Initialize managers
self._snapshot_manager = SnapshotManager(table)
self._manifest_list_manager = ManifestListManager(table)
self._manifest_file_manager = ManifestFileManager(table)
# Consumer management for persisting streaming progress
self._consumer_id = consumer_id
self._consumer_manager = (
ConsumerManager(table.file_io, table.table_path)
if consumer_id else None
)
# Scanner for determining which snapshots to read
# Auto-select based on changelog-producer if not explicitly provided
self.follow_up_scanner = follow_up_scanner or self._create_follow_up_scanner()
# Starting position (set via StreamReadBuilder.with_scan_from)
self._scan_from = scan_from
# State tracking
self.next_snapshot_id: Optional[int] = None
self._pending_consumer_snapshot: Optional[int] = None
async def stream(self) -> AsyncGenerator[Plan, None]:
"""Yield Plans as new snapshots appear.
On first call, performs an initial full scan of the latest snapshot.
Subsequent iterations poll for new snapshots and yield delta Plans.
Yields:
Plan objects containing splits for reading
"""
# Restore from consumer if available (highest priority — overrides scan_from)
if self.next_snapshot_id is None and self._consumer_manager:
consumer = self._consumer_manager.consumer(self._consumer_id)
if consumer:
self.next_snapshot_id = consumer.next_snapshot
# Resolve scan_from if no position has been set yet (no consumer restore)
if self.next_snapshot_id is None and self._scan_from is not None:
scan_from = self._scan_from
if scan_from == "earliest":
earliest_snapshot = self._snapshot_manager.try_get_earliest_snapshot()
if earliest_snapshot is not None:
self.next_snapshot_id = earliest_snapshot.id + 1
self._stage_consumer()
yield self._create_initial_plan(earliest_snapshot)
self._flush_pending_consumer()
# If no snapshot exists yet, fall through to the polling loop
elif scan_from != "latest":
# Numeric snapshot ID — set directly; diff catch-up handles large gaps
self.next_snapshot_id = int(scan_from)
# Initial scan (default "latest" path, also taken when scan_from is None or "latest")
if self.next_snapshot_id is None:
latest_snapshot = self._snapshot_manager.get_latest_snapshot()
if latest_snapshot:
self.next_snapshot_id = latest_snapshot.id + 1
self._stage_consumer()
yield self._create_initial_plan(latest_snapshot)
# Resumes here when caller calls __anext__() — after caller processed the plan.
self._flush_pending_consumer()
# Check for catch-up scenario: starting from earlier snapshot with large gap.
# This block only executes once per stream() call (before the while True loop).
# Handles --from snapshot:X with many snapshots to process.
if self._should_use_diff_catch_up():
self._catch_up_in_progress = True
try:
latest_snapshot = self._snapshot_manager.get_latest_snapshot()
if latest_snapshot and self.next_snapshot_id:
catch_up_plan = self._create_catch_up_plan(
self.next_snapshot_id,
latest_snapshot
)
self.next_snapshot_id = latest_snapshot.id + 1
self._stage_consumer()
yield catch_up_plan
# Resumes here when caller calls __anext__().
self._flush_pending_consumer()
finally:
self._catch_up_in_progress = False
# Follow-up polling loop with lookahead and optional prefetching
while True:
# Flush any consumer position staged by the previous yield before doing more work.
self._flush_pending_consumer()
plan = None
snapshot_processed = False # Track if we processed (or skipped) a snapshot
# Check if we have a prefetched result ready
prefetch_used = False
if self._prefetch_future is not None:
try:
# Wait for the prefetch thread to complete
# Returns (plan, next_id, skipped_count) tuple
prefetch_result = self._prefetch_future.result(timeout=30)
prefetch_used = True
if prefetch_result is not None:
prefetch_plan, next_id, skipped_count = prefetch_result
self._lookahead_skips += skipped_count
self.next_snapshot_id = next_id
snapshot_processed = skipped_count > 0 or prefetch_plan is not None
if prefetch_plan is not None:
plan = prefetch_plan
except Exception:
# Prefetch failed, fall back to synchronous
prefetch_used = False
finally:
self._prefetch_future = None
self._prefetch_snapshot_id = None
# If prefetch wasn't available or failed, use lookahead to find next scannable
if not prefetch_used:
# Use batch lookahead to find the next scannable snapshot
snapshot, next_id, skipped_count = self._snapshot_manager.find_next_scannable(
self.next_snapshot_id,
self.follow_up_scanner.should_scan,
lookahead_size=self._lookahead_size
)
self._lookahead_skips += skipped_count
self.next_snapshot_id = next_id
# Check if we found a scannable snapshot or skipped some
snapshot_processed = skipped_count > 0 or snapshot is not None
if snapshot is not None:
plan = self._create_follow_up_plan(snapshot)
if plan is not None:
# Start prefetching next scannable snapshot before yielding
if self._prefetch_enabled:
self._start_prefetch(self.next_snapshot_id)
self._stage_consumer()
yield plan
# _flush_pending_consumer() is called at the top of the next iteration.
elif not snapshot_processed:
# No snapshot available yet, wait and poll again
await asyncio.sleep(self.poll_interval)
# If snapshots were processed but plan is None (all skipped), continue loop immediately
def stream_sync(self) -> Iterator[Plan]:
"""
Synchronous wrapper for stream().
Provides a blocking iterator for use in non-async code.
Yields:
Plan objects containing splits for reading
"""
loop = asyncio.new_event_loop()
try:
async_gen = self.stream()
while True:
try:
plan = loop.run_until_complete(async_gen.__anext__())
yield plan
except StopAsyncIteration:
break
finally:
loop.close()
def _stage_consumer(self) -> None:
"""Stage next_snapshot_id to be written to disk on the next generator resume."""
if self._consumer_manager and self._consumer_id and self.next_snapshot_id is not None:
self._pending_consumer_snapshot = self.next_snapshot_id
def _flush_pending_consumer(self) -> None:
"""Flush the staged consumer position to disk.
Called at the resume point after each yield — i.e. when the caller calls
__anext__() to request the next plan, which happens after the caller's loop
body (to_arrow + sink write) has completed. This gives at-least-once semantics:
the consumer file is only advanced after the caller has processed the prior plan.
"""
if self._consumer_manager and self._consumer_id and self._pending_consumer_snapshot is not None:
self._consumer_manager.reset_consumer(
self._consumer_id,
Consumer(next_snapshot=self._pending_consumer_snapshot)
)
self._pending_consumer_snapshot = None
def _start_prefetch(self, snapshot_id: int) -> None:
"""Start prefetching the next scannable snapshot in a background thread."""
if self._prefetch_future is not None or self._prefetch_executor is None:
return # Already prefetching or executor not available
self._prefetch_snapshot_id = snapshot_id
# Submit to thread pool - this starts immediately, not when event loop runs
self._prefetch_future = self._prefetch_executor.submit(
self._fetch_plan_with_lookahead,
snapshot_id
)
def _fetch_plan_with_lookahead(self, start_id: int) -> Optional[tuple]:
"""Find next scannable snapshot via lookahead and create a plan. Runs in thread pool."""
try:
snapshot, next_id, skipped_count = self._snapshot_manager.find_next_scannable(
start_id,
self.follow_up_scanner.should_scan,
lookahead_size=self._lookahead_size
)
if snapshot is None:
return (None, next_id, skipped_count)
plan = self._create_follow_up_plan(snapshot)
return (plan, next_id, skipped_count)
except Exception:
logging.exception("Prefetch failed for snapshot_id=%d; falling back to synchronous", start_id)
return None
def _create_follow_up_plan(self, snapshot: Snapshot) -> Plan:
"""Route to changelog or delta plan based on scanner type."""
if isinstance(self.follow_up_scanner, ChangelogFollowUpScanner):
return self._create_changelog_plan(snapshot)
else:
return self._create_delta_plan(snapshot)
def _create_follow_up_scanner(self) -> FollowUpScanner:
"""Create the appropriate follow-up scanner based on changelog-producer option."""
changelog_producer = self.table.options.changelog_producer()
if changelog_producer == ChangelogProducer.NONE:
return DeltaFollowUpScanner()
else:
# INPUT, FULL_COMPACTION, LOOKUP all use changelog scanner
return ChangelogFollowUpScanner()
def _filter_entries_for_shard(self, entries: List) -> List:
"""Filter manifest entries by bucket filter, if set."""
if self._bucket_filter is not None:
return [e for e in entries if self._bucket_filter(e.bucket)]
return entries
def _create_initial_plan(self, snapshot: Snapshot) -> Plan:
"""Create a Plan for the initial full scan of the latest snapshot."""
def all_manifests():
return self._manifest_list_manager.read_all(snapshot)
starting_scanner = FileScanner(
self.table,
all_manifests,
predicate=self.predicate,
limit=None
)
return starting_scanner.scan()
def _create_delta_plan(self, snapshot: Snapshot) -> Plan:
"""Read new files from delta_manifest_list (changelog-producer=none)."""
manifest_files = self._manifest_list_manager.read_delta(snapshot)
return self._create_plan_from_manifests(manifest_files)
def _create_changelog_plan(self, snapshot: Snapshot) -> Plan:
"""Read from changelog_manifest_list (changelog-producer=input/full-compaction/lookup)."""
manifest_files = self._manifest_list_manager.read_changelog(snapshot)
return self._create_plan_from_manifests(manifest_files)
def _create_plan_from_manifests(self, manifest_files: List) -> Plan:
"""Create splits from manifest files, applying shard filtering."""
if not manifest_files:
return Plan([])
# Use configurable parallelism from table options
max_workers = max(8, self.table.options.scan_manifest_parallelism(os.cpu_count() or 8))
# Read manifest entries from manifest files
entries = self._manifest_file_manager.read_entries_parallel(
manifest_files,
manifest_entry_filter=None,
max_workers=max_workers
)
# Apply shard/bucket filtering for parallel consumption
entries = self._filter_entries_for_shard(entries) if entries else []
if not entries:
return Plan([])
# Get split options from table
options = self.table.options
target_split_size = options.source_split_target_size()
open_file_cost = options.source_split_open_file_cost()
# Create appropriate split generator based on table type
if self.table.is_primary_key_table:
split_generator = PrimaryKeyTableSplitGenerator(
self.table,
target_split_size,
open_file_cost,
deletion_files_map={}
)
else:
split_generator = AppendTableSplitGenerator(
self.table,
target_split_size,
open_file_cost,
deletion_files_map={}
)
splits = split_generator.create_splits(entries)
return Plan(splits)
def _should_use_diff_catch_up(self) -> bool:
"""Check if diff-based catch-up should be used (large gap to latest)."""
if self._catch_up_in_progress:
return False
if self.next_snapshot_id is None:
return False
latest = self._snapshot_manager.get_latest_snapshot()
if latest is None:
return False
gap = latest.id - self.next_snapshot_id
return gap > self._diff_threshold
def _create_catch_up_plan(self, start_id: int, end_snapshot: Snapshot) -> Plan:
"""Create a catch-up plan using diff-based scanning between start and end snapshots."""
# Get start snapshot (one before where we want to start reading).
# If start_id is 0 or 1, fall back to a full scan of end_snapshot.
start_snapshot = None
if start_id > 1:
start_snapshot = self._snapshot_manager.get_snapshot_by_id(start_id - 1)
if start_snapshot is None:
return self._create_initial_plan(end_snapshot)
return IncrementalDiffScanner(self.table).scan(start_snapshot, end_snapshot)