Commit 98cdaee

jiteshsoni authored and HeartSaVioR committed
[SPARK-55450][SS][PYTHON][DOCS] Document admission control in PySpark streaming data sources
### What changes were proposed in this pull request?

This PR adds documentation and an example for admission control in PySpark custom streaming data sources (SPARK-55304). Changes include:

1. **Updated tutorial documentation** (`python/docs/source/tutorial/sql/python_data_source.rst`):
   - Added an "Admission Control for Streaming Readers" section
   - Documents `getDefaultReadLimit()` returning `ReadMaxRows(n)` to limit batch size
   - Shows how `latestOffset(start, limit)` respects the `ReadLimit` parameter

2. **Example file** (`examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py`):
   - Demonstrates admission control via `getDefaultReadLimit()` and `latestOffset()`
   - Simulates a blockchain data source with controlled batch sizes (20 blocks per batch)
   - A simple, focused example showing backpressure management

### Why are the changes needed?

Users need documentation and practical examples to implement admission control in custom streaming sources (introduced in SPARK-55304).

### Does this PR introduce _any_ user-facing change?

No. Documentation and examples only.

### How was this patch tested?

**Testing approach:**
- Ran the example on Databricks Dogfood Staging (DBR 17.3 / Spark 4.0)
- Used the Spark Streaming UI to verify admission control works correctly

**Test notebook:** [pr_54807_admission_control_notebook](https://dogfood.staging.databricks.com/editor/notebooks/1113954931051543?o=6051921418418893#command/7790625346196924)

**What was verified:**
1. **Batch sizes:** Each micro-batch processed exactly 20 blocks (admission control working)
2. **Consistent behavior:** 79 batches completed in ~28 seconds, all with 20 rows
3. **Stream reader:** `PythonMicroBatchStreamWithAdmissionControl` active in the Streaming UI

**Sample batch output:**
```json
{
  "batchId": 78,
  "numInputRows": 20,
  "sources": [{
    "description": "PythonMicroBatchStreamWithAdmissionControl",
    "startOffset": {"block_number": 1560},
    "endOffset": {"block_number": 1580},
    "numInputRows": 20
  }]
}
```

### Was this patch authored or co-authored using generative AI tooling?

Yes (Claude Opus 4.5)

🤖 Generated with [Claude Code](https://claude.ai/code)

Closes #54807 from jiteshsoni/SPARK-55450-admission-control-docs.

Lead-authored-by: Jitesh Soni <get2jitesh@gmail.com>
Co-authored-by: Canadian Data Guy <get2jitesh@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
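The verified numbers above hang together: under `ReadMaxRows(20)`, `numInputRows` should equal the difference of the `block_number` offsets, and batch 78 starting at block 1560 matches 78 completed 20-block batches before it. A quick pure-Python sanity check (the `rows_in_batch` helper is hypothetical, written only for this check):

```python
def rows_in_batch(start_offset: dict, end_offset: dict) -> int:
    # Rows in a micro-batch are the gap between its block_number offsets.
    return end_offset["block_number"] - start_offset["block_number"]

# Sample batch 78 from the progress report above.
start, end = {"block_number": 1560}, {"block_number": 1580}
assert rows_in_batch(start, end) == 20   # matches numInputRows
assert 78 * 20 == start["block_number"]  # 78 prior 20-block batches
```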
1 parent 9dbe381 commit 98cdaee

File tree

2 files changed: +252 −0 lines changed

examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py

Lines changed: 202 additions & 0 deletions
```python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Demonstrates admission control using a simulated blockchain data source.

This example shows how to build a custom streaming data source that
simulates reading blockchain blocks while respecting admission control
limits using getDefaultReadLimit() and ReadMaxRows(20).

Key concepts demonstrated:
- getDefaultReadLimit() returning ReadMaxRows(20) to limit blocks per micro-batch
- latestOffset(start, limit) respecting the ReadLimit parameter
- Controlled data ingestion rate for backpressure management

Usage:
    bin/spark-submit examples/src/main/python/sql/streaming/\
structured_blockchain_admission_control.py

Expected output:
    Each micro-batch processes up to 20 blocks (controlled by admission control):
    Batch 0: blocks 0-19
    Batch 1: blocks 20-39
    Batch 2: blocks 40-59
    ...
    The final batch may contain fewer than 20 blocks when the chain is exhausted.
"""

import hashlib
import time
from typing import Iterator, Sequence, Tuple

from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadLimit, ReadMaxRows
from pyspark.sql.types import StructType


class BlockPartition(InputPartition):
    """Partition representing a range of blockchain blocks to read."""

    def __init__(self, start_block: int, end_block: int):
        self.start_block = start_block
        self.end_block = end_block


class BlockchainStreamReader(DataSourceStreamReader):
    """
    A streaming reader that simulates reading blockchain blocks.

    Demonstrates admission control via getDefaultReadLimit() which limits
    the number of blocks processed per micro-batch.
    """

    CHAIN_HEIGHT = 10000  # Total blocks available

    def initialOffset(self) -> dict:
        """Return the starting block number for new queries."""
        return {"block_number": 0}

    def getDefaultReadLimit(self) -> ReadLimit:
        """
        Limit each micro-batch to 20 blocks.

        This controls the data ingestion rate, useful for:
        - Preventing memory issues with large batches
        - Rate limiting when reading from external APIs
        - Backpressure management
        """
        return ReadMaxRows(20)

    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
        """
        Compute the ending block number respecting admission control.

        Parameters
        ----------
        start : dict
            Current offset with 'block_number' key
        limit : ReadLimit
            Engine-provided limit on data consumption

        Returns
        -------
        dict
            Ending offset for this micro-batch
        """
        start_block = start["block_number"]

        if start_block >= self.CHAIN_HEIGHT:
            return start  # No more data

        if isinstance(limit, ReadMaxRows):
            end_block = min(start_block + limit.max_rows, self.CHAIN_HEIGHT)
        elif isinstance(limit, ReadAllAvailable):
            end_block = self.CHAIN_HEIGHT
        else:
            raise ValueError(f"Unexpected ReadLimit type: {type(limit)}")

        return {"block_number": end_block}

    def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]:
        """Create a single partition for the block range."""
        start_block = start["block_number"]
        end_block = end["block_number"]

        if start_block >= end_block:
            return []

        return [BlockPartition(start_block, end_block)]

    def read(self, partition: InputPartition) -> Iterator[Tuple]:
        """
        Generate simulated blockchain block data.

        Each block contains:
        - block_number: Sequential block identifier
        - block_hash: Simulated hash based on block number
        - timestamp: Simulated timestamp
        - transaction_count: Simulated transaction count
        """
        assert isinstance(partition, BlockPartition)

        for block_num in range(partition.start_block, partition.end_block):
            block_hash = hashlib.sha256(str(block_num).encode()).hexdigest()[:16]
            timestamp = 1700000000 + (block_num * 12)
            tx_count = (block_num % 100) + 1

            yield (block_num, block_hash, timestamp, tx_count)

    def commit(self, end: dict) -> None:
        """Cleanup after batch completion."""
        pass


class BlockchainDataSource(DataSource):
    """Data source that creates BlockchainStreamReader instances."""

    @classmethod
    def name(cls) -> str:
        return "blockchain_example"

    def schema(self) -> str:
        return "block_number INT, block_hash STRING, timestamp LONG, transaction_count INT"

    def streamReader(self, schema: StructType) -> DataSourceStreamReader:
        return BlockchainStreamReader()


def main() -> None:
    """Run blockchain streaming example demonstrating admission control."""
    spark = SparkSession.builder.appName("BlockchainAdmissionControl").getOrCreate()

    spark.dataSource.register(BlockchainDataSource)

    print("\n" + "=" * 70)
    print("BLOCKCHAIN STREAMING WITH ADMISSION CONTROL")
    print("=" * 70)
    print("\nData Source: Simulated blockchain with 10000 blocks")
    print("Admission Control: getDefaultReadLimit() returns ReadMaxRows(20)")
    print("Expected: Each batch processes up to 20 blocks")
    print()

    df = spark.readStream.format("blockchain_example").load()

    query = (
        df.writeStream
        .format("console")
        .queryName("admission_control_test")
        .start()
    )

    print("Streaming query started. Check the Streaming UI to verify:")
    print("- Each full batch should process 20 blocks")
    print("- The final batch may be smaller when fewer than 20 blocks remain")
    print()

    time.sleep(30)

    query.stop()
    print("\nQuery stopped - check Streaming UI for results")
    print("=" * 70)

    spark.stop()


if __name__ == "__main__":
    main()
```
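Because the simulated block fields are derived with the standard library only, they can be spot-checked outside Spark. A sketch mirroring the tuple layout `read()` yields (the `make_block` helper is hypothetical, not part of the example file):

```python
import hashlib

def make_block(block_num: int):
    # Same derivations as BlockchainStreamReader.read():
    # (block_number, block_hash, timestamp, transaction_count)
    block_hash = hashlib.sha256(str(block_num).encode()).hexdigest()[:16]
    timestamp = 1700000000 + block_num * 12  # one block every 12 seconds
    tx_count = (block_num % 100) + 1         # cycles through 1..100
    return (block_num, block_hash, timestamp, tx_count)

b = make_block(0)
assert b[2] == 1700000000 and b[3] == 1
assert len(b[1]) == 16  # truncated sha256 hex digest
```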

python/docs/source/tutorial/sql/python_data_source.rst

Lines changed: 50 additions & 0 deletions
```rst
        """
        pass

Admission Control for Streaming Readers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To limit the amount of data processed per micro-batch, implement
``getDefaultReadLimit()`` and have ``latestOffset(start, limit)`` honor the
engine-provided ``ReadLimit``:

.. code-block:: python

    from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadLimit, ReadMaxRows

    class MyStreamReader(DataSourceStreamReader):

        def getDefaultReadLimit(self) -> ReadLimit:
            """
            Limit each micro-batch to at most 20 rows.

            This value is just an example; in practice, configure the limit
            based on source options (e.g., self.options.get("maxRowsPerBatch")).
            """
            return ReadMaxRows(20)

        def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
            """
            Return the latest offset, respecting the provided limit.
            """
            current = start["offset"]

            if isinstance(limit, ReadMaxRows):
                end = min(current + limit.max_rows, self.max_available)
            elif isinstance(limit, ReadAllAvailable):
                end = self.max_available
            else:
                raise ValueError(f"Unexpected ReadLimit type: {type(limit)}")

            return {"offset": end}

When Spark uses the default ``ReadMaxRows(20)`` limit, each micro-batch
reads at most 20 rows, depending on the available rows. If Spark passes
``ReadAllAvailable``, the reader should return all remaining rows instead.

This is useful for:

- **Controlling data ingestion rate**: Prevent overwhelming downstream systems
- **Memory management**: Limit batch sizes to avoid out-of-memory errors
- **Backpressure handling**: Process data at a sustainable rate

For a complete working example, see:
``examples/src/main/python/sql/streaming/structured_blockchain_admission_control.py``

Implement a Streaming Writer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
```
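The micro-batch progression this tutorial section describes can be simulated end to end in plain Python. In this sketch, `ReadMaxRows` and `ReadAllAvailable` are local stand-ins for the pyspark classes (assuming the real `ReadMaxRows` exposes a `max_rows` attribute, as the snippet above does), and the loop plays the role of the engine calling `latestOffset` until the source drains:

```python
# Local stand-ins for the pyspark.sql.streaming.datasource limit classes.
class ReadMaxRows:
    def __init__(self, max_rows: int):
        self.max_rows = max_rows

class ReadAllAvailable:
    pass

CHAIN_HEIGHT = 50  # small chain so the partial tail batch is visible

def latest_offset(start: int, limit) -> int:
    # Same branching as latestOffset() in the tutorial snippet.
    if start >= CHAIN_HEIGHT:
        return start
    if isinstance(limit, ReadMaxRows):
        return min(start + limit.max_rows, CHAIN_HEIGHT)
    if isinstance(limit, ReadAllAvailable):
        return CHAIN_HEIGHT
    raise ValueError(f"Unexpected ReadLimit type: {type(limit)}")

# Play the engine: keep asking for the next batch until the source drains.
limit = ReadMaxRows(20)
batches, start = [], 0
while (end := latest_offset(start, limit)) != start:
    batches.append((start, end))
    start = end

print(batches)  # [(0, 20), (20, 40), (40, 50)]
```

Note how the final batch is partial, matching the tutorial's point that the reader returns only what remains, and that `ReadAllAvailable` would instead jump straight to `CHAIN_HEIGHT` in one batch.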
