Skip to content

Commit 3240fd3

Browse files
committed
add_doc
1 parent 913a469 commit 3240fd3

5 files changed

Lines changed: 219 additions & 31 deletions

File tree

docs/content/program-api/python-api.md

Lines changed: 173 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ for batch in table_read.to_arrow_batch_reader(splits):
286286
```
287287

288288
#### Python Iterator
289-
You can read the data row by row into a native Python iterator.
289+
290+
You can read the data row by row into a native Python iterator.
290291
This is convenient for custom row-based processing logic.
291292

292293
```python
@@ -365,23 +366,177 @@ print(ray_dataset.to_pandas())
365366
# ...
366367
```
367368

369+
### Incremental Read Between Timestamps
370+
371+
This API allows reading data committed between two snapshot timestamps. The steps are as follows.
372+
373+
- Set the option `CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP` on a copied table via `table.copy({...})`. The value must
374+
be a string: `"startMillis,endMillis"`, where `startMillis` is exclusive and `endMillis` is inclusive.
375+
- Use `SnapshotManager` to obtain the snapshot timestamps, or determine them yourself.
376+
- Read the data as above.
377+
378+
Example:
379+
380+
```python
381+
from pypaimon import CatalogFactory
382+
from pypaimon.common.core_options import CoreOptions
383+
from pypaimon.snapshot.snapshot_manager import SnapshotManager
384+
385+
# Prepare catalog and obtain a table
386+
catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
387+
table = catalog.get_table('default.your_table_name')
388+
389+
# Assume the table has at least two snapshots (1 and 2)
390+
snapshot_manager = SnapshotManager(table)
391+
t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
392+
t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
393+
394+
# Read records committed in (t1, t2]: t1 is exclusive, t2 is inclusive
395+
table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t1},{t2}"})
396+
397+
read_builder = table_inc.new_read_builder()
398+
table_scan = read_builder.new_scan()
399+
table_read = read_builder.new_read()
400+
splits = table_scan.plan().splits()
401+
402+
# To Arrow
403+
arrow_table = table_read.to_arrow(splits)
404+
405+
# Or to pandas
406+
pandas_df = table_read.to_pandas(splits)
407+
```
408+
409+
### Shard Read
410+
411+
Shard Read allows you to read data in parallel by dividing the table into multiple shards. This is useful for
412+
distributed processing and parallel computation.
413+
414+
You can specify the shard index and total number of shards to read a specific portion of the data:
415+
416+
```python
417+
# Prepare read builder
418+
table = catalog.get_table('database_name.table_name')
419+
read_builder = table.new_read_builder()
420+
table_read = read_builder.new_read()
421+
422+
# Read the second shard (index 1) out of 3 total shards
423+
splits = read_builder.new_scan().with_shard(1, 3).plan().splits()
424+
425+
# Read all shards and concatenate results
426+
splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
427+
splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
428+
splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
429+
430+
# Combine results from all shards
431+
432+
all_splits = splits1 + splits2 + splits3
433+
pa_table = table_read.to_arrow(all_splits)
434+
```
435+
436+
Example with shard read:
437+
438+
```python
439+
import pyarrow as pa
440+
from pypaimon import CatalogFactory, Schema
441+
442+
# Create catalog
443+
catalog_options = {'warehouse': 'file:///path/to/warehouse'}
444+
catalog = CatalogFactory.create(catalog_options)
445+
catalog.create_database("default", False)
446+
# Define schema
447+
pa_schema = pa.schema([
448+
('user_id', pa.int64()),
449+
('item_id', pa.int64()),
450+
('behavior', pa.string()),
451+
('dt', pa.string()),
452+
])
453+
454+
# Create table and write data
455+
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
456+
catalog.create_table('default.test_table', schema, False)
457+
table = catalog.get_table('default.test_table')
458+
459+
# Write data in two batches
460+
write_builder = table.new_batch_write_builder()
461+
462+
# First write
463+
table_write = write_builder.new_write()
464+
table_commit = write_builder.new_commit()
465+
data1 = {
466+
'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
467+
'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014],
468+
'behavior': ['a', 'b', 'c', None, 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'],
469+
'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'],
470+
}
471+
pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
472+
table_write.write_arrow(pa_table)
473+
table_commit.commit(table_write.prepare_commit())
474+
table_write.close()
475+
table_commit.close()
476+
477+
# Second write
478+
table_write = write_builder.new_write()
479+
table_commit = write_builder.new_commit()
480+
data2 = {
481+
'user_id': [5, 6, 7, 8, 18],
482+
'item_id': [1005, 1006, 1007, 1008, 1018],
483+
'behavior': ['e', 'f', 'g', 'h', 'z'],
484+
'dt': ['p2', 'p1', 'p2', 'p2', 'p1'],
485+
}
486+
pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
487+
table_write.write_arrow(pa_table)
488+
table_commit.commit(table_write.prepare_commit())
489+
table_write.close()
490+
table_commit.close()
491+
492+
# Read specific shard
493+
read_builder = table.new_read_builder()
494+
table_read = read_builder.new_read()
495+
496+
# Read the third shard (index 2) out of 3 total shards
497+
splits = read_builder.new_scan().with_shard(2, 3).plan().splits()
498+
shard_data = table_read.to_arrow(splits)
499+
500+
# Verify shard distribution by reading all shards
501+
splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
502+
splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
503+
splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
504+
505+
# Combining all shards should equal a full table read
506+
all_shards_data = pa.concat_tables([
507+
table_read.to_arrow(splits1),
508+
table_read.to_arrow(splits2),
509+
table_read.to_arrow(splits3),
510+
])
511+
full_table_data = table_read.to_arrow(read_builder.new_scan().plan().splits())
512+
```
513+
514+
Key points about shard read:
515+
516+
- **Shard Index**: Zero-based index of the shard to read (0 to total_shards-1)
517+
- **Total Shards**: Total number of shards to divide the data into
518+
- **Data Distribution**: Data is distributed evenly across shards, with remainder rows going to the last shard
519+
- **Parallel Processing**: Each shard can be processed independently for better performance
520+
- **Consistency**: Combining all shards should produce the complete table data
521+
368522
## Data Types
369-
| Python Native Type | PyArrow Type | Paimon Type |
370-
| :--- | :--- | :--- |
371-
| `int` | `pyarrow.int8()` | `TINYINT` |
372-
| `int` | `pyarrow.int16()` | `SMALLINT` |
373-
| `int` | `pyarrow.int32()` | `INT` |
374-
| `int` | `pyarrow.int64()` | `BIGINT` |
375-
| `float` | `pyarrow.float32()` | `FLOAT` |
376-
| `float` | `pyarrow.float64()` | `DOUBLE` |
377-
| `bool` | `pyarrow.bool_()` | `BOOLEAN` |
378-
| `str` | `pyarrow.string()` | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
379-
| `bytes` | `pyarrow.binary()` | `BYTES`, `VARBINARY(n)` |
380-
| `bytes` | `pyarrow.binary(length)` | `BINARY(length)` |
381-
| `decimal.Decimal` | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)` |
382-
| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)` | `TIMESTAMP(p)` |
383-
| `datetime.date` | `pyarrow.date32()` | `DATE` |
384-
| `datetime.time` | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)` |
523+
524+
| Python Native Type | PyArrow Type | Paimon Type |
525+
|:--------------------|:-------------------------------------------------|:----------------------------------|
526+
| `int` | `pyarrow.int8()` | `TINYINT` |
527+
| `int` | `pyarrow.int16()` | `SMALLINT` |
528+
| `int` | `pyarrow.int32()` | `INT` |
529+
| `int` | `pyarrow.int64()` | `BIGINT` |
530+
| `float` | `pyarrow.float32()` | `FLOAT` |
531+
| `float` | `pyarrow.float64()` | `DOUBLE` |
532+
| `bool` | `pyarrow.bool_()` | `BOOLEAN` |
533+
| `str` | `pyarrow.string()` | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
534+
| `bytes` | `pyarrow.binary()` | `BYTES`, `VARBINARY(n)` |
535+
| `bytes` | `pyarrow.binary(length)` | `BINARY(length)` |
536+
| `decimal.Decimal` | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)` |
537+
| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)` | `TIMESTAMP(p)` |
538+
| `datetime.date` | `pyarrow.date32()` | `DATE` |
539+
| `datetime.time` | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)` |
385540

386541
## Predicate
387542

@@ -402,5 +557,4 @@ print(ray_dataset.to_pandas())
402557
| f.contains(literal) | PredicateBuilder.contains(f, literal) |
403558
| f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) |
404559
| f is not in [l1, l2] | PredicateBuilder.is_not_in(f, [l1, l2]) |
405-
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |
406-
560+
| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) |

paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def between_timestamps(table, predicate: Optional[Predicate], limit: Optional[in
6262
"""
6363
snapshot_manager = SnapshotManager(table)
6464
starting_snapshot = snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
65-
earliest_snapshot = snapshot_manager.get_earliest_snapshot()
65+
earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
6666

6767
# If earliest_snapshot.time_millis > start_timestamp we should include the earliest_snapshot
6868
if starting_snapshot is None or (earliest_snapshot and earliest_snapshot.time_millis > start_timestamp):

paimon-python/pypaimon/read/table_scan.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def _create_starting_scanner(self) -> Optional[StartingScanner]:
6464
raise ValueError(
6565
"The incremental-between-timestamp must specific start(exclusive) and end timestamp. But is: " +
6666
options[CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP])
67-
earliest_snapshot = SnapshotManager(self.table).get_earliest_snapshot()
67+
earliest_snapshot = SnapshotManager(self.table).try_get_earliest_snapshot()
6868
latest_snapshot = SnapshotManager(self.table).get_latest_snapshot()
6969
if earliest_snapshot is None or latest_snapshot is None:
7070
return EmptyStartingScanner()

paimon-python/pypaimon/snapshot/snapshot_manager.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,13 @@ def get_snapshot_path(self, snapshot_id: int) -> Path:
6060
"""
6161
return self.snapshot_dir / f"snapshot-{snapshot_id}"
6262

63-
def get_earliest_snapshot(self) -> Optional[Snapshot]:
64-
"""
65-
Get the earliest snapshot.
66-
67-
Returns:
68-
The earliest snapshot, or None if no snapshots exist
69-
"""
70-
# TODO implement EARLIEST file
71-
return self.get_snapshot_by_id(1)
63+
def try_get_earliest_snapshot(self) -> Optional[Snapshot]:
64+
if self.file_io.exists(self.snapshot_dir / "EARLIEST"):
65+
earliest_content = self.file_io.read_file_utf8(self.snapshot_dir / "EARLIEST")
66+
earliest_snapshot_id = int(earliest_content.strip())
67+
return self.get_snapshot_by_id(earliest_snapshot_id)
68+
else:
69+
return self.get_snapshot_by_id(1)
7270

7371
def earlier_or_equal_time_mills(self, timestamp: int) -> Optional[Snapshot]:
7472
"""

paimon-python/pypaimon/tests/reader_append_only_test.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ def test_incremental_timestamp(self):
290290
t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
291291
t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
292292
# test 1
293-
table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp-1) + ',' + str(timestamp)})
293+
table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)})
294294
read_builder = table.new_read_builder()
295295
actual = self._read_test_table(read_builder)
296296
self.assertEqual(len(actual), 0)
@@ -306,6 +306,42 @@ def test_incremental_timestamp(self):
306306
expected = self.expected.slice(4, 4)
307307
self.assertEqual(expected, actual)
308308

309+
def test_incremental_read_multi_snapshots(self):
310+
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
311+
self.catalog.create_table('default.test_incremental_100', schema, False)
312+
table = self.catalog.get_table('default.test_incremental_100')
313+
314+
write_builder = table.new_batch_write_builder()
315+
for i in range(1, 101):
316+
table_write = write_builder.new_write()
317+
table_commit = write_builder.new_commit()
318+
pa_table = pa.Table.from_pydict({
319+
'user_id': [i],
320+
'item_id': [1000 + i],
321+
'behavior': [f'snap{i}'],
322+
'dt': ['p1' if i % 2 == 1 else 'p2'],
323+
}, schema=self.pa_schema)
324+
table_write.write_arrow(pa_table)
325+
table_commit.commit(table_write.prepare_commit())
326+
table_write.close()
327+
table_commit.close()
328+
329+
snapshot_manager = SnapshotManager(table)
330+
t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
331+
t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
332+
333+
table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t10},{t20}"})
334+
read_builder = table_inc.new_read_builder()
335+
actual = self._read_test_table(read_builder).sort_by('user_id')
336+
337+
expected = pa.Table.from_pydict({
338+
'user_id': list(range(11, 21)),
339+
'item_id': [1000 + i for i in range(11, 21)],
340+
'behavior': [f'snap{i}' for i in range(11, 21)],
341+
'dt': ['p1' if i % 2 == 1 else 'p2' for i in range(11, 21)],
342+
}, schema=self.pa_schema).sort_by('user_id')
343+
self.assertEqual(expected, actual)
344+
309345
def _write_test_table(self, table):
310346
write_builder = table.new_batch_write_builder()
311347

0 commit comments

Comments
 (0)