Commit 7a265d7

Merge pull request #80 from marancibia/main
New delta sharing example
2 parents a61f0d0 + 8f6551d commit 7a265d7

3 files changed

+295
-1
lines changed


apex/Ask-Oracle/README.md

Lines changed: 1 addition & 1 deletion

@@ -11,7 +11,7 @@ models.
 
 ## Ask Oracle overview
 
-Now in version 3, the Ask Oracle chatbot supports a broad range of
+Now in version 4, the Ask Oracle chatbot supports a broad range of
 functionality for Select AI:
 
 - **Chat** -- direct interaction with the LLM specified in your selected

delta-sharing/README.md

Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
# Delta Sharing

## Overview

Oracle Autonomous Database supports versioned shares through the open Delta Sharing protocol. Providers publish data from Autonomous Database, and recipients access shares using a JSON profile and query Parquet data for a selected version window. Oracle also provides a fully scriptable sharing workflow through the `DBMS_SHARE` package.

## Files

- `./change-data-feed/Oracle Delta Sharing CDF.ipynb` — Python code that compares two versions of a Delta Share and prints the raw change rows returned for that version window.

## What the notebook shows

This notebook demonstrates a file-based CDF-style workflow for versioned Delta Shares published by Oracle Autonomous Database.

The notebook:

- authenticates with a Delta Sharing profile
- requests changes between `START_VERSION` and `END_VERSION`
- downloads only the Parquet/action files returned for that version window
- displays raw rows together with `_commit_version` and `_change_type`

This makes it useful for validating what changed between two published versions of a share without scanning the full share.
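The table coordinates above combine into a single URL in the `<profile>#<share>.<schema>.<table>` form defined by the Delta Sharing profile convention. A minimal sketch, with placeholder names standing in for real share coordinates:

```python
# Delta Sharing table URL convention: <profile path>#<share>.<schema>.<table>
# All names below are placeholders for illustration, not real coordinates.
PROFILE_PATH = "/Workspace/Users/me/oracle.share"
SHARE_NAME = "MY_SHARE"
SCHEMA_NAME = "MY_SCHEMA"
TABLE_NAME = "MY_TABLE"

table_url = f"{PROFILE_PATH}#{SHARE_NAME}.{SCHEMA_NAME}.{TABLE_NAME}"
print(table_url)
# /Workspace/Users/me/oracle.share#MY_SHARE.MY_SCHEMA.MY_TABLE
```

This is the URL format the `delta_sharing` Python client accepts for table-level operations.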
## Important behavior

This sample operates at the file level.

When a file changes between two versions:

- rows from the previous file can appear as `delete`
- rows from the replacement file can appear as `insert`

As a result, unchanged rows inside a replaced file can appear as matching delete/insert pairs. The notebook intentionally shows the raw output so downstream logic can derive the net inserts, deletes, and updates.
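The pair cancellation described above can be sketched in pandas. The column names follow the notebook's `_change_type` output; the sample data is invented for illustration (inserting one new row into a two-row file):

```python
import pandas as pd

# Raw file-level CDF output: replacing a 2-row file to add 1 row yields
# 2 deletes (old file) + 3 inserts (new file). Data here is illustrative.
raw = pd.DataFrame(
    {
        "id": [1, 2, 1, 2, 3],
        "val": ["a", "b", "a", "b", "c"],
        "_change_type": ["delete", "delete", "insert", "insert", "insert"],
    }
)

key_cols = ["id", "val"]  # data columns that identify a row's content
deletes = raw[raw["_change_type"] == "delete"][key_cols]
inserts = raw[raw["_change_type"] == "insert"][key_cols]

# Cancel matching delete/insert pairs: rows present on both sides are
# file-replacement artifacts, not real changes.
net_inserts = inserts.merge(deletes, on=key_cols, how="left", indicator=True)
net_inserts = net_inserts[net_inserts["_merge"] == "left_only"][key_cols]

net_deletes = deletes.merge(inserts, on=key_cols, how="left", indicator=True)
net_deletes = net_deletes[net_deletes["_merge"] == "left_only"][key_cols]

print(net_inserts)  # only the genuinely new row: id=3, val="c"
print(net_deletes)  # empty: nothing was really deleted
```

Note this anti-join approach assumes rows are unique within a file; duplicated rows would need count-aware cancellation.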
In practice, this means that if a share is large but only a small incremental change was published, the notebook reads only the files returned for the requested version window rather than scanning the full share.

## References

- [Overview of the Data Share Tool](https://docs.oracle.com/en/cloud/paas/autonomous-database/serverless/adbsb/overview-adp-share.html)
- [Manage Shares with DBMS_SHARE](https://docs.oracle.com/en-us/iaas/autonomous-database-serverless/doc/manage-shares.html)
- [DBMS_SHARE Constants](https://docs.oracle.com/en/cloud/paas/autonomous-database/serverless/adbsb/dbms-share-package-constants.html)
- [High-Level Steps for Receiving Shares for Versioned Data](https://docs.oracle.com/en/database/oracle/sql-developer-web/sdwfd/high-level-steps-recieving-data-shares-versioned-data.html)
- [Sharing Data from On-Premise Oracle Databases](https://blogs.oracle.com/autonomous-ai-database/sharing-data-from-onpremise-oracle-databases)
- [Seamless, Open Data Sharing Between Oracle Autonomous Database and Databricks](https://blogs.oracle.com/autonomous-ai-database/open-data-sharing-between-oracle-and-databricks)
delta-sharing/change-data-feed/Oracle Delta Sharing CDF.ipynb

Lines changed: 250 additions & 0 deletions

@@ -0,0 +1,250 @@
{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "cellMetadata": {},
          "inputWidgets": {},
          "nuid": "47d95cd3-9819-4136-8c6e-da12f04a96ac",
          "showTitle": true,
          "tableResultSettingsMap": {},
          "title": "Overview"
        }
      },
      "source": [
        "# Oracle Delta Sharing CDF Smoke Test\n",
        "\n",
        "This notebook reads **Change Data Feed (CDF)** from an Oracle Autonomous Database\n",
        "that publishes a Delta Sharing endpoint, and displays the raw change rows.\n",
        "\n",
        "## Why a custom REST approach?\n",
        "\n",
        "| Problem | Root cause | Workaround |\n",
        "|---|---|---|\n",
        "| `spark.read.format(\"deltaSharing\")` throws `InvocationTargetException` | Spark's Java Delta Sharing connector is incompatible with Oracle endpoints on serverless compute | Use the Python `delta_sharing` REST client instead |\n",
        "| `load_table_changes_as_pandas()` throws `KeyError: '_commit_timestamp'` | Oracle's file-level CDF omits the `_commit_timestamp` column that the library expects | Call the REST API directly and parse with `DeltaSharingReader._to_pandas()` |\n",
        "| `spark.read.parquet(*urls)` throws `UNSUPPORTED_FILE_SYSTEM` | Spark can't read HTTPS pre-signed URLs from Oracle object storage | Download via HTTP with `_to_pandas()`, then convert to Spark DataFrame |\n",
        "\n",
        "## Oracle's file-level CDF\n",
        "\n",
        "Oracle implements CDF at the **file level**, not the row level. When *any* row in a\n",
        "data file changes, Oracle replaces the **entire file**. The CDF response therefore\n",
        "shows:\n",
        "- **All rows from the old file** as `delete`\n",
        "- **All rows from the new file** as `insert`\n",
        "\n",
        "Unchanged rows appear as matching DELETE + INSERT pairs (file-level artifacts).\n",
        "To derive the true net changes, cancel out identical pairs — this notebook shows\n",
        "the raw output so you can observe this behavior directly."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 0,
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "cellMetadata": {},
          "inputWidgets": {},
          "nuid": "55fe12eb-a206-4f2b-a37d-97fe0d113123",
          "showTitle": true,
          "tableResultSettingsMap": {},
          "title": "Install dependencies"
        }
      },
      "outputs": [],
      "source": [
        "%pip install delta-sharing -q"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 0,
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "cellMetadata": {},
          "inputWidgets": {},
          "nuid": "351d8f42-1508-4d2d-9973-22ef9849f51f",
          "showTitle": true,
          "tableResultSettingsMap": {},
          "title": "Read CDF and display raw changes"
        }
      },
      "outputs": [],
      "source": [
        "# ---------------------------------------------------------------------------\n",
        "# Oracle Delta Sharing CDF Smoke Test\n",
        "# ---------------------------------------------------------------------------\n",
        "# Reads raw CDF (Change Data Feed) for a single version window from an\n",
        "# Oracle Autonomous Database Delta Sharing endpoint and prints the rows.\n",
        "#\n",
        "# This script does NOT write anything to a Delta table — it is purely\n",
        "# diagnostic, useful for inspecting what Oracle's file-level CDF returns\n",
        "# before applying changes downstream.\n",
        "# ---------------------------------------------------------------------------\n",
        "\n",
        "import json\n",
        "import time\n",
        "\n",
        "import pandas as pd\n",
        "import delta_sharing\n",
        "from delta_sharing.protocol import DeltaSharingProfile, Table\n",
        "from delta_sharing.reader import CdfOptions, DeltaSharingReader, to_converters\n",
        "from delta_sharing.rest_client import DataSharingRestClient\n",
        "from pyspark.sql import functions as F\n",
        "\n",
        "# ---------------------------------------------------------------------------\n",
        "# Configuration — replace placeholders with your own values\n",
        "# ---------------------------------------------------------------------------\n",
        "\n",
        "# Path to the .share profile file on your Databricks workspace.\n",
        "# The profile contains the Oracle sharing endpoint URL and credentials.\n",
        "# See: https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#profile-file-format\n",
        "PROFILE_PATH = \"/Workspace/Users/<your-email>/<your-profile>.share\"\n",
        "\n",
        "# Coordinates of the shared table (as listed by delta_sharing.SharingClient).\n",
        "SHARE_NAME = \"<YOUR_SHARE>\"\n",
        "SCHEMA_NAME = \"<YOUR_SCHEMA>\"\n",
        "TABLE_NAME = \"<YOUR_TABLE>\"\n",
        "\n",
        "# CDF version window to inspect.\n",
        "# Set both to the same value to see a single version's changes.\n",
        "START_VERSION = 1\n",
        "END_VERSION = 2\n",
        "\n",
        "# How many sample rows to print.\n",
        "SHOW_SAMPLE_ROWS = 50\n",
        "\n",
        "# ---------------------------------------------------------------------------\n",
        "# Read CDF via Delta Sharing REST API\n",
        "# ---------------------------------------------------------------------------\n",
        "# We bypass the high-level library functions (load_table_changes_as_pandas,\n",
        "# spark.read.format(\"deltaSharing\")) because:\n",
        "#   1. Oracle omits _commit_timestamp → KeyError in the library\n",
        "#   2. Spark's Java connector throws InvocationTargetException on serverless\n",
        "#   3. Pre-signed HTTPS URLs can't be read by spark.read.parquet()\n",
        "#\n",
        "# Instead we:\n",
        "#   a) Call list_table_changes() on the REST client to get pre-signed URLs\n",
        "#   b) Download each Parquet file via HTTP with _to_pandas()\n",
        "#   c) Concatenate into a single pandas DataFrame, then convert to Spark\n",
        "# ---------------------------------------------------------------------------\n",
        "\n",
        "table_url = f\"{PROFILE_PATH}#{SHARE_NAME}.{SCHEMA_NAME}.{TABLE_NAME}\"\n",
        "print(f\"[info] table_url={table_url}\")\n",
        "print(f\"[info] requested CDF window={START_VERSION} -> {END_VERSION}\")\n",
        "\n",
        "t0 = time.time()\n",
        "\n",
        "profile = DeltaSharingProfile.read_from_file(PROFILE_PATH)\n",
        "rest = DataSharingRestClient(profile)\n",
        "table_obj = Table(name=TABLE_NAME, share=SHARE_NAME, schema=SCHEMA_NAME)\n",
        "\n",
        "cdf_opts = CdfOptions(starting_version=START_VERSION, ending_version=END_VERSION)\n",
        "response = rest.list_table_changes(table_obj, cdf_opts)\n",
        "\n",
        "# Parse the schema returned by Oracle so _to_pandas() can cast columns.\n",
        "schema_json = json.loads(response.metadata.schema_string)\n",
        "converters = to_converters(schema_json)\n",
        "\n",
        "# Download every Parquet action file returned in the CDF response.\n",
        "# Each action corresponds to one physical file on Oracle object storage.\n",
        "pdfs = []\n",
        "for action in response.actions:\n",
        "    pdfs.append(\n",
        "        DeltaSharingReader._to_pandas(\n",
        "            action,\n",
        "            converters,\n",
        "            True,  # for_cdf — adds _change_type / _commit_version columns\n",
        "            None,  # limit\n",
        "            True,  # use_delta_format\n",
        "        )\n",
        "    )\n",
        "\n",
        "elapsed = time.time() - t0\n",
        "\n",
        "# ---------------------------------------------------------------------------\n",
        "# Display results\n",
        "# ---------------------------------------------------------------------------\n",
        "\n",
        "if not pdfs:\n",
        "    print(f\"[info] elapsed_seconds={elapsed:.2f}\")\n",
        "    print(\"[result] empty CDF response for requested version window\")\n",
        "else:\n",
        "    cdf_pdf = pd.concat(pdfs, ignore_index=True)\n",
        "    df = spark.createDataFrame(cdf_pdf)\n",
        "\n",
        "    # Safety filter: ensure we only look at the requested version range.\n",
        "    if \"_commit_version\" in df.columns:\n",
        "        df = df.filter(\n",
        "            (F.col(\"_commit_version\") >= F.lit(START_VERSION))\n",
        "            & (F.col(\"_commit_version\") <= F.lit(END_VERSION))\n",
        "        )\n",
        "\n",
        "    raw_count = df.count()\n",
        "    elapsed = time.time() - t0\n",
        "\n",
        "    print(f\"[info] elapsed_seconds={elapsed:.2f}\")\n",
        "    print(f\"[info] raw_row_count={raw_count:,}\")\n",
        "    print(f\"[info] columns={df.columns}\")\n",
        "\n",
        "    if raw_count == 0:\n",
        "        print(\"[result] empty CDF response for requested version window\")\n",
        "    else:\n",
        "        # --- Summary: row counts by version and change type ----------------\n",
        "        if \"_commit_version\" in df.columns and \"_change_type\" in df.columns:\n",
        "            print(\"[result] rows by commit version and change type\")\n",
        "            (\n",
        "                df.groupBy(\"_commit_version\", \"_change_type\")\n",
        "                .count()\n",
        "                .orderBy(\"_commit_version\", \"_change_type\")\n",
        "                .show(100, truncate=False)\n",
        "            )\n",
        "\n",
        "        # --- Sample rows (truncated for readability) -----------------------\n",
        "        # NOTE: Because Oracle uses file-level CDF, you will typically see\n",
        "        # more rows than the actual number of changed rows. For example,\n",
        "        # inserting 1 row into a file that already has 2 rows produces:\n",
        "        #   2 deletes (old file: all existing rows)\n",
        "        #   3 inserts (new file: existing rows + the new row)\n",
        "        # The 2 matching DELETE+INSERT pairs are unchanged — only the\n",
        "        # extra INSERT is the real change.\n",
        "        print(\"[result] sample rows\")\n",
        "        order_cols = []\n",
        "        if \"_commit_version\" in df.columns:\n",
        "            order_cols.append(F.col(\"_commit_version\").desc())\n",
        "        if \"_change_type\" in df.columns:\n",
        "            order_cols.append(F.col(\"_change_type\"))\n",
        "\n",
        "        if order_cols:\n",
        "            df.orderBy(*order_cols).show(SHOW_SAMPLE_ROWS, truncate=True)\n",
        "        else:\n",
        "            df.show(SHOW_SAMPLE_ROWS, truncate=True)"
      ]
    }
  ],
  "metadata": {
    "application/vnd.databricks.v1+notebook": {
      "computePreferences": null,
      "dashboards": [],
      "environmentMetadata": null,
      "inputWidgetPreferences": null,
      "language": "python",
      "notebookMetadata": {
        "pythonIndentUnit": 4
      },
      "notebookName": "Oracle Delta Sharing CDF Smoke Test GitHub",
      "widgets": {}
    },
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
