Skip to content

Commit e75da76

Browse files
authored
fix: Streaming in azure_ai_foundry.py (#55)
1 parent 4ddf8d5 commit e75da76

1 file changed

Lines changed: 64 additions & 13 deletions

File tree

pipelines/azure/azure_ai_foundry.py

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
author_url: https://github.com/owndev/
55
project_url: https://github.com/owndev/Open-WebUI-Functions
66
funding_url: https://github.com/sponsors/owndev
7-
version: 2.3.2
7+
version: 2.3.3
88
license: Apache License 2.0
99
description: A pipeline for interacting with Azure AI services, enabling seamless communication with various AI models via configurable headers and robust error handling. This includes support for Azure OpenAI models as well as other Azure AI models by dynamically managing headers and request configurations.
1010
features:
@@ -389,7 +389,11 @@ def pipes(self) -> List[Dict[str, str]]:
389389
return [{"id": "Azure AI", "name": "Azure AI"}]
390390

391391
async def stream_processor(
392-
self, content: aiohttp.StreamReader, __event_emitter__=None
392+
self,
393+
content: aiohttp.StreamReader,
394+
__event_emitter__=None,
395+
response: Optional[aiohttp.ClientResponse] = None,
396+
session: Optional[aiohttp.ClientSession] = None,
393397
) -> AsyncIterator[bytes]:
394398
"""
395399
Process streaming content and properly handle completion status updates.
@@ -425,6 +429,19 @@ async def stream_processor(
425429
"data": {"description": f"Error: {str(e)}", "done": True},
426430
}
427431
)
432+
finally:
433+
# Always attempt to close response and session to avoid resource leaks
434+
try:
435+
if response:
436+
response.close()
437+
except Exception:
438+
pass
439+
try:
440+
if session:
441+
await session.close()
442+
except Exception:
443+
# Suppress close-time errors (e.g., SSL shutdown timeouts)
444+
pass
428445

429446
async def pipe(
430447
self, body: Dict[str, Any], __event_emitter__=None
@@ -531,8 +548,29 @@ async def pipe(
531548
headers=headers,
532549
)
533550

534-
# Check if response is SSE
535-
if "text/event-stream" in request.headers.get("Content-Type", ""):
551+
# If the server returned an error status, parse and raise before streaming logic
552+
if request.status >= 400:
553+
err_ct = (request.headers.get("Content-Type") or "").lower()
554+
if "json" in err_ct:
555+
try:
556+
response = await request.json()
557+
except Exception as e:
558+
# In error status, provider may mislabel content-type; keep log at debug to avoid noise
559+
log.debug(
560+
f"Failed to parse JSON error body despite JSON content-type: {e}"
561+
)
562+
response = await request.text()
563+
else:
564+
response = await request.text()
565+
566+
request.raise_for_status()
567+
568+
# Auto-detect streaming: either requested via body or indicated by response headers
569+
content_type_header = (request.headers.get("Content-Type") or "").lower()
570+
wants_stream = bool(filtered_body.get("stream", False))
571+
is_sse_header = "text/event-stream" in content_type_header
572+
573+
if wants_stream or is_sse_header:
536574
streaming = True
537575

538576
# Send status update for successful streaming connection
@@ -547,19 +585,32 @@ async def pipe(
547585
}
548586
)
549587

588+
# Ensure correct SSE headers are set for downstream consumers
589+
sse_headers = dict(request.headers)
590+
sse_headers["Content-Type"] = "text/event-stream"
591+
sse_headers.pop("Content-Length", None)
592+
550593
return StreamingResponse(
551-
self.stream_processor(request.content, __event_emitter__),
552-
status_code=request.status,
553-
headers=dict(request.headers),
554-
background=BackgroundTask(
555-
cleanup_response, response=request, session=session
594+
self.stream_processor(
595+
request.content,
596+
__event_emitter__=__event_emitter__,
597+
response=request,
598+
session=session,
556599
),
600+
status_code=request.status,
601+
headers=sse_headers,
557602
)
558603
else:
559-
try:
560-
response = await request.json()
561-
except Exception as e:
562-
log.error(f"Error parsing JSON response: {e}")
604+
# Parse non-stream response based on content-type without noisy error logs
605+
if "json" in content_type_header:
606+
try:
607+
response = await request.json()
608+
except Exception as e:
609+
log.debug(
610+
f"Failed to parse JSON response despite JSON content-type: {e}"
611+
)
612+
response = await request.text()
613+
else:
563614
response = await request.text()
564615

565616
request.raise_for_status()

0 commit comments

Comments
 (0)