Skip to content

Commit 5dd56c5

Browse files
committed
tests/sequential - using fastapi
1 parent 63d0a16 commit 5dd56c5

5 files changed

Lines changed: 157 additions & 9 deletions

File tree

src/aiopenapi3/request.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,8 +510,31 @@ async def aiter_json(response: httpx.Response) -> AsyncIterator["JSON"]:
510510
"""
511511
Server-Sent Events (SSE)
512512
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
513+
https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream
514+
https://github.com/mpetazzoni/sseclient/blob/main/sseclient/__init__.py#L36
513515
"""
514516

517+
async def aiter_json(response: httpx.Response) -> AsyncIterator["JSON"]:
518+
519+
async for chunk in response.aiter_text():
520+
data_ = ""
521+
for line in chunk.splitlines(keepends=True):
522+
data_ += line
523+
if not data_.endswith(("\r\r", "\n\n", "\r\n\r\n")):
524+
continue
525+
526+
v = dict()
527+
for l in data_.splitlines(keepends=False):
528+
if l == "":
529+
continue
530+
cmd, _, value = l.partition(":")
531+
if cmd not in ("event", "data", "id", "retry", ""):
532+
# ignore
533+
continue
534+
v[cmd or "comment"] = value.lstrip()
535+
data_ = ""
536+
yield v
537+
elif False:
515538
import ijson
516539

517540
class ReadEventStream:
@@ -541,8 +564,7 @@ async def aiter_json(response: httpx.Response) -> AsyncIterator["JSON"]:
541564
yield AsyncRequestBase.Sequencer(headers, stream, schema_.get_type())
542565
finally:
543566
"""__aexit__"""
544-
if not result.is_closed:
545-
await result.aclose()
567+
await session.aclose()
546568

547569

548570
class OperationIndex:

src/aiopenapi3/v32/schemas.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
import typing
12
from typing import Union, Any, Optional
23

34
from pydantic import Field, model_validator, ConfigDict
45

56
from ..base import ObjectExtended, SchemaBase, DiscriminatorBase
67
from .xml import XML
78

9+
if typing.TYPE_CHECKING:
10+
from .general import Reference
11+
812

913
class Discriminator(ObjectExtended, DiscriminatorBase):
1014
"""
@@ -144,7 +148,7 @@ class Schema(ObjectExtended, SchemaBase):
144148
"""
145149
contentEncoding: str | None = Field(default=None)
146150
contentMediaType: str | None = Field(default=None)
147-
contentSchema: str | None = Field(default=None)
151+
contentSchema: Union["Schema", "Reference"] = Field(default=None)
148152

