Skip to content

Commit 61b6a36

Browse files
committed
docs: add notebook integration_duckdb_example and refactor names of prev notebooks
1 parent cf8b46e commit 61b6a36

File tree

3 files changed

+190
-0
lines changed

3 files changed

+190
-0
lines changed
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": null,
6+
"id": "1",
7+
"metadata": {},
8+
"outputs": [],
9+
"source": [
10+
# Libraries
import duckdb
import pyarrow as pa
import pyarrow.compute as pc
import os
import tempfile
from pyiceberg.catalog.sql import SqlCatalog

# Scratch locations: one temp directory holds both the warehouse files
# and the SQLite database backing the catalog.
warehouse_path = tempfile.mkdtemp(prefix='iceberg_warehouse_')
catalog_path = os.path.join(warehouse_path, 'catalog.db')
print('Temporary warehouse:', warehouse_path)
print('Temporary catalog:', catalog_path)

# A lightweight SQL catalog backed by SQLite -- no external services required.
catalog = SqlCatalog(
    name='tmp_sql_catalog',
    uri=f'sqlite:///{catalog_path}',
    warehouse=f'file://{warehouse_path}',
    properties={},
)

# Namespace that will hold the demo table.
catalog.create_namespace('default')
33+
]
34+
},
35+
{
36+
"cell_type": "markdown",
37+
"id": "2",
38+
"metadata": {},
39+
"source": [
40+
"## First snapshot\n",
41+
"We create the initial dataset and save it to an Iceberg table to create the first snapshot."
42+
]
43+
},
44+
{
45+
"cell_type": "code",
46+
"execution_count": null,
47+
"id": "2",
48+
"metadata": {},
49+
"outputs": [],
50+
"source": [
51+
# Seed records for the demo table.
initial_rows = {
    'vendor_id': [1, 2, 1, 2, 1],
    'trip_distance': [1.5, 2.3, 0.8, 5.2, 3.1],
    'fare_amount': [10.0, 15.5, 6.0, 22.0, 18.0],
    'tip_amount': [2.0, 3.0, 1.0, 4.5, 3.5],
    'passenger_count': [1, 2, 1, 3, 2],
}
initial_batch = pa.table(initial_rows)

# Creating the table and appending the batch produces the first snapshot.
table = catalog.create_table('default.sample_trips', schema=initial_batch.schema)
table.append(initial_batch)
print('First snapshot rows:', len(table.scan().to_arrow()))
65+
]
66+
},
67+
{
68+
"cell_type": "markdown",
69+
"id": "3",
70+
"metadata": {},
71+
"source": [
72+
"## Second snapshot\n",
73+
"We add new data to the same table, creating a second snapshot."
74+
]
75+
},
76+
{
77+
"cell_type": "code",
78+
"execution_count": null,
79+
"id": "3",
80+
"metadata": {},
81+
"outputs": [],
82+
"source": [
83+
# A second batch of rows; appending it creates the second snapshot.
extra_rows = {
    'vendor_id': [3, 1],
    'trip_distance': [2.0, 1.0],
    'fare_amount': [12.0, 8.0],
    'tip_amount': [1.5, 2.0],
    'passenger_count': [1, 1],
}
extra_batch = pa.table(extra_rows)

table.append(extra_batch)
print('Second snapshot total rows:', len(table.scan().to_arrow()))
96+
]
97+
},
98+
{
99+
"cell_type": "markdown",
100+
"id": "4",
101+
"metadata": {},
102+
"source": [
103+
"## Compare snapshots using DuckDB\n",
104+
"We register both snapshots with DuckDB as views over the Arrow tables to find added and removed rows."
105+
]
106+
},
107+
{
108+
"cell_type": "code",
109+
"execution_count": null,
110+
"id": "4",
111+
"metadata": {},
112+
"outputs": [],
113+
"source": [
114+
# Snapshot IDs: table.snapshots() lists snapshots in the order recorded in
# the table metadata, so [0] is the oldest and [-1] the newest.
snapshots = table.snapshots()
first_id = snapshots[0].snapshot_id
second_id = snapshots[-1].snapshot_id
print('Snapshot IDs:', first_id, second_id)

# Materialize each snapshot as a PyArrow table via a time-travel scan.
arrow_first = table.scan(snapshot_id=first_id).to_arrow()
arrow_second = table.scan(snapshot_id=second_id).to_arrow()

# Expose both snapshots to DuckDB as queryable views over the Arrow tables.
con = duckdb.connect()
con.register('first_snap', arrow_first)
con.register('second_snap', arrow_second)

# Use EXCEPT ALL (bag semantics) rather than EXCEPT (set semantics):
# if the same trip row is appended twice, plain EXCEPT would deduplicate
# and silently hide the extra copy from the diff.
added_rows = con.execute('''
SELECT * FROM second_snap
EXCEPT ALL
SELECT * FROM first_snap
''').fetchall()

# Rows present in the first snapshot but gone from the second.
removed_rows = con.execute('''
SELECT * FROM first_snap
EXCEPT ALL
SELECT * FROM second_snap
''').fetchall()

print('=== ADDED ROWS ===')
for r in added_rows:
    print(r)

print('\n=== REMOVED ROWS ===')
for r in removed_rows:
    print(r)
150+
]
151+
},
152+
{
153+
"cell_type": "markdown",
154+
"id": "5",
155+
"metadata": {},
156+
"source": [
157+
"## Filters and aggregations on the second snapshot\n",
158+
"We add a computed column and perform filtering and aggregation using DuckDB."
159+
]
160+
},
161+
{
162+
"cell_type": "code",
163+
"execution_count": null,
164+
"id": "5",
165+
"metadata": {},
166+
"outputs": [],
167+
"source": [
168+
# Derive 'tip_per_mile' WITHOUT clobbering the 'second_snap' view registered
# by the snapshot-diff cell: rebinding that name to a table with an extra
# column would make re-running the EXCEPT queries fail on a column-count
# mismatch. We also leave arrow_second itself untouched for the same reason.
second_enriched = arrow_second.append_column(
    'tip_per_mile',
    pc.divide(arrow_second['tip_amount'], arrow_second['trip_distance']),
)
con.register('second_snap_enriched', second_enriched)

# Filter rows with tip_per_mile > 1.0
filtered_df = con.execute('SELECT * FROM second_snap_enriched WHERE tip_per_mile > 1.0').fetchdf()
print('Filtered rows (tip_per_mile > 1.0):')
print(filtered_df)

# Aggregate total fare by vendor
agg_df = con.execute('SELECT vendor_id, SUM(fare_amount) AS total_fare FROM second_snap_enriched GROUP BY vendor_id').fetchdf()
print('Total fare per vendor:')
print(agg_df)
181+
]
182+
}
183+
],
184+
"metadata": {
185+
"kernelspec": {"display_name":"Python 3 (ipykernel)","language":"python","name":"python3"},
186+
"language_info":{"name":"python","version":"3.12"}
187+
},
188+
"nbformat": 4,
189+
"nbformat_minor": 5
190+
}

0 commit comments

Comments
 (0)