Commit 8bda95e

discivigour authored and JingsongLi committed
[Python] Introduce incremental-between read by timestamp (#6391)
1 parent 0e05b7e commit 8bda95e

25 files changed
Lines changed: 1008 additions & 495 deletions

docs/content/program-api/python-api.md

Lines changed: 173 additions & 19 deletions
@@ -286,7 +286,8 @@ for batch in table_read.to_arrow_batch_reader(splits):
```

#### Python Iterator

You can read the data row by row into a native Python iterator.
This is convenient for custom row-based processing logic.

```python
@@ -365,23 +366,177 @@ print(ray_dataset.to_pandas())
# ...
```

### Incremental Read Between Timestamps

This API allows reading data committed between two snapshot timestamps. The steps are as follows.

- Set the option `CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP` on a copied table via `table.copy({...})`. The value must be a string `"startMillis,endMillis"`, where `startMillis` is exclusive and `endMillis` is inclusive.
- Use `SnapshotManager` to obtain snapshot timestamps, or determine them yourself.
- Read the data as above.

Example:

```python
from pypaimon import CatalogFactory
from pypaimon.common.core_options import CoreOptions
from pypaimon.snapshot.snapshot_manager import SnapshotManager

# Prepare catalog and obtain a table
catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
table = catalog.get_table('default.your_table_name')

# Assume the table has at least two snapshots (1 and 2)
snapshot_manager = SnapshotManager(table)
t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
t2 = snapshot_manager.get_snapshot_by_id(2).time_millis

# Read records committed in (t1, t2] (start exclusive, end inclusive)
table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t1},{t2}"})

read_builder = table_inc.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
splits = table_scan.plan().splits()

# To Arrow
arrow_table = table_read.to_arrow(splits)

# Or to pandas
pandas_df = table_read.to_pandas(splits)
```
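If you determine the timestamps yourself rather than reading them from snapshots, the option value is just two epoch-millisecond integers joined by a comma. A minimal sketch; the `incremental_range` helper is hypothetical, not part of pypaimon:

```python
from datetime import datetime

def incremental_range(start: datetime, end: datetime) -> str:
    """Format the 'startMillis,endMillis' option value.

    start is exclusive and end is inclusive, matching the option's semantics.
    """
    def to_millis(dt: datetime) -> int:
        return int(dt.timestamp() * 1000)
    return f"{to_millis(start)},{to_millis(end)}"

# For example, read everything committed on 2024-01-01 (local time)
table_inc = table.copy({
    CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP:
        incremental_range(datetime(2024, 1, 1), datetime(2024, 1, 2))
})
```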
### Shard Read

Shard Read allows you to read data in parallel by dividing the table into multiple shards. This is useful for distributed processing and parallel computation.

You can specify the shard index and total number of shards to read a specific portion of the data:

```python
# Prepare read builder
table = catalog.get_table('database_name.table_name')
read_builder = table.new_read_builder()
table_read = read_builder.new_read()

# Read the second shard (index 1) out of 3 total shards
splits = read_builder.new_scan().with_shard(1, 3).plan().splits()

# Read all shards and concatenate results
splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()

# Combine results from all shards
all_splits = splits1 + splits2 + splits3
pa_table = table_read.to_arrow(all_splits)
```

A complete end-to-end example:
```python
import pyarrow as pa
from pypaimon import CatalogFactory, Schema

# Create catalog
catalog_options = {'warehouse': 'file:///path/to/warehouse'}
catalog = CatalogFactory.create(catalog_options)
catalog.create_database("default", False)

# Define schema
pa_schema = pa.schema([
    ('user_id', pa.int64()),
    ('item_id', pa.int64()),
    ('behavior', pa.string()),
    ('dt', pa.string()),
])

# Create table and write data
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
catalog.create_table('default.test_table', schema, False)
table = catalog.get_table('default.test_table')

# Write data in two batches
write_builder = table.new_batch_write_builder()

# First write
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
data1 = {
    'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
    'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014],
    'behavior': ['a', 'b', 'c', None, 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'],
    'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
table_write.write_arrow(pa_table)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# Second write
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
data2 = {
    'user_id': [5, 6, 7, 8, 18],
    'item_id': [1005, 1006, 1007, 1008, 1018],
    'behavior': ['e', 'f', 'g', 'h', 'z'],
    'dt': ['p2', 'p1', 'p2', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
table_write.write_arrow(pa_table)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# Read a specific shard
read_builder = table.new_read_builder()
table_read = read_builder.new_read()

# Read shard 2 out of 3 total shards
splits = read_builder.new_scan().with_shard(2, 3).plan().splits()
shard_data = table_read.to_arrow(splits)

# Verify shard distribution by reading all shards
splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()

# Combining all shards should equal a full table read
all_shards_data = pa.concat_tables([
    table_read.to_arrow(splits1),
    table_read.to_arrow(splits2),
    table_read.to_arrow(splits3),
])
full_table_data = table_read.to_arrow(read_builder.new_scan().plan().splits())
```
Key points about shard read:

- **Shard Index**: Zero-based index of the shard to read (0 to total_shards - 1)
- **Total Shards**: Total number of shards to divide the data into
- **Data Distribution**: Data is distributed evenly across shards, with remainder rows going to the last shard
- **Parallel Processing**: Each shard can be processed independently for better performance (see the sketch after this list)
- **Consistency**: Combining all shards should produce the complete table data
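Because each shard can be planned and read independently, shard reads parallelize naturally. A sketch using a thread pool, assuming only the API shown above; creating one read builder per task is the conservative choice, since thread-safety of a shared reader is not documented here:

```python
from concurrent.futures import ThreadPoolExecutor

import pyarrow as pa

def read_shard(shard_index: int, total_shards: int) -> pa.Table:
    # One builder per task: plans and readers stay independent.
    rb = table.new_read_builder()
    splits = rb.new_scan().with_shard(shard_index, total_shards).plan().splits()
    return rb.new_read().to_arrow(splits)

TOTAL = 3
with ThreadPoolExecutor(max_workers=TOTAL) as pool:
    shard_tables = list(pool.map(lambda i: read_shard(i, TOTAL), range(TOTAL)))

# Per the consistency point above, this should equal a full, unsharded read.
combined = pa.concat_tables(shard_tables)
```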
## Data Types

| Python Native Type | PyArrow Type | Paimon Type |
|:--------------------|:-------------------------------------------------|:----------------------------------|
| `int` | `pyarrow.int8()` | `TINYINT` |
| `int` | `pyarrow.int16()` | `SMALLINT` |
| `int` | `pyarrow.int32()` | `INT` |
| `int` | `pyarrow.int64()` | `BIGINT` |
| `float` | `pyarrow.float32()` | `FLOAT` |
| `float` | `pyarrow.float64()` | `DOUBLE` |
| `bool` | `pyarrow.bool_()` | `BOOLEAN` |
| `str` | `pyarrow.string()` | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
| `bytes` | `pyarrow.binary()` | `BYTES`, `VARBINARY(n)` |
| `bytes` | `pyarrow.binary(length)` | `BINARY(length)` |
| `decimal.Decimal` | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)` |
| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)` | `TIMESTAMP(p)` |
| `datetime.date` | `pyarrow.date32()` | `DATE` |
| `datetime.time` | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)` |
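The shard-read example above already exercises the `int64` and `string` rows; a schema touching a few more of the mappings might look like the sketch below. The `TIMESTAMP(3)` correspondence for `pa.timestamp('ms')` is an assumption based on the table, not something this commit states:

```python
import datetime
import decimal

import pyarrow as pa

# Columns exercising several rows of the mapping table above.
pa_schema = pa.schema([
    ('id', pa.int64()),               # BIGINT
    ('price', pa.decimal128(10, 2)),  # DECIMAL(10, 2)
    ('created', pa.timestamp('ms')),  # TIMESTAMP(3), assuming ms maps to p=3
    ('day', pa.date32()),             # DATE
])

row = {
    'id': [1],
    'price': [decimal.Decimal('19.99')],
    'created': [datetime.datetime(2024, 1, 1, 12, 0, 0)],
    'day': [datetime.date(2024, 1, 1)],
}
pa_table = pa.Table.from_pydict(row, schema=pa_schema)
```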

## Predicate

@@ -402,5 +557,4 @@ print(ray_dataset.to_pandas())
| f.contains(literal) | PredicateBuilder.contains(f, literal) |
| f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) |
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
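A short sketch tying this table to a read, assuming `new_predicate_builder()` and `with_filter(...)` on the read builder, which this excerpt does not show:

```python
# Filter user_id to [1, 100] before planning; field names are passed
# as strings, literals as plain Python values.
read_builder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()
predicate = predicate_builder.between('user_id', 1, 100)  # lower <= f <= upper
read_builder = read_builder.with_filter(predicate)
splits = read_builder.new_scan().plan().splits()
filtered = read_builder.new_read().to_arrow(splits)
```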

paimon-python/pypaimon/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -15,8 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.

-from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
 from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
 from pypaimon.schema.schema import Schema

 __version__ = "0.3.dev"

paimon-python/pypaimon/common/core_options.py

Lines changed: 1 addition & 0 deletions
@@ -45,5 +45,6 @@ def __str__(self):
     FILE_BLOCK_SIZE = "file.block-size"
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
+    INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
     # Commit options
     COMMIT_USER_PREFIX = "commit.user-prefix"
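The constant just names the string key, so either spelling works when copying a table (a sketch, with `table` as in the docs above):

```python
# These two copies are equivalent: CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP
# is simply the string "incremental-between-timestamp".
t1 = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: "1000,2000"})
t2 = table.copy({"incremental-between-timestamp": "1000,2000"})
```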

paimon-python/pypaimon/filesystem/pvfs.py

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@
 from fsspec.implementations.local import LocalFileSystem
 from readerwriterlock import rwlock

-from pypaimon.api.api_response import GetTableTokenResponse, GetTableResponse
+from pypaimon.api.api_response import GetTableResponse, GetTableTokenResponse
 from pypaimon.api.client import AlreadyExistsException, NoSuchResourceException
 from pypaimon.api.rest_api import RESTApi
 from pypaimon.common.config import CatalogOptions, OssOptions, PVFSOptions

paimon-python/pypaimon/manifest/manifest_file_manager.py

Lines changed: 2 additions & 1 deletion
@@ -15,10 +15,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import fastavro
 from io import BytesIO
 from typing import List

+import fastavro
+
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)

paimon-python/pypaimon/manifest/manifest_list_manager.py

Lines changed: 3 additions & 0 deletions
@@ -47,6 +47,9 @@ def read_all(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
         manifest_files.extend(delta_manifests)
         return manifest_files

+    def read_delta(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+        return self.read(snapshot.delta_manifest_list)
+
     def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
         manifest_files = []
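`read_delta` returns only the manifests added by a single snapshot, which is presumably what the incremental path needs: the deltas of every snapshot committed in `(start, end]`. A hypothetical sketch; the id-range accessors on `snapshot_manager` are assumptions, not shown in this commit:

```python
def delta_manifests_between(snapshot_manager, manifest_list_manager,
                            start_millis: int, end_millis: int):
    # Collect delta manifests of snapshots committed in (start_millis, end_millis].
    # get_earliest_snapshot_id / get_latest_snapshot_id are assumed accessors.
    manifests = []
    for sid in range(snapshot_manager.get_earliest_snapshot_id(),
                     snapshot_manager.get_latest_snapshot_id() + 1):
        snapshot = snapshot_manager.get_snapshot_by_id(sid)
        if start_millis < snapshot.time_millis <= end_millis:
            manifests.extend(manifest_list_manager.read_delta(snapshot))
    return manifests
```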

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from pypaimon.read.plan import Plan
+from pypaimon.read.scanner.starting_scanner import StartingScanner
+
+
+class EmptyStartingScanner(StartingScanner):
+
+    def scan(self) -> Plan:
+        return Plan([])
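`EmptyStartingScanner` always plans zero splits. Presumably (its call site is not in this excerpt) the incremental scan falls back to it when no snapshot lies in the requested timestamp range, so callers can handle that case uniformly. A hypothetical sketch:

```python
# Hypothetical call site, not from this commit: choose a scanner based
# on whether any snapshot falls inside the (start, end] range.
def create_starting_scanner(snapshots_in_range):
    if not snapshots_in_range:
        return EmptyStartingScanner()  # its plan carries no splits
    return IncrementalStartingScanner(snapshots_in_range)  # assumed class
```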