149153
"""
150154
9. A Vocabulary for Basic Meta-Data Annotations

tests/fixtures/schema-itemSchema.yaml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,18 @@ servers:
1010

1111
components:
1212
schemas:
13+
ServerSentEvent:
14+
type: object
15+
properties:
16+
data:
17+
type: string
18+
event:
19+
type: string
20+
id:
21+
type: string
22+
retry:
23+
minimum: 0
24+
type: integer
1325
LogEntry:
1426
type: object
1527
properties:
@@ -131,4 +143,4 @@ paths:
131143
content:
132144
text/event-stream:
133145
itemSchema:
134-
$ref: "#/components/schemas/LogEntry"
146+
$ref: "#/components/schemas/ServerSentEvent"

tests/sequential_test.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import asyncio
2+
3+
from collections.abc import AsyncIterable
4+
5+
6+
from hypercorn.asyncio import serve
7+
from hypercorn.config import Config
8+
import pydantic
9+
from fastapi import FastAPI
10+
from fastapi.sse import EventSourceResponse, ServerSentEvent
11+
12+
import pytest
13+
import pytest_asyncio
14+
15+
16+
import aiopenapi3
17+
18+
app = FastAPI(
19+
version="1.0.0",
20+
title="Sequential Streaming tests",
21+
servers=[{"url": "/", "description": "Default, relative server"}],
22+
)
23+
24+
25+
def openapi32():
26+
"""
27+
https://github.com/fastapi/fastapi/discussions/15328#discussioncomment-16543627
28+
"""
29+
if app.openapi_schema:
30+
return app.openapi_schema
31+
from fastapi.openapi.utils import get_openapi
32+
33+
openapi_schema = get_openapi(
34+
openapi_version="3.2.0", title=app.title, version=app.version, routes=app.routes, servers=app.servers
35+
)
36+
app.openapi_schema = openapi_schema
37+
return app.openapi_schema
38+
39+
40+
app.openapi = openapi32
41+
42+
43+
@pytest.fixture(scope="session")
44+
def config(unused_tcp_port_factory):
45+
c = Config()
46+
c.bind = [f"localhost:{unused_tcp_port_factory()}"]
47+
return c
48+
49+
50+
@pytest_asyncio.fixture(loop_scope="session")
51+
async def server(config):
52+
event_loop = asyncio.get_event_loop()
53+
try:
54+
sd = asyncio.Event()
55+
task = event_loop.create_task(serve(app, config, shutdown_trigger=sd.wait))
56+
yield config
57+
finally:
58+
sd.set()
59+
await task
60+
61+
62+
@pytest_asyncio.fixture(loop_scope="session")
63+
async def client(server):
64+
api = await aiopenapi3.OpenAPI.load_async(f"http://{server.bind[0]}/openapi.json")
65+
return api
66+
67+
68+
class Item(pydantic.BaseModel):
69+
name: str
70+
description: str | None
71+
72+
73+
items = [
74+
Item(name="Plumbus", description="A multi-purpose household device."),
75+
Item(name="Portal Gun", description="A portal opening device."),
76+
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."),
77+
]
78+
79+
80+
@app.get("/jsonl", operation_id="jsonl")
81+
async def jsonl() -> AsyncIterable[Item]:
82+
"""
83+
https://fastapi.tiangolo.com/tutorial/stream-json-lines/#use-cases
84+
"""
85+
for item in items:
86+
yield item
87+
88+
89+
@app.get("/sse", operation_id="sse", response_class=EventSourceResponse)
90+
async def sse() -> AsyncIterable[ServerSentEvent]:
91+
for idx, item in enumerate(items):
92+
yield ServerSentEvent(comment=str(idx), data=item)
93+
94+
95+
@pytest.mark.asyncio(loop_scope="session")
96+
async def test_jsonl(server, client):
97+
req = client.createRequest("jsonl")
98+
async with req.sequence() as sequence:
99+
async for obj in sequence:
100+
print(obj)
101+
102+
103+
@pytest.mark.asyncio(loop_scope="session")
104+
async def test_sse(server, client):
105+
from aiopenapi3.request import AsyncRequestBase
106+
107+
req: AsyncRequestBase
108+
req = client.createRequest("sse")
109+
async with req.sequence() as sequence:
110+
async for obj in sequence:
111+
print(obj)
112+
await asyncio.sleep(1.1)

tests/v32_test.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ async def test_MediaType(httpx_mock, with_schema_itemSchema):
3232
import pydantic
3333

3434
api = OpenAPI("https://example.org/api/", with_schema_itemSchema, session_factory=httpx.AsyncClient)
35-
LogEntry: pydantic.BaseModel = api.components.schemas["LogEntry"].get_type()
35+
ServerSentEvent: pydantic.BaseModel = api.components.schemas["ServerSentEvent"].get_type()
3636

3737
records = with_schema_itemSchema["components"]["examples"]["LogJSONPerLine"]["value"].strip("\n").split("\n")
38-
t = pydantic.TypeAdapter(list[LogEntry])
39-
ct = t.dump_json(t.validate_python([LogEntry.model_validate_json(i) for i in records * 16]))
38+
t = pydantic.TypeAdapter(list[ServerSentEvent])
39+
ct = "\n\n".join(f"data: {i}" for i in records * 16)
4040

4141
httpx_mock.add_response(
4242
url="https://example.org/api/json_seq",
@@ -94,8 +94,6 @@ def test_MediaType_itemSchema_sync(httpx_mock, with_schema_itemSchema):
9494
LogEntry: pydantic.BaseModel = api.components.schemas["LogEntry"].get_type()
9595

9696
records = with_schema_itemSchema["components"]["examples"]["LogJSONPerLine"]["value"].strip("\n").split("\n")
97-
t = pydantic.TypeAdapter(list[LogEntry])
98-
ct = t.dump_json(t.validate_python([LogEntry.model_validate_json(i) for i in records * 16]))
9997

10098
httpx_mock.add_response(
10199
url="https://example.org/api/json_seq",

0 commit comments

Comments
 (0)