Pangolin supports Iceberg's time-travel capabilities, allowing users to query tables as they existed at a specific point in time or snapshot. This is essential for:
- Auditing historical data states
- Reproducing past analysis results
- Recovering from accidental data modifications
- Comparing data across time periods
Time-travel queries are performed using Iceberg's standard query parameters.
Query the table as it was at a specific snapshot:
GET /v1/analytics/namespaces/sales/tables/transactions?snapshot-id=123456789
Authorization: Bearer <token>
X-Pangolin-Tenant: <tenant-id>Query the table as it was at a specific timestamp (milliseconds since epoch):
GET /v1/analytics/namespaces/sales/tables/transactions?as-of-timestamp=1678886400000
Authorization: Bearer <token>
X-Pangolin-Tenant: <tenant-id>from pyiceberg.catalog import load_catalog
from datetime import datetime
catalog = load_catalog(
"pangolin",
**{
"uri": "http://localhost:8080",
"prefix": "analytics",
"token": "your-jwt-token",
}
)
table = catalog.load_table("sales.transactions")
# Query by snapshot ID
snapshot_id = 123456789
df = table.scan(snapshot_id=snapshot_id).to_pandas()
# Query by timestamp
timestamp = datetime(2024, 1, 15, 10, 30, 0)
df = table.scan(as_of_timestamp=timestamp).to_pandas()
# Query as of yesterday
from datetime import timedelta
yesterday = datetime.now() - timedelta(days=1)
df = table.scan(as_of_timestamp=yesterday).to_pandas()-- Query by snapshot ID
SELECT * FROM analytics.sales.transactions VERSION AS OF 123456789;
-- Query by timestamp
SELECT * FROM analytics.sales.transactions TIMESTAMP AS OF '2024-01-15 10:30:00';
-- Query as of yesterday
SELECT * FROM analytics.sales.transactions TIMESTAMP AS OF current_timestamp() - INTERVAL 1 DAY;-- Query by snapshot ID
SELECT * FROM iceberg.analytics.sales.transactions FOR VERSION AS OF 123456789;
-- Query by timestamp
SELECT * FROM iceberg.analytics.sales.transactions FOR TIMESTAMP AS OF TIMESTAMP '2024-01-15 10:30:00';from pyiceberg.catalog import load_catalog
catalog = load_catalog("pangolin", **config)
table = catalog.load_table("sales.transactions")
# Get table metadata
metadata = table.metadata
# List all snapshots
for snapshot in metadata.snapshots:
print(f"Snapshot ID: {snapshot.snapshot_id}")
print(f"Timestamp: {snapshot.timestamp_ms}")
print(f"Operation: {snapshot.summary.get('operation')}")
print("---")# Get specific snapshot
snapshot_id = 123456789
snapshot = table.snapshot_by_id(snapshot_id)
print(f"Snapshot ID: {snapshot.snapshot_id}")
print(f"Parent ID: {snapshot.parent_snapshot_id}")
print(f"Timestamp: {snapshot.timestamp_ms}")
print(f"Manifest List: {snapshot.manifest_list}")When a time-travel parameter is provided:
- Pangolin retrieves the current metadata for the table
- Verifies if the requested snapshot exists in the metadata history
- Returns the metadata pointing to the requested snapshot
- Client reads data files from the snapshot's manifest list
If the snapshot is not found (e.g., expired and cleaned up), a 404 Not Found is returned.
Iceberg tables retain snapshots based on configuration:
- Default: Last 5 snapshots
- Time-based: Snapshots from last 7 days
from pyiceberg.catalog import load_catalog
catalog = load_catalog("pangolin", **config)
table = catalog.load_table("sales.transactions")
# Update table properties
table.update_properties(
**{
"history.expire.max-snapshot-age-ms": "604800000", # 7 days
"history.expire.min-snapshots-to-keep": "10"
}
)# Expire snapshots older than 7 days
table.expire_snapshots(
older_than=datetime.now() - timedelta(days=7),
retain_last=5
)# Compare data from last month to current
last_month = datetime.now() - timedelta(days=30)
df_last_month = table.scan(as_of_timestamp=last_month).to_pandas()
df_current = table.scan().to_pandas()
# Find differences
diff = df_current.merge(df_last_month, how='outer', indicator=True)
new_records = diff[diff['_merge'] == 'left_only']
deleted_records = diff[diff['_merge'] == 'right_only']# Find snapshot before deletion
snapshots = table.metadata.snapshots
for snapshot in reversed(snapshots):
if snapshot.summary.get('operation') == 'delete':
# Use snapshot before this one
previous_snapshot = snapshot.parent_snapshot_id
df = table.scan(snapshot_id=previous_snapshot).to_pandas()
break# Reproduce analysis from specific date
analysis_date = datetime(2024, 1, 15)
df = table.scan(as_of_timestamp=analysis_date).to_pandas()
# Run same analysis
result = df.groupby('category').agg({'amount': 'sum'})Balance between:
- Storage costs: More snapshots = more metadata storage
- Audit requirements: Compliance may require longer retention
- Recovery needs: Keep enough history for rollback scenarios
Prefer timestamp-based queries for business logic:
# Good: Business-friendly
df = table.scan(as_of_timestamp=datetime(2024, 1, 1)).to_pandas()
# Less ideal: Requires knowing snapshot IDs
df = table.scan(snapshot_id=123456789).to_pandas()For important milestones, document snapshot IDs:
# Production release snapshot
PROD_RELEASE_V1 = 123456789
df = table.scan(snapshot_id=PROD_RELEASE_V1).to_pandas()Automate snapshot expiration to manage storage:
# Weekly cleanup job
table.expire_snapshots(
older_than=datetime.now() - timedelta(days=30),
retain_last=10
)Verify time travel works before relying on it:
# Test query
snapshots = table.metadata.snapshots
if len(snapshots) > 1:
df = table.scan(snapshot_id=snapshots[0].snapshot_id).to_pandas()
assert len(df) >= 0 # Verify query worksCause: Snapshot expired or never existed.
Solution:
- List available snapshots:
table.metadata.snapshots - Check retention policy
- Use a more recent snapshot or timestamp
Cause: No snapshots exist for the requested timestamp.
Solution:
- Check oldest available snapshot:
min(s.timestamp_ms for s in table.metadata.snapshots) - Use a more recent timestamp
- Increase snapshot retention if needed
Cause: Old snapshots may reference many small files.
Solution:
- Consider table compaction
- Use more recent snapshots when possible
- Optimize query filters to reduce data scanned
- Branch Management - Git-like branching for catalogs
- Merge Conflicts - Merging branches
- PyIceberg Testing - PyIceberg integration