Skip to content

Commit 785af00

Browse files
committed
fix: streaming stuff
1 parent 0535b1c commit 785af00

File tree

10 files changed

+394
-57
lines changed

10 files changed

+394
-57
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
### Breaking Changes
88

9-
- The `_return_http_data_only`, `_request_auth`, and `_preload_content` kwargs have been removed from all `OpenFgaApi` and `SyncOpenFgaApi` endpoint methods. These were internal implementation details not intended for external use. `_return_http_data_only` is now hardcoded to `True` internally, meaning all endpoint methods always return the deserialized response object directly. Users relying on `_with_http_info` methods returning a `(data, status, headers)` tuple should use `execute_api_request` instead.
9+
- The `_return_http_data_only`, `_preload_content`, `_request_auth`, `async_req`, and `_request_timeout` kwargs have been removed from all `OpenFgaApi` and `SyncOpenFgaApi` endpoint methods. These were internal implementation details not intended for external use. `_return_http_data_only` is now hardcoded to `True`; all endpoint methods return the deserialized response object directly. Users relying on `_with_http_info` methods returning a `(data, status, headers)` tuple should use `execute_api_request` instead.
1010

1111
### [0.9.9](https://github.com/openfga/python-sdk/compare/v0.9.8...v0.9.9) (2025-12-09)
1212
- feat: improve error messaging (#245)

README.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1265,7 +1265,7 @@ response = await fga_client.write_assertions(body, options)
12651265

12661266
In certain cases you may want to call other APIs not yet wrapped by the SDK. You can do so by using the `execute_api_request` method available on the `OpenFgaClient`. It allows you to make raw HTTP calls to any OpenFGA endpoint by specifying the HTTP method, path, body, query parameters, and path parameters, while still honoring the client configuration (authentication, telemetry, retries, and error handling).
12671267

1268-
For streaming endpoints, use `execute_streamed_api_request` instead.
1268+
For streaming endpoints (e.g. `streamed-list-objects`), use `execute_streamed_api_request` instead. It returns an `AsyncIterator` (or `Iterator` in the sync client) that yields one parsed JSON object per chunk.
12691269

12701270
This is useful when:
12711271
- You want to call a new endpoint that is not yet supported by the SDK
@@ -1315,6 +1315,27 @@ stores = stores_response.json()
13151315
print("Stores:", stores)
13161316
```
13171317

1318+
#### Example: Calling a Streaming Endpoint
1319+
1320+
```python
1321+
# Stream objects visible to a user
1322+
async for chunk in fga_client.execute_streamed_api_request(
1323+
operation_name="StreamedListObjects",
1324+
method="POST",
1325+
path="/stores/{store_id}/streamed-list-objects",
1326+
path_params={"store_id": FGA_STORE_ID},
1327+
body={
1328+
"type": "document",
1329+
"relation": "viewer",
1330+
"user": "user:anne",
1331+
"authorization_model_id": FGA_MODEL_ID,
1332+
},
1333+
):
1334+
# Each chunk has the shape {"result": {"object": "..."}} or {"error": {...}}
1335+
if "result" in chunk:
1336+
print(chunk["result"]["object"]) # e.g. "document:roadmap"
1337+
```
1338+
13181339
#### Example: Using Path Parameters
13191340

13201341
Path parameters are specified in the path using `{param_name}` syntax and must all be provided explicitly via `path_params` (URL-encoded automatically):

example/execute-api-request/execute_api_request_example.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,27 @@ async def main():
242242
assert raw.status == 200
243243
print(f" ✅ custom headers accepted (status {raw.status})")
244244

245+
print("9. StreamedListObjects (POST /stores/{store_id}/streamed-list-objects)")
246+
chunks = []
247+
async for chunk in fga_client.execute_streamed_api_request(
248+
operation_name="StreamedListObjects",
249+
method="POST",
250+
path="/stores/{store_id}/streamed-list-objects",
251+
path_params={"store_id": store.id},
252+
body={
253+
"type": "document",
254+
"relation": "viewer",
255+
"user": "user:anne",
256+
"authorization_model_id": auth_model_id,
257+
},
258+
):
259+
chunks.append(chunk)
260+
assert len(chunks) >= 1, f"Expected at least 1 chunk, got {len(chunks)}"
261+
# Each chunk has the shape {"result": {"object": "..."}} or {"error": {...}}
262+
objects = [c["result"]["object"] for c in chunks if "result" in c]
263+
assert "document:roadmap" in objects, f"Expected document:roadmap in {objects}"
264+
print(f" ✅ {len(chunks)} chunks, objects={objects}")
265+
245266
print("\n=== Cleanup ===")
246267
await fga_client.delete_store()
247268
print(f"Deleted test store: {store.id}")

openfga_sdk/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from openfga_sdk.api_client import ApiClient
33
from openfga_sdk.client.client import OpenFgaClient
44
from openfga_sdk.client.configuration import ClientConfiguration
5+
from openfga_sdk.client.models.raw_response import RawResponse
56
from openfga_sdk.configuration import Configuration
67
from openfga_sdk.constants import SDK_VERSION
78
from openfga_sdk.exceptions import (
@@ -136,6 +137,7 @@
136137
__all__ = [
137138
"OpenFgaClient",
138139
"ClientConfiguration",
140+
"RawResponse",
139141
"OpenFgaApi",
140142
"ApiClient",
141143
"Configuration",

openfga_sdk/api/open_fga_api.py

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from __future__ import annotations
1414

15+
from collections.abc import AsyncIterator
1516
from typing import TYPE_CHECKING, Any
1617

1718
from openfga_sdk.api_client import ApiClient
@@ -148,7 +149,7 @@ async def execute_api_request(
148149
body: dict[str, Any] | list[Any] | str | bytes | None = None,
149150
query_params: dict[str, str | int | list[str | int]] | None = None,
150151
headers: dict[str, str] | None = None,
151-
options: dict[str, int | str | dict[str, int | str]] | None = None,
152+
options: dict[str, Any] | None = None,
152153
) -> RawResponse:
153154
"""
154155
Execute an arbitrary HTTP request to any OpenFGA API endpoint.
@@ -164,7 +165,7 @@ async def execute_api_request(
164165
:param body: Request body for POST/PUT/PATCH
165166
:param query_params: Query string parameters
166167
:param headers: Custom headers (SDK enforces Content-Type and Accept)
167-
:param options: Extra options (headers, retry_params)
168+
:param options: Extra options e.g. {"retry_params": RetryParams(max_retry=3)}
168169
:return: RawResponse with status, headers, and body
169170
"""
170171
return await self._execute_api_request_internal(
@@ -176,7 +177,6 @@ async def execute_api_request(
176177
query_params=query_params,
177178
headers=headers,
178179
options=options,
179-
streaming=False,
180180
)
181181

182182
async def execute_streamed_api_request(
@@ -189,25 +189,73 @@ async def execute_streamed_api_request(
189189
body: dict[str, Any] | list[Any] | str | bytes | None = None,
190190
query_params: dict[str, str | int | list[str | int]] | None = None,
191191
headers: dict[str, str] | None = None,
192-
options: dict[str, int | str | dict[str, int | str]] | None = None,
193-
) -> RawResponse:
192+
options: dict[str, Any] | None = None,
193+
) -> AsyncIterator[dict[str, Any]]:
194194
"""
195195
Execute an arbitrary HTTP request to a streaming OpenFGA API endpoint.
196196
197-
Same interface as execute_api_request but for streaming endpoints.
198-
See execute_api_request for full parameter documentation.
197+
Yields parsed JSON objects as they arrive. Use with async for:
198+
199+
async for chunk in api.execute_streamed_api_request(...):
200+
process(chunk)
201+
202+
:param operation_name: Operation name for telemetry (e.g., "StreamedListObjects")
203+
:param method: HTTP method (GET, POST, PUT, DELETE, PATCH)
204+
:param path: API path, e.g. "/stores/{store_id}/streamed-list-objects".
205+
:param path_params: Path parameter substitutions (URL-encoded automatically).
206+
All path parameters, including store_id, must be provided explicitly.
207+
:param body: Request body for POST/PUT/PATCH
208+
:param query_params: Query string parameters
209+
:param headers: Custom headers (SDK enforces Content-Type and Accept)
210+
:param options: Extra options e.g. {"retry_params": RetryParams(max_retry=3)}
199211
"""
200-
return await self._execute_api_request_internal(
212+
from openfga_sdk.client.execute_api_request_builder import (
213+
ExecuteApiRequestBuilder,
214+
)
215+
216+
builder = ExecuteApiRequestBuilder(
201217
operation_name=operation_name,
202218
method=method,
203219
path=path,
204220
path_params=path_params,
205221
body=body,
206222
query_params=query_params,
207223
headers=headers,
208-
options=options,
209-
streaming=True,
210224
)
225+
builder.validate()
226+
227+
resource_path = builder.build_path()
228+
query_params_list = builder.build_query_params_list()
229+
final_headers = builder.build_headers()
230+
231+
retry_params = options.get("retry_params") if options else None
232+
233+
telemetry_attributes: dict[TelemetryAttribute, str | bool | int | float] = {
234+
TelemetryAttributes.fga_client_request_method: operation_name.lower(),
235+
}
236+
if self.api_client.get_store_id():
237+
telemetry_attributes[TelemetryAttributes.fga_client_request_store_id] = (
238+
self.api_client.get_store_id()
239+
)
240+
241+
stream = await self.api_client.call_api(
242+
resource_path=resource_path,
243+
method=method.upper(),
244+
query_params=query_params_list if query_params_list else None,
245+
header_params=final_headers,
246+
body=body,
247+
response_types_map={},
248+
auth_settings=[],
249+
_return_http_data_only=True,
250+
_preload_content=True,
251+
_retry_params=retry_params,
252+
_oauth2_client=self._oauth2_client,
253+
_telemetry_attributes=telemetry_attributes,
254+
_streaming=True,
255+
)
256+
257+
async for chunk in stream:
258+
yield chunk
211259

212260
async def _execute_api_request_internal(
213261
self,
@@ -219,10 +267,9 @@ async def _execute_api_request_internal(
219267
body: dict[str, Any] | list[Any] | str | bytes | None = None,
220268
query_params: dict[str, str | int | list[str | int]] | None = None,
221269
headers: dict[str, str] | None = None,
222-
options: dict[str, int | str | dict[str, int | str]] | None = None,
223-
streaming: bool = False,
270+
options: dict[str, Any] | None = None,
224271
) -> RawResponse:
225-
"""Shared implementation for execute_api_request and execute_streamed_api_request."""
272+
"""Implementation for execute_api_request."""
226273
from openfga_sdk.client.execute_api_request_builder import (
227274
ExecuteApiRequestBuilder,
228275
ResponseParser,
@@ -242,15 +289,9 @@ async def _execute_api_request_internal(
242289

243290
resource_path = builder.build_path()
244291
query_params_list = builder.build_query_params_list()
292+
final_headers = builder.build_headers()
245293

246-
options_headers = None
247-
if options and isinstance(options.get("headers"), dict):
248-
options_headers = options["headers"]
249-
final_headers = builder.build_headers(options_headers)
250-
251-
retry_params = None
252-
if options and options.get("retry_params"):
253-
retry_params = options["retry_params"]
294+
retry_params = options.get("retry_params") if options else None
254295

255296
telemetry_attributes: dict[TelemetryAttribute, str | bool | int | float] = {
256297
TelemetryAttributes.fga_client_request_method: operation_name.lower(),
@@ -273,7 +314,7 @@ async def _execute_api_request_internal(
273314
_retry_params=retry_params,
274315
_oauth2_client=self._oauth2_client,
275316
_telemetry_attributes=telemetry_attributes,
276-
_streaming=streaming,
317+
_streaming=False,
277318
)
278319

279320
rest_response = getattr(self.api_client, "last_response", None)

openfga_sdk/client/client.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import uuid
33

4+
from collections.abc import AsyncIterator
45
from typing import Any
56

67
from openfga_sdk.api.open_fga_api import OpenFgaApi
@@ -1113,7 +1114,7 @@ async def execute_api_request(
11131114
body: dict[str, Any] | list[Any] | str | bytes | None = None,
11141115
query_params: dict[str, str | int | list[str | int]] | None = None,
11151116
headers: dict[str, str] | None = None,
1116-
options: dict[str, int | str | dict[str, int | str]] | None = None,
1117+
options: dict[str, Any] | None = None,
11171118
) -> RawResponse:
11181119
"""
11191120
Execute an arbitrary HTTP request to any OpenFGA API endpoint.
@@ -1129,7 +1130,7 @@ async def execute_api_request(
11291130
:param body: Request body for POST/PUT/PATCH
11301131
:param query_params: Query string parameters
11311132
:param headers: Custom headers (SDK enforces Content-Type and Accept)
1132-
:param options: Extra options (headers, retry_params)
1133+
:param options: Extra options e.g. {"retry_params": RetryParams(max_retry=3)}
11331134
:return: RawResponse with status, headers, and body
11341135
"""
11351136
return await self._api.execute_api_request(
@@ -1153,15 +1154,17 @@ async def execute_streamed_api_request(
11531154
body: dict[str, Any] | list[Any] | str | bytes | None = None,
11541155
query_params: dict[str, str | int | list[str | int]] | None = None,
11551156
headers: dict[str, str] | None = None,
1156-
options: dict[str, int | str | dict[str, int | str]] | None = None,
1157-
) -> RawResponse:
1157+
options: dict[str, Any] | None = None,
1158+
) -> AsyncIterator[dict[str, Any]]:
11581159
"""
11591160
Execute an arbitrary HTTP request to a streaming OpenFGA API endpoint.
11601161
11611162
Same interface as execute_api_request but for streaming endpoints.
11621163
See execute_api_request for full parameter documentation.
1164+
1165+
:return: AsyncIterator yielding parsed JSON chunks from the streaming response.
11631166
"""
1164-
return await self._api.execute_streamed_api_request(
1167+
async for chunk in self._api.execute_streamed_api_request(
11651168
operation_name=operation_name,
11661169
method=method,
11671170
path=path,
@@ -1170,4 +1173,5 @@ async def execute_streamed_api_request(
11701173
query_params=query_params,
11711174
headers=headers,
11721175
options=options,
1173-
)
1176+
):
1177+
yield chunk

openfga_sdk/sync/client/client.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import uuid
22

3+
from collections.abc import Iterator
34
from concurrent.futures import ThreadPoolExecutor
45
from typing import Any
56

@@ -1111,7 +1112,7 @@ def execute_api_request(
11111112
body: dict[str, Any] | list[Any] | str | bytes | None = None,
11121113
query_params: dict[str, str | int | list[str | int]] | None = None,
11131114
headers: dict[str, str] | None = None,
1114-
options: dict[str, int | str | dict[str, int | str]] | None = None,
1115+
options: dict[str, Any] | None = None,
11151116
) -> RawResponse:
11161117
"""
11171118
Execute an arbitrary HTTP request to any OpenFGA API endpoint.
@@ -1127,7 +1128,7 @@ def execute_api_request(
11271128
:param body: Request body for POST/PUT/PATCH
11281129
:param query_params: Query string parameters
11291130
:param headers: Custom headers (SDK enforces Content-Type and Accept)
1130-
:param options: Extra options (headers, retry_params)
1131+
:param options: Extra options e.g. {"retry_params": RetryParams(max_retry=3)}
11311132
:return: RawResponse with status, headers, and body
11321133
"""
11331134
return self._api.execute_api_request(
@@ -1151,15 +1152,17 @@ def execute_streamed_api_request(
11511152
body: dict[str, Any] | list[Any] | str | bytes | None = None,
11521153
query_params: dict[str, str | int | list[str | int]] | None = None,
11531154
headers: dict[str, str] | None = None,
1154-
options: dict[str, int | str | dict[str, int | str]] | None = None,
1155-
) -> RawResponse:
1155+
options: dict[str, Any] | None = None,
1156+
) -> Iterator[dict[str, Any]]:
11561157
"""
11571158
Execute an arbitrary HTTP request to a streaming OpenFGA API endpoint.
11581159
11591160
Same interface as execute_api_request but for streaming endpoints.
11601161
See execute_api_request for full parameter documentation.
1162+
1163+
:return: Iterator yielding parsed JSON chunks from the streaming response.
11611164
"""
1162-
return self._api.execute_streamed_api_request(
1165+
yield from self._api.execute_streamed_api_request(
11631166
operation_name=operation_name,
11641167
method=method,
11651168
path=path,

0 commit comments

Comments
 (0)