
Commit b3ac038

xinlian12 and Copilot authored

Add throughput bucket samples for Cosmos Spark connector (#48734)

Add Python (.ipynb) and Scala sample notebooks demonstrating server-side throughput bucket configuration as an alternative to SDK-based global throughput control. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

1 parent 822df77 commit b3ac038

File tree

2 files changed: +626 −0 lines changed
Lines changed: 348 additions & 0 deletions
@@ -0,0 +1,348 @@
{
  "cells": [
    {
      "cell_type": "markdown",
      "source": [
        "**Secrets**\n\nThe secrets below, like the Cosmos account key, are retrieved from a secret scope. If you haven't defined a secret scope for the Cosmos account you want to use with this sample, you can find instructions on how to create one here:\n- [Create a new secret scope](./#secrets/createScope) for the current Databricks workspace\n  - See how to create an [Azure Key Vault backed secret scope](https://docs.microsoft.com/azure/databricks/security/secrets/secret-scopes#--create-an-azure-key-vault-backed-secret-scope)\n  - See how to create a [Databricks backed secret scope](https://docs.microsoft.com/azure/databricks/security/secrets/secret-scopes#create-a-databricks-backed-secret-scope)\n- Learn how to [add secrets to your Spark configuration](https://docs.microsoft.com/azure/databricks/security/secrets/secrets#read-a-secret)\n\nIf you don't want to use secrets at all, you can of course also just assign the values in clear text below - but for obvious reasons we recommend using secrets."
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "cosmosEndpoint = spark.conf.get(\"spark.cosmos.accountEndpoint\")\ncosmosMasterKey = spark.conf.get(\"spark.cosmos.accountKey\")"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "b2c3d4e5-f6a7-8901-bcde-f12345678901"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Preparation - creating the Cosmos DB container to ingest the data into**"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "c3d4e5f6-a7b8-9012-cdef-123456789012"
        }
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "Configure the Catalog API to be used"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "d4e5f6a7-b8c9-0123-defa-234567890123"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import uuid\nspark.conf.set(\"spark.sql.catalog.cosmosCatalog\", \"com.azure.cosmos.spark.CosmosCatalog\")\nspark.conf.set(\"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint\", cosmosEndpoint)\nspark.conf.set(\"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey\", cosmosMasterKey)\nspark.conf.set(\"spark.sql.catalog.cosmosCatalog.spark.cosmos.views.repositoryPath\", \"/viewDefinitions\" + str(uuid.uuid4()))\n"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "e5f6a7b8-c9d0-1234-efab-345678901234"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "markdown",
      "source": [
        "Now execute the command to create the new container with a throughput of up to 100,000 RU (autoscale, so 10,000 - 100,000 RU based on scale) and only system properties (like /id) being indexed.\n\n**Note:** Unlike SDK-based throughput control, throughput buckets do NOT require a separate metadata container (ThroughputControl) because they are managed server-side."
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "f6a7b8c9-d0e1-2345-fabc-456789012345"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "%sql\nCREATE DATABASE IF NOT EXISTS cosmosCatalog.SampleDatabase;\n\nCREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecords\nUSING cosmos.oltp\nTBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties');\n\nCREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSink\nUSING cosmos.oltp\nTBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties');"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "a7b8c9d0-e1f2-3456-abcd-567890123456"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Preparation - loading data source \"[NYC Taxi & Limousine Commission - green taxi trip records](https://azure.microsoft.com/services/open-datasets/catalog/nyc-taxi-limousine-commission-green-taxi-trip-records/)\"**\n\nThe green taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. This data set has over 80 million records (>8 GB) of data and is available via a publicly accessible Azure Blob Storage Account located in the East-US Azure region."
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "b8c9d0e1-f2a3-4567-bcde-678901234567"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import datetime\nimport time\nimport uuid\nfrom pyspark.sql.functions import udf\nfrom pyspark.sql.types import StringType, LongType\n\nprint(\"Starting preparation: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n# Azure storage access info\nblob_account_name = \"azureopendatastorage\"\nblob_container_name = \"nyctlc\"\nblob_relative_path = \"green\"\nblob_sas_token = r\"\"\n# Allow Spark to read from Blob storage remotely\nwasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)\nspark.conf.set(\n  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),\n  blob_sas_token)\nprint('Remote blob path: ' + wasbs_path)\n# Spark reads the parquet lazily - no data is loaded at this point\n# NOTE - if you want to experiment with larger dataset sizes, consider switching to Option B\n# (commenting the code for Option A/uncommenting the code for Option B below) or increase the\n# value passed into the limit function restricting the dataset size\n\n#------------------------------------------------------------------------------------\n# Option A - with limited dataset size\n#------------------------------------------------------------------------------------\ndf_rawInputWithoutLimit = spark.read.parquet(wasbs_path)\npartitionCount = df_rawInputWithoutLimit.rdd.getNumPartitions()\ndf_rawInput = df_rawInputWithoutLimit.limit(1_000_000).repartition(partitionCount)\ndf_rawInput.persist()\n\n#------------------------------------------------------------------------------------\n# Option B - entire dataset\n#------------------------------------------------------------------------------------\n#df_rawInput = spark.read.parquet(wasbs_path)\n\n# Adding an id column with unique values and an insertedAt timestamp\nuuidUdf = udf(lambda: str(uuid.uuid4()), StringType())\nnowUdf = udf(lambda: int(time.time() * 1000), LongType())\ndf_input_withId = df_rawInput \\\n  .withColumn(\"id\", uuidUdf()) \\\n  .withColumn(\"insertedAt\", nowUdf())\n\nprint('Register the DataFrame as a SQL temporary view: source')\ndf_input_withId.createOrReplaceTempView('source')\nprint(\"Finished preparation: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "c9d0e1f2-a3b4-5678-cdef-789012345678"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Sample - ingesting the NYC Green Taxi data into Cosmos DB using a throughput bucket**\n\nThroughput buckets provide server-side throughput control. Instead of using the SDK-based global throughput control (which requires a separate metadata container), you configure a `throughputBucket` value between 1 and 5.\n\nThis is simpler to configure because it does not require a separate throughput control metadata container.\nFor more information, see [Throughput Buckets](https://learn.microsoft.com/azure/cosmos-db/throughput-buckets?tabs=dotnet)."
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "d0e1f2a3-b4c5-6789-defa-890123456789"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import uuid\nimport datetime\n\nprint(\"Starting ingestion: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nwriteCfg = {\n  \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n  \"spark.cosmos.accountKey\": cosmosMasterKey,\n  \"spark.cosmos.database\": \"SampleDatabase\",\n  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n  \"spark.cosmos.write.strategy\": \"ItemOverwrite\",\n  \"spark.cosmos.write.bulk.enabled\": \"true\",\n  \"spark.cosmos.throughputControl.enabled\": \"true\",\n  \"spark.cosmos.throughputControl.name\": \"NYCGreenTaxiDataIngestion\",\n  \"spark.cosmos.throughputControl.throughputBucket\": \"5\",\n}\n\ndf_NYCGreenTaxi_Input = spark.sql('SELECT * FROM source')\n\ndf_NYCGreenTaxi_Input \\\n  .write \\\n  .format(\"cosmos.oltp\") \\\n  .mode(\"Append\") \\\n  .options(**writeCfg) \\\n  .save()\n\nprint(\"Finished ingestion: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "e1f2a3b4-c5d6-7890-efab-901234567890"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Getting the reference record count**"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "f2a3b4c5-d6e7-8901-fabc-012345678901"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "count_source = spark.sql('SELECT * FROM source').count()\nprint(\"Number of records in source: \", count_source)"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "a3b4c5d6-e7f8-9012-abcd-123456789012"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Sample - validating the record count via query**"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "b4c5d6e7-f8a9-0123-bcde-234567890123"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from pyspark.sql.types import *\nimport pyspark.sql.functions as F\n\nprint(\"Starting validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nreadCfg = {\n  \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n  \"spark.cosmos.accountKey\": cosmosMasterKey,\n  \"spark.cosmos.database\": \"SampleDatabase\",\n  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n  \"spark.cosmos.read.partitioning.strategy\": \"Restrictive\",\n  \"spark.cosmos.read.inferSchema.enabled\": \"false\",\n  \"spark.cosmos.read.customQuery\": \"SELECT COUNT(0) AS Count FROM c\"\n}\n\ncount_query_schema = StructType(fields=[StructField(\"Count\", LongType(), True)])\nquery_df = spark.read.format(\"cosmos.oltp\").schema(count_query_schema).options(**readCfg).load()\ncount_query = query_df.select(F.sum(\"Count\").alias(\"TotalCount\")).first()[\"TotalCount\"]\nprint(\"Number of records retrieved via query: \", count_query)\nprint(\"Finished validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert count_source == count_query"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "c5d6e7f8-a9b0-1234-cdef-345678901234"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Sample - validating the record count via change feed**"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "d6e7f8a9-b0c1-2345-defa-456789012345"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "print(\"Starting validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nchangeFeedCfg = {\n  \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n  \"spark.cosmos.accountKey\": cosmosMasterKey,\n  \"spark.cosmos.database\": \"SampleDatabase\",\n  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n  \"spark.cosmos.read.partitioning.strategy\": \"Restrictive\",\n  \"spark.cosmos.read.inferSchema.enabled\": \"false\",\n  \"spark.cosmos.changeFeed.startFrom\": \"Beginning\",\n  \"spark.cosmos.changeFeed.mode\": \"Incremental\"\n}\nchangeFeed_df = spark.read.format(\"cosmos.oltp.changeFeed\").options(**changeFeedCfg).load()\ncount_changeFeed = changeFeed_df.count()\nprint(\"Number of records retrieved via change feed: \", count_changeFeed)\nprint(\"Finished validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert count_source == count_changeFeed"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "e7f8a9b0-c1d2-3456-efab-567890123456"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Sample - bulk deleting documents with a throughput bucket and validating the document count afterwards**"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "f8a9b0c1-d2e3-4567-fabc-678901234567"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "import math\n\nprint(\"Starting to identify to-be-deleted documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nreadCfg = {\n  \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n  \"spark.cosmos.accountKey\": cosmosMasterKey,\n  \"spark.cosmos.database\": \"SampleDatabase\",\n  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n  \"spark.cosmos.read.partitioning.strategy\": \"Restrictive\",\n  \"spark.cosmos.read.inferSchema.enabled\": \"false\",\n}\n\ntoBeDeleted_df = spark.read.format(\"cosmos.oltp\").options(**readCfg).load().limit(100_000)\nprint(\"Number of records to be deleted: \", toBeDeleted_df.count())\n\nprint(\"Starting to bulk delete documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ndeleteCfg = {\n  \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n  \"spark.cosmos.accountKey\": cosmosMasterKey,\n  \"spark.cosmos.database\": \"SampleDatabase\",\n  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n  \"spark.cosmos.write.strategy\": \"ItemDelete\",\n  \"spark.cosmos.write.bulk.enabled\": \"true\",\n  \"spark.cosmos.throughputControl.enabled\": \"true\",\n  \"spark.cosmos.throughputControl.name\": \"NYCGreenTaxiDataDelete\",\n  \"spark.cosmos.throughputControl.throughputBucket\": \"1\",\n}\ntoBeDeleted_df \\\n  .write \\\n  .format(\"cosmos.oltp\") \\\n  .mode(\"Append\") \\\n  .options(**deleteCfg) \\\n  .save()\nprint(\"Finished deleting documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nprint(\"Starting count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ncount_query_schema = StructType(fields=[StructField(\"Count\", LongType(), True)])\nreadCfg[\"spark.cosmos.read.customQuery\"] = \"SELECT COUNT(0) AS Count FROM c\"\nquery_df = spark.read.format(\"cosmos.oltp\").schema(count_query_schema).options(**readCfg).load()\ncount_query = query_df.select(F.sum(\"Count\").alias(\"TotalCount\")).first()[\"TotalCount\"]\nprint(\"Number of records retrieved via query: \", count_query)\nprint(\"Finished count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert max(0, count_source - 100_000) == count_query"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "a9b0c1d2-e3f4-5678-abcd-789012345678"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "markdown",
      "source": [
        "**Sample - showing the existing containers**"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "b0c1d2e3-f4a5-6789-bcde-890123456789"
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "%sql\nSHOW TABLES FROM cosmosCatalog.SampleDatabase"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "c1d2e3f4-a5b6-7890-cdef-901234567890"
        }
      },
      "outputs": [],
      "execution_count": 0
    },
    {
      "cell_type": "code",
      "source": [
        "df_Tables = spark.sql('SHOW TABLES FROM cosmosCatalog.SampleDatabase')\nassert df_Tables.count() == 2"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "d2e3f4a5-b6c7-8901-defa-012345678901"
        }
      },
      "outputs": [],
      "execution_count": 0
    }
  ],
  "metadata": {
    "application/vnd.databricks.v1+notebook": {
      "notebookName": "04_ThroughputBucket",
      "dashboards": [],
      "notebookMetadata": {
        "pythonIndentUnit": 2
      },
      "language": "python",
      "widgets": {},
      "notebookOrigID": 86486029782770
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
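For quick experimentation outside the notebook, the throughput-bucket write options it uses can be assembled and sanity-checked in plain Python before being handed to `df.write.format("cosmos.oltp")`. This is a minimal sketch: the helper name `build_bucket_write_config` is hypothetical (not part of the connector), while the option keys and the 1-5 bucket range are the ones the notebook itself relies on.

```python
def build_bucket_write_config(endpoint, key, database, container,
                              group_name, bucket):
    """Build the Spark write options for a server-side throughput bucket."""
    # Throughput buckets are numbered 1-5; fail fast on anything else
    # instead of erroring at write time.
    if bucket not in range(1, 6):
        raise ValueError(f"throughputBucket must be 1-5, got {bucket}")
    return {
        "spark.cosmos.accountEndpoint": endpoint,
        "spark.cosmos.accountKey": key,
        "spark.cosmos.database": database,
        "spark.cosmos.container": container,
        "spark.cosmos.write.strategy": "ItemOverwrite",
        "spark.cosmos.write.bulk.enabled": "true",
        # Server-side throughput control: no metadata container needed.
        "spark.cosmos.throughputControl.enabled": "true",
        "spark.cosmos.throughputControl.name": group_name,
        "spark.cosmos.throughputControl.throughputBucket": str(bucket),
    }

cfg = build_bucket_write_config(
    "https://myaccount.documents.azure.com:443/", "<account-key>",
    "SampleDatabase", "GreenTaxiRecords", "NYCGreenTaxiDataIngestion", 5)
print(cfg["spark.cosmos.throughputControl.throughputBucket"])  # prints: 5
```

On a cluster with the Cosmos Spark connector installed, the resulting dict would then be applied exactly as in the notebook: `df.write.format("cosmos.oltp").mode("Append").options(**cfg).save()`.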
