Skip to content

Commit 67c59be

Browse files
committed
docs: add integration_duckdb_example notebook and rename previous notebooks
1 parent cf8b46e commit 67c59be

File tree

3 files changed

+194
-0
lines changed

3 files changed

+194
-0
lines changed
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": null,
6+
"id": "1",
7+
"metadata": {},
8+
"outputs": [],
9+
"source": [
10+
# Dependencies
import os
import tempfile

import duckdb
import pyarrow as pa
import pyarrow.compute as pc

from pyiceberg.catalog.sql import SqlCatalog

# Scratch locations: one throwaway directory holds both the Iceberg
# warehouse and the SQLite file backing the catalog.
warehouse_path = tempfile.mkdtemp(prefix="iceberg_warehouse_")
catalog_path = os.path.join(warehouse_path, "catalog.db")
print("Temporary warehouse:", warehouse_path)
print("Temporary catalog:", catalog_path)

# A disposable SQL catalog backed by SQLite; table data lives under the
# temporary warehouse directory.
catalog = SqlCatalog(
    name="tmp_sql_catalog",
    uri=f"sqlite:///{catalog_path}",
    warehouse=f"file://{warehouse_path}",
    properties={},
)
# Namespace that will hold the example table.
catalog.create_namespace("default")
30+
]
31+
},
32+
{
33+
"cell_type": "markdown",
34+
"id": "2",
35+
"metadata": {},
36+
"source": [
37+
"## First snapshot\n",
38+
"We create the initial dataset and save it to an Iceberg table to create the first snapshot."
39+
]
40+
},
41+
{
42+
"cell_type": "code",
43+
"execution_count": null,
44+
"id": "2",
45+
"metadata": {},
46+
"outputs": [],
47+
"source": [
48+
# Seed dataset, built directly as an Arrow table.
initial_trips = pa.table(
    {
        "vendor_id": [1, 2, 1, 2, 1],
        "trip_distance": [1.5, 2.3, 0.8, 5.2, 3.1],
        "fare_amount": [10.0, 15.5, 6.0, 22.0, 18.0],
        "tip_amount": [2.0, 3.0, 1.0, 4.5, 3.5],
        "passenger_count": [1, 2, 1, 3, 2],
    }
)

# Creating the table and appending the seed rows produces snapshot #1.
table = catalog.create_table("default.sample_trips", schema=initial_trips.schema)
table.append(initial_trips)
print("First snapshot rows:", len(table.scan().to_arrow()))
62+
]
63+
},
64+
{
65+
"cell_type": "markdown",
66+
"id": "3",
67+
"metadata": {},
68+
"source": [
69+
"## Second snapshot\n",
70+
"We add new data to the same table, creating a second snapshot."
71+
]
72+
},
73+
{
74+
"cell_type": "code",
75+
"execution_count": null,
76+
"id": "3",
77+
"metadata": {},
78+
"outputs": [],
79+
"source": [
80+
# Additional rows that will land in snapshot #2.
new_trips = pa.table(
    {
        "vendor_id": [3, 1],
        "trip_distance": [2.0, 1.0],
        "fare_amount": [12.0, 8.0],
        "tip_amount": [1.5, 2.0],
        "passenger_count": [1, 1],
    }
)

# A second append stacks a new snapshot on top of the first.
table.append(new_trips)
print("Second snapshot total rows:", len(table.scan().to_arrow()))
93+
]
94+
},
95+
{
96+
"cell_type": "markdown",
97+
"id": "4",
98+
"metadata": {},
99+
"source": [
100+
"## Compare snapshots using DuckDB\n",
101+
"We load both snapshots into DuckDB as temporary tables to find added and removed rows."
102+
]
103+
},
104+
{
105+
"cell_type": "code",
106+
"execution_count": null,
107+
"id": "4",
108+
"metadata": {},
109+
"outputs": [],
110+
"source": [
111+
# Pick the snapshot ids at the two ends of the table's history.
history = table.snapshots()
first_id = history[0].snapshot_id
second_id = history[-1].snapshot_id
print("Snapshot IDs:", first_id, second_id)

# Materialize each snapshot as a standalone Arrow table.
arrow_first = table.scan(snapshot_id=first_id).to_arrow()
arrow_second = table.scan(snapshot_id=second_id).to_arrow()

# Expose both snapshots to DuckDB as queryable views.
con = duckdb.connect()
con.register("first_snap", arrow_first)
con.register("second_snap", arrow_second)

# Rows present only in the newer snapshot.
added_rows = con.execute("""
SELECT * FROM second_snap
EXCEPT
SELECT * FROM first_snap
""").fetchall()

# Rows that disappeared relative to the first snapshot (none expected
# here, since the notebook only appends).
removed_rows = con.execute("""
SELECT * FROM first_snap
EXCEPT
SELECT * FROM second_snap
""").fetchall()

print("=== ADDED ROWS ===")
for row in added_rows:
    print(row)

print("\n=== REMOVED ROWS ===")
for row in removed_rows:
    print(row)
147+
]
148+
},
149+
{
150+
"cell_type": "markdown",
151+
"id": "5",
152+
"metadata": {},
153+
"source": [
154+
"## Filters and aggregations on the second snapshot\n",
155+
"We add a computed column and perform filtering and aggregation using DuckDB."
156+
]
157+
},
158+
{
159+
"cell_type": "code",
160+
"execution_count": null,
161+
"id": "5",
162+
"metadata": {},
163+
"outputs": [],
164+
"source": [
165+
# Derive tip-per-mile, then re-register so DuckDB sees the new column.
tip_per_mile = pc.divide(arrow_second["tip_amount"], arrow_second["trip_distance"])
arrow_second = arrow_second.append_column("tip_per_mile", tip_per_mile)
con.register("second_snap", arrow_second)

# Keep only trips that earned more than $1 of tip per mile.
filtered_df = con.execute("SELECT * FROM second_snap WHERE tip_per_mile > 1.0").fetchdf()
print("Filtered rows (tip_per_mile > 1.0):")
print(filtered_df)

# Total fare collected by each vendor in the second snapshot.
agg_df = con.execute("SELECT vendor_id, SUM(fare_amount) AS total_fare FROM second_snap GROUP BY vendor_id").fetchdf()
print("Total fare per vendor:")
print(agg_df)
178+
]
179+
}
180+
],
181+
"metadata": {
182+
"kernelspec": {
183+
"display_name": "Python 3 (ipykernel)",
184+
"language": "python",
185+
"name": "python3"
186+
},
187+
"language_info": {
188+
"name": "python",
189+
"version": "3.12"
190+
}
191+
},
192+
"nbformat": 4,
193+
"nbformat_minor": 5
194+
}

0 commit comments

Comments
 (0)