Skip to content

Commit 8930ce8

Browse files
Copilotswathipil
andcommitted
Complete TROUBLESHOOTING.md with async operations and enhanced content
Co-authored-by: swathipil <76007337+swathipil@users.noreply.github.com>
1 parent 03cbdbe commit 8930ce8

1 file changed

Lines changed: 360 additions & 3 deletions

File tree

sdk/servicebus/azure-servicebus/TROUBLESHOOTING.md

Lines changed: 360 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ This troubleshooting guide contains instructions to diagnose frequently encounte
3939
* [Threading and concurrency issues](#threading-and-concurrency-issues)
4040
* [Thread safety limitations](#thread-safety-limitations)
4141
* [Async/await best practices](#asyncawait-best-practices)
42+
* [Troubleshooting async operations](#troubleshooting-async-operations)
43+
* [Event loop issues](#event-loop-issues)
44+
* [Async context manager problems](#async-context-manager-problems)
45+
* [Mixing sync and async code](#mixing-sync-and-async-code)
4246
* [Frequently asked questions](#frequently-asked-questions)
4347
* [Get additional help](#get-additional-help)
4448

@@ -127,6 +131,34 @@ The Service Bus APIs generate the following exceptions in `azure.servicebus.exce
127131

128132
- **AutoLockRenewTimeout:** The time allocated to renew the message or session lock has elapsed. You could re-register the object that wants be auto lock renewed or extend the timeout in advance.
129133

134+
#### Python-Specific Considerations
135+
136+
- **ImportError/ModuleNotFoundError:** Common when Azure Service Bus dependencies are not properly installed. Ensure you have installed the correct package version:
137+
```bash
138+
pip install azure-servicebus
139+
```
140+
141+
- **TypeError:** Often occurs when passing incorrect data types to Service Bus methods:
142+
```python
143+
# Incorrect: passing string instead of ServiceBusMessage
144+
sender.send_messages("Hello World") # This will fail
145+
146+
# Correct: create ServiceBusMessage objects
147+
from azure.servicebus import ServiceBusMessage
148+
message = ServiceBusMessage("Hello World")
149+
sender.send_messages(message)
150+
```
151+
152+
- **ConnectionError/socket.gaierror:** Network-level errors that may require checking DNS resolution and network connectivity:
153+
```python
154+
import socket
155+
try:
156+
# Test DNS resolution
157+
socket.gethostbyname("your-namespace.servicebus.windows.net")
158+
except socket.gaierror as e:
159+
print(f"DNS resolution failed: {e}")
160+
```
161+
130162
### Timeouts
131163

132164
There are various timeouts a user should be aware of within the library:
@@ -907,6 +939,249 @@ async def handle_async_errors():
907939
- Mixing blocking and non-blocking operations incorrectly
908940
- Not handling connection failures in multi-threaded scenarios
909941

942+
## Troubleshooting async operations
943+
944+
### Event loop issues
945+
946+
Python's asyncio event loop can cause issues when not properly managed in Service Bus async operations.
947+
948+
**Common symptoms:**
949+
- `RuntimeError: no running event loop`
950+
- `RuntimeError: cannot be called from a running event loop`
951+
- Async operations hanging indefinitely
952+
953+
**Resolution:**
954+
955+
1. **Proper event loop management:**
956+
```python
957+
import asyncio
958+
from azure.servicebus.aio import ServiceBusClient
959+
960+
async def main():
961+
async with ServiceBusClient.from_connection_string(connection_string) as client:
962+
async with client.get_queue_sender(queue_name) as sender:
963+
message = ServiceBusMessage("Hello async world")
964+
await sender.send_messages(message)
965+
966+
# Correct way to run async Service Bus code
967+
if __name__ == "__main__":
968+
asyncio.run(main())
969+
```
970+
971+
2. **Handling existing event loops (e.g., in Jupyter notebooks):**
972+
```python
973+
import asyncio
974+
import nest_asyncio
975+
976+
# In environments like Jupyter where an event loop is already running
977+
nest_asyncio.apply()
978+
979+
async def notebook_friendly_function():
980+
async with ServiceBusClient.from_connection_string(connection_string) as client:
981+
# Your async Service Bus operations
982+
pass
983+
984+
# Can be called directly in Jupyter
985+
await notebook_friendly_function()
986+
```
987+
988+
3. **Event loop in multi-threaded applications:**
989+
```python
990+
import asyncio
991+
import threading
992+
from concurrent.futures import ThreadPoolExecutor
993+
994+
def run_async_in_thread(connection_string, queue_name):
995+
"""Run async Service Bus operations in a separate thread"""
996+
async def async_operations():
997+
async with ServiceBusClient.from_connection_string(connection_string) as client:
998+
async with client.get_queue_receiver(queue_name) as receiver:
999+
messages = await receiver.receive_messages(max_message_count=10)
1000+
for message in messages:
1001+
print(f"Received: {message}")
1002+
await receiver.complete_message(message)
1003+
1004+
# Create new event loop for this thread
1005+
asyncio.run(async_operations())
1006+
1007+
# Use ThreadPoolExecutor for better management
1008+
with ThreadPoolExecutor(max_workers=3) as executor:
1009+
futures = [
1010+
executor.submit(run_async_in_thread, connection_string, f"queue_{i}")
1011+
for i in range(3)
1012+
]
1013+
1014+
for future in futures:
1015+
future.result() # Wait for completion
1016+
```
1017+
1018+
### Async context manager problems
1019+
1020+
Improper use of async context managers can lead to resource leaks and connection issues.
1021+
1022+
**Common mistakes:**
1023+
1024+
1. **Not using async context managers:**
1025+
```python
1026+
# DON'T DO THIS
1027+
client = ServiceBusClient.from_connection_string(connection_string)
1028+
sender = client.get_queue_sender(queue_name)
1029+
await sender.send_messages(message)
1030+
# Resources not properly closed
1031+
1032+
# DO THIS INSTEAD
1033+
async with ServiceBusClient.from_connection_string(connection_string) as client:
1034+
async with client.get_queue_sender(queue_name) as sender:
1035+
await sender.send_messages(message)
1036+
```
1037+
1038+
2. **Improper exception handling in async context:**
1039+
```python
1040+
async def proper_exception_handling():
1041+
"""Handle exceptions properly in async context managers"""
1042+
try:
1043+
async with ServiceBusClient.from_connection_string(connection_string) as client:
1044+
async with client.get_queue_receiver(queue_name) as receiver:
1045+
messages = await receiver.receive_messages(max_message_count=10)
1046+
1047+
for message in messages:
1048+
try:
1049+
# Process message
1050+
await process_message_async(message)
1051+
await receiver.complete_message(message)
1052+
except Exception as processing_error:
1053+
print(f"Processing failed: {processing_error}")
1054+
await receiver.abandon_message(message)
1055+
1056+
except ServiceBusError as sb_error:
1057+
print(f"Service Bus error: {sb_error}")
1058+
except Exception as general_error:
1059+
print(f"Unexpected error: {general_error}")
1060+
```
1061+
1062+
3. **Resource cleanup in long-running async operations:**
1063+
```python
1064+
import asyncio
1065+
from contextlib import AsyncExitStack
1066+
1067+
async def long_running_processor():
1068+
"""Properly manage resources in long-running async operations"""
1069+
async with AsyncExitStack() as stack:
1070+
client = await stack.enter_async_context(
1071+
ServiceBusClient.from_connection_string(connection_string)
1072+
)
1073+
receiver = await stack.enter_async_context(
1074+
client.get_queue_receiver(queue_name)
1075+
)
1076+
1077+
# Long-running processing loop
1078+
while True:
1079+
try:
1080+
messages = await receiver.receive_messages(
1081+
max_message_count=10,
1082+
max_wait_time=30
1083+
)
1084+
1085+
if not messages:
1086+
await asyncio.sleep(1)
1087+
continue
1088+
1089+
# Process messages with proper error handling
1090+
await process_messages_batch(receiver, messages)
1091+
1092+
except KeyboardInterrupt:
1093+
print("Shutting down gracefully...")
1094+
break
1095+
except Exception as e:
1096+
print(f"Error in processing loop: {e}")
1097+
await asyncio.sleep(5) # Brief pause before retry
1098+
1099+
async def process_messages_batch(receiver, messages):
1100+
"""Process a batch of messages with individual error handling"""
1101+
for message in messages:
1102+
try:
1103+
await process_single_message(message)
1104+
await receiver.complete_message(message)
1105+
except Exception as e:
1106+
print(f"Failed to process message {message.message_id}: {e}")
1107+
await receiver.abandon_message(message)
1108+
```
1109+
1110+
### Mixing sync and async code
1111+
1112+
Mixing synchronous and asynchronous Service Bus operations can cause issues.
1113+
1114+
**Common problems:**
1115+
1116+
1. **Calling async methods without await:**
1117+
```python
1118+
# WRONG - This returns a coroutine, doesn't actually send
1119+
client = ServiceBusClient.from_connection_string(connection_string)
1120+
sender = client.get_queue_sender(queue_name)
1121+
sender.send_messages(message) # Missing 'await'
1122+
1123+
# CORRECT
1124+
async with ServiceBusClient.from_connection_string(connection_string) as client:
1125+
async with client.get_queue_sender(queue_name) as sender:
1126+
await sender.send_messages(message)
1127+
```
1128+
1129+
2. **Using sync and async clients together:**
1130+
```python
1131+
# Avoid mixing sync and async clients in the same application
1132+
# Choose one pattern and stick with it
1133+
1134+
# Option 1: Pure async
1135+
async def async_pattern():
1136+
async with ServiceBusClient.from_connection_string(connection_string) as client:
1137+
# All operations are async
1138+
pass
1139+
1140+
# Option 2: Pure sync
1141+
def sync_pattern():
1142+
with ServiceBusClient.from_connection_string(connection_string) as client:
1143+
# All operations are sync
1144+
pass
1145+
```
1146+
1147+
3. **Proper integration with async frameworks (FastAPI, aiohttp, etc.):**
1148+
```python
1149+
# Example with FastAPI
1150+
from fastapi import FastAPI, BackgroundTasks
1151+
from azure.servicebus.aio import ServiceBusClient
1152+
1153+
app = FastAPI()
1154+
1155+
# Global client for reuse (properly managed)
1156+
class ServiceBusManager:
1157+
def __init__(self):
1158+
self.client = None
1159+
1160+
async def start(self):
1161+
self.client = ServiceBusClient.from_connection_string(connection_string)
1162+
1163+
async def stop(self):
1164+
if self.client:
1165+
await self.client.close()
1166+
1167+
sb_manager = ServiceBusManager()
1168+
1169+
@app.on_event("startup")
1170+
async def startup_event():
1171+
await sb_manager.start()
1172+
1173+
@app.on_event("shutdown")
1174+
async def shutdown_event():
1175+
await sb_manager.stop()
1176+
1177+
@app.post("/send-message")
1178+
async def send_message(message_content: str):
1179+
async with sb_manager.client.get_queue_sender(queue_name) as sender:
1180+
message = ServiceBusMessage(message_content)
1181+
await sender.send_messages(message)
1182+
return {"status": "sent"}
1183+
```
1184+
9101185
## Frequently asked questions
9111186

9121187
### Q: Why am I getting connection timeout errors?
@@ -938,17 +1213,99 @@ Check the `dead_letter_reason` and `dead_letter_error_description` properties on
9381213
### Q: How do I process messages faster?
9391214

9401215
**A:** Consider:
941-
- Using concurrent message processing
1216+
- Using concurrent message processing (with separate client instances per thread/task)
9421217
- Optimizing your message processing logic
943-
- Using `prefetch_count` to pre-fetch messages
944-
- Scaling out with multiple receivers
1218+
- Using `prefetch_count` to pre-fetch messages (use with caution - see note below)
1219+
- Scaling out with multiple receivers (on different clients)
1220+
1221+
**Note on prefetch_count:** Be careful when using `prefetch_count` as it can cause message lock expiration if processing takes too long. The client cannot extend locks for prefetched messages.
9451222

9461223
### Q: What's the difference between `complete_message()` and `abandon_message()`?
9471224

9481225
**A:**
9491226
- `complete_message()`: Removes the message from the queue/subscription (successful processing)
9501227
- `abandon_message()`: Returns the message to the queue/subscription for reprocessing
9511228

1229+
**Important:** Due to Python AMQP implementation limitations, these operations return immediately without waiting for service acknowledgment. Implement idempotent processing to handle potential redelivery.
1230+
1231+
### Q: How do I handle message ordering?
1232+
1233+
**A:**
1234+
- Use **sessions** for guaranteed message ordering within a session
1235+
- For partitioned entities, messages with the same partition key maintain order
1236+
- Regular queues do not guarantee strict FIFO ordering
1237+
1238+
```python
1239+
# Using sessions for ordered processing
1240+
with client.get_queue_receiver(queue_name, session_id="order_123") as session_receiver:
1241+
messages = session_receiver.receive_messages(max_message_count=10)
1242+
1243+
# Messages within this session are processed in order
1244+
for message in messages:
1245+
process_message_in_order(message)
1246+
session_receiver.complete_message(message)
1247+
```
1248+
1249+
### Q: How do I implement retry logic for transient failures?
1250+
1251+
**A:**
1252+
```python
1253+
import time
1254+
import random
1255+
from azure.servicebus.exceptions import ServiceBusError
1256+
1257+
def exponential_backoff_retry(operation, max_retries=3):
1258+
"""Implement exponential backoff retry for Service Bus operations"""
1259+
for attempt in range(max_retries + 1):
1260+
try:
1261+
return operation()
1262+
except ServiceBusError as e:
1263+
if attempt == max_retries:
1264+
raise
1265+
1266+
# Check if error is retryable
1267+
if hasattr(e, 'reason'):
1268+
retryable_reasons = ['ServiceTimeout', 'ServerBusy', 'ServiceCommunicationProblem']
1269+
if e.reason not in retryable_reasons:
1270+
raise
1271+
1272+
# Calculate backoff delay
1273+
delay = (2 ** attempt) + random.uniform(0, 1)
1274+
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f} seconds...")
1275+
time.sleep(delay)
1276+
1277+
# Usage example
1278+
def send_with_retry(sender, message):
1279+
return exponential_backoff_retry(lambda: sender.send_messages(message))
1280+
```
1281+
1282+
### Q: How do I monitor message processing performance?
1283+
1284+
**A:**
1285+
```python
1286+
import time
1287+
import logging
1288+
from contextlib import contextmanager
1289+
1290+
@contextmanager
1291+
def message_processing_timer(message_id):
1292+
"""Context manager to time message processing"""
1293+
start_time = time.time()
1294+
try:
1295+
yield
1296+
finally:
1297+
processing_time = time.time() - start_time
1298+
logging.info(f"Message {message_id} processed in {processing_time:.3f}s")
1299+
1300+
# Usage
1301+
def process_with_monitoring(receiver, message):
1302+
with message_processing_timer(message.message_id):
1303+
# Your processing logic
1304+
result = process_message(message)
1305+
receiver.complete_message(message)
1306+
return result
1307+
```
1308+
9521309
## Get additional help
9531310

9541311
Additional information on ways to reach out for support can be found in the [SUPPORT.md](https://github.com/Azure/azure-sdk-for-python/blob/main/SUPPORT.md) at the root of the repo.

0 commit comments

Comments
 (0)