Skip to content

Commit 6389a2a

Browse files
committed
feat: address and accomodate ffeedback:
1 parent 86459f0 commit 6389a2a

5 files changed

Lines changed: 116 additions & 46 deletions

File tree

examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/StreamedListObjectsExample.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ public void run() throws Exception {
7373
.user("user:anne");
7474

7575
fgaClient
76-
.streamedListObjects(request, object -> {
77-
System.out.println(" " + object);
78-
results.add(object);
76+
.streamedListObjects(request, response -> {
77+
System.out.println(" " + response.getObject());
78+
results.add(response.getObject());
7979
})
8080
.thenRun(() -> {
8181
System.out.println("Streaming complete!");

src/main/java/dev/openfga/sdk/api/StreamedListObjectsApi.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ public StreamedListObjectsApi(Configuration configuration, ApiClient apiClient)
4343
*
4444
* @param storeId The store ID
4545
* @param body The list objects request
46-
* @param consumer Callback to handle each streamed object response (invoked asynchronously)
46+
* @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously)
4747
* @return CompletableFuture<Void> that completes when streaming finishes
4848
* @throws ApiException if the API call fails immediately
4949
* @throws FgaInvalidParameterException if required parameters are missing
5050
*/
5151
public CompletableFuture<Void> streamedListObjects(
52-
String storeId, ListObjectsRequest body, Consumer<String> consumer)
52+
String storeId, ListObjectsRequest body, Consumer<StreamedListObjectsResponse> consumer)
5353
throws ApiException, FgaInvalidParameterException {
5454
return streamedListObjects(storeId, body, consumer, null, this.configuration);
5555
}
@@ -61,7 +61,7 @@ public CompletableFuture<Void> streamedListObjects(
6161
*
6262
* @param storeId The store ID
6363
* @param body The list objects request
64-
* @param consumer Callback to handle each streamed object response (invoked asynchronously)
64+
* @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously)
6565
* @param configurationOverride Configuration overrides (e.g., additional headers)
6666
* @return CompletableFuture<Void> that completes when streaming finishes
6767
* @throws ApiException if the API call fails immediately
@@ -70,7 +70,7 @@ public CompletableFuture<Void> streamedListObjects(
7070
public CompletableFuture<Void> streamedListObjects(
7171
String storeId,
7272
ListObjectsRequest body,
73-
Consumer<String> consumer,
73+
Consumer<StreamedListObjectsResponse> consumer,
7474
ConfigurationOverride configurationOverride)
7575
throws ApiException, FgaInvalidParameterException {
7676
return streamedListObjects(storeId, body, consumer, null, this.configuration.override(configurationOverride));
@@ -83,14 +83,17 @@ public CompletableFuture<Void> streamedListObjects(
8383
*
8484
* @param storeId The store ID
8585
* @param body The list objects request
86-
* @param consumer Callback to handle each streamed object response (invoked asynchronously)
86+
* @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously)
8787
* @param errorConsumer Optional callback to handle errors during streaming
8888
* @return CompletableFuture<Void> that completes when streaming finishes or exceptionally on error
8989
* @throws ApiException if the API call fails immediately
9090
* @throws FgaInvalidParameterException if required parameters are missing
9191
*/
9292
public CompletableFuture<Void> streamedListObjects(
93-
String storeId, ListObjectsRequest body, Consumer<String> consumer, Consumer<Throwable> errorConsumer)
93+
String storeId,
94+
ListObjectsRequest body,
95+
Consumer<StreamedListObjectsResponse> consumer,
96+
Consumer<Throwable> errorConsumer)
9497
throws ApiException, FgaInvalidParameterException {
9598
return streamedListObjects(storeId, body, consumer, errorConsumer, this.configuration);
9699
}
@@ -102,7 +105,7 @@ public CompletableFuture<Void> streamedListObjects(
102105
*
103106
* @param storeId The store ID
104107
* @param body The list objects request
105-
* @param consumer Callback to handle each streamed object response (invoked asynchronously)
108+
* @param consumer Callback to handle each StreamedListObjectsResponse (invoked asynchronously)
106109
* @param errorConsumer Optional callback to handle errors during streaming
107110
* @param configurationOverride Configuration overrides (e.g., additional headers)
108111
* @return CompletableFuture<Void> that completes when streaming finishes or exceptionally on error
@@ -112,7 +115,7 @@ public CompletableFuture<Void> streamedListObjects(
112115
public CompletableFuture<Void> streamedListObjects(
113116
String storeId,
114117
ListObjectsRequest body,
115-
Consumer<String> consumer,
118+
Consumer<StreamedListObjectsResponse> consumer,
116119
Consumer<Throwable> errorConsumer,
117120
ConfigurationOverride configurationOverride)
118121
throws ApiException, FgaInvalidParameterException {
@@ -126,7 +129,7 @@ public CompletableFuture<Void> streamedListObjects(
126129
private CompletableFuture<Void> streamedListObjects(
127130
String storeId,
128131
ListObjectsRequest body,
129-
Consumer<String> consumer,
132+
Consumer<StreamedListObjectsResponse> consumer,
130133
Consumer<Throwable> errorConsumer,
131134
Configuration configuration)
132135
throws ApiException, FgaInvalidParameterException {
@@ -198,7 +201,8 @@ private CompletableFuture<Void> streamedListObjects(
198201
/**
199202
* Process a single line from the NDJSON stream
200203
*/
201-
private void processLine(String line, Consumer<String> consumer, Consumer<Throwable> errorConsumer) {
204+
private void processLine(
205+
String line, Consumer<StreamedListObjectsResponse> consumer, Consumer<Throwable> errorConsumer) {
202206
try {
203207
// Parse the JSON line to extract the object
204208
StreamResultOfStreamedListObjectsResponse streamResult =
@@ -214,10 +218,10 @@ private void processLine(String line, Consumer<String> consumer, Consumer<Throwa
214218
errorConsumer.accept(new ApiException(errorMessage));
215219
}
216220
} else if (streamResult.getResult() != null) {
217-
// Deliver the object to the consumer
221+
// Deliver the response object to the consumer
218222
StreamedListObjectsResponse result = streamResult.getResult();
219-
if (result.getObject() != null) {
220-
consumer.accept(result.getObject());
223+
if (result != null) {
224+
consumer.accept(result);
221225
}
222226
}
223227
} catch (Exception e) {

src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,13 +1110,33 @@ public CompletableFuture<ClientListObjectsResponse> listObjects(
11101110
* Objects are delivered to the consumer as they are received from the server asynchronously.
11111111
* Returns a CompletableFuture that completes when streaming is finished.
11121112
*
1113+
* <p>Example usage:</p>
1114+
* <pre>{@code
1115+
* ClientListObjectsRequest request = new ClientListObjectsRequest()
1116+
* .user("user:anne")
1117+
* .relation("viewer")
1118+
* .type("document");
1119+
*
1120+
* client.streamedListObjects(request,
1121+
* response -> System.out.println("Found object: " + response.getObject())
1122+
* ).thenRun(() -> System.out.println("Streaming complete"))
1123+
* .exceptionally(error -> {
1124+
* System.err.println("Error: " + error.getMessage());
1125+
* return null;
1126+
* });
1127+
* }</pre>
1128+
*
11131129
* @param request The list objects request containing type, relation, and user
1114-
* @param consumer Callback to handle each streamed object as it arrives (invoked asynchronously)
1130+
* @param consumer Callback to handle each StreamedListObjectsResponse as it arrives
11151131
* @return CompletableFuture<Void> that completes when streaming finishes
1116-
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace
1132+
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace, or consumer is null
11171133
*/
1118-
public CompletableFuture<Void> streamedListObjects(ClientListObjectsRequest request, Consumer<String> consumer)
1134+
public CompletableFuture<Void> streamedListObjects(
1135+
ClientListObjectsRequest request, Consumer<StreamedListObjectsResponse> consumer)
11191136
throws FgaInvalidParameterException {
1137+
if (consumer == null) {
1138+
throw new FgaInvalidParameterException("consumer", "streamedListObjects");
1139+
}
11201140
return streamedListObjects(request, null, consumer, null);
11211141
}
11221142

@@ -1126,15 +1146,39 @@ public CompletableFuture<Void> streamedListObjects(ClientListObjectsRequest requ
11261146
* Objects are delivered to the consumer as they are received from the server asynchronously.
11271147
* Returns a CompletableFuture that completes when streaming is finished.
11281148
*
1149+
* <p>Example usage with options:</p>
1150+
* <pre>{@code
1151+
* ClientListObjectsRequest request = new ClientListObjectsRequest()
1152+
* .user("user:anne")
1153+
* .relation("viewer")
1154+
* .type("document");
1155+
*
1156+
* ClientStreamedListObjectsOptions options = new ClientStreamedListObjectsOptions()
1157+
* .authorizationModelId("01HVMMBCMGZNT3SED4Z17ECXCA");
1158+
*
1159+
* client.streamedListObjects(request, options,
1160+
* response -> System.out.println("Found object: " + response.getObject())
1161+
* ).thenRun(() -> System.out.println("Streaming complete"))
1162+
* .exceptionally(error -> {
1163+
* System.err.println("Error: " + error.getMessage());
1164+
* return null;
1165+
* });
1166+
* }</pre>
1167+
*
11291168
* @param request The list objects request containing type, relation, and user
11301169
* @param options Options for the streaming request
1131-
* @param consumer Callback to handle each streamed object as it arrives (invoked asynchronously)
1170+
* @param consumer Callback to handle each StreamedListObjectsResponse as it arrives
11321171
* @return CompletableFuture<Void> that completes when streaming finishes
1133-
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace
1172+
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace, or consumer is null
11341173
*/
11351174
public CompletableFuture<Void> streamedListObjects(
1136-
ClientListObjectsRequest request, ClientStreamedListObjectsOptions options, Consumer<String> consumer)
1175+
ClientListObjectsRequest request,
1176+
ClientStreamedListObjectsOptions options,
1177+
Consumer<StreamedListObjectsResponse> consumer)
11371178
throws FgaInvalidParameterException {
1179+
if (consumer == null) {
1180+
throw new FgaInvalidParameterException("consumer", "streamedListObjects");
1181+
}
11381182
return streamedListObjects(request, options, consumer, null);
11391183
}
11401184

@@ -1144,19 +1188,38 @@ public CompletableFuture<Void> streamedListObjects(
11441188
* Objects are delivered to the consumer as they are received from the server asynchronously.
11451189
* Returns a CompletableFuture that completes when streaming is finished.
11461190
*
1191+
* <p>Example usage with error handling:</p>
1192+
* <pre>{@code
1193+
* ClientListObjectsRequest request = new ClientListObjectsRequest()
1194+
* .user("user:anne")
1195+
* .relation("viewer")
1196+
* .type("document");
1197+
*
1198+
* ClientStreamedListObjectsOptions options = new ClientStreamedListObjectsOptions()
1199+
* .authorizationModelId("01HVMMBCMGZNT3SED4Z17ECXCA");
1200+
*
1201+
* client.streamedListObjects(request, options,
1202+
* response -> System.out.println("Found object: " + response.getObject()),
1203+
* error -> System.err.println("Streaming error: " + error.getMessage())
1204+
* ).thenRun(() -> System.out.println("Streaming complete"));
1205+
* }</pre>
1206+
*
11471207
* @param request The list objects request containing type, relation, and user
11481208
* @param options Options for the streaming request
1149-
* @param consumer Callback to handle each streamed object as it arrives (invoked asynchronously)
1209+
* @param consumer Callback to handle each StreamedListObjectsResponse as it arrives
11501210
* @param errorConsumer Optional callback to handle errors during streaming
11511211
* @return CompletableFuture<Void> that completes when streaming finishes or exceptionally on error
1152-
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace
1212+
* @throws FgaInvalidParameterException When the Store ID is null, empty, or whitespace, or consumer is null
11531213
*/
11541214
public CompletableFuture<Void> streamedListObjects(
11551215
ClientListObjectsRequest request,
11561216
ClientStreamedListObjectsOptions options,
1157-
Consumer<String> consumer,
1217+
Consumer<StreamedListObjectsResponse> consumer,
11581218
Consumer<Throwable> errorConsumer)
11591219
throws FgaInvalidParameterException {
1220+
if (consumer == null) {
1221+
throw new FgaInvalidParameterException("consumer", "streamedListObjects");
1222+
}
11601223
configuration.assertValid();
11611224
String storeId = configuration.getStoreIdChecked();
11621225

src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,8 @@ public void streamedListObjects() throws Exception {
419419
.relation("reader")
420420
.user("user:test");
421421

422-
CompletableFuture<Void> streamingFuture1 = fga.streamedListObjects(request1, streamedObjects::add);
422+
CompletableFuture<Void> streamingFuture1 =
423+
fga.streamedListObjects(request1, response -> streamedObjects.add(response.getObject()));
423424
streamingFuture1.get(); // Wait for completion
424425

425426
assertEquals(50, streamedObjects.size());
@@ -435,8 +436,8 @@ public void streamedListObjects() throws Exception {
435436
.relation("reader")
436437
.user("user:error-test");
437438

438-
CompletableFuture<Void> streamingFuture2 =
439-
fga.streamedListObjects(request2, null, errorTestObjects::add, errors::add);
439+
CompletableFuture<Void> streamingFuture2 = fga.streamedListObjects(
440+
request2, null, response -> errorTestObjects.add(response.getObject()), errors::add);
440441
streamingFuture2.get();
441442

442443
assertEquals(10, errorTestObjects.size());
@@ -452,7 +453,8 @@ public void streamedListObjects() throws Exception {
452453
java.util.concurrent.atomic.AtomicBoolean chainedOperationExecuted =
453454
new java.util.concurrent.atomic.AtomicBoolean(false);
454455

455-
CompletableFuture<Void> chainedFuture = fga.streamedListObjects(request3, chainTestObjects::add)
456+
CompletableFuture<Void> chainedFuture = fga.streamedListObjects(
457+
request3, response -> chainTestObjects.add(response.getObject()))
456458
.thenRun(() -> {
457459
chainedOperationExecuted.set(true);
458460
});

0 commit comments

Comments
 (0)