Skip to content

Commit c13c9b8

Browse files
committed
feat: add resource cleanup and try-catch
1 parent fde2193 commit c13c9b8

6 files changed

Lines changed: 74 additions & 34 deletions

File tree

examples/streamed-list-objects/README.md

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -57,50 +57,52 @@ export FGA_API_AUDIENCE=your_audience
5757
```java
5858
// Create a request
5959
var request = new ClientListObjectsRequest()
60-
.type("document")
61-
.relation("owner")
62-
.user("user:anne");
63-
64-
// Call the streaming API
65-
var objectStream = fgaClient.streamedListObjects(request).get();
60+
.type("document")
61+
.relation("owner")
62+
.user("user:anne");
6663

64+
// Call the streaming API and ensure proper resource cleanup
65+
try (var objectStream = fgaClient.streamedListObjects(request).get()) {
6766
// Collect all results
6867
List<String> objects = objectStream
69-
.map(StreamedListObjectsResponse::getObject)
70-
.collect(Collectors.toList());
68+
.map(StreamedListObjectsResponse::getObject)
69+
.collect(Collectors.toList());
70+
}
7171
```
7272

7373
### Early Termination
7474

7575
```java
76-
// Get only the first 10 results
77-
var objectStream = fgaClient.streamedListObjects(request).get();
78-
List<String> firstTen = objectStream
79-
.map(StreamedListObjectsResponse::getObject)
80-
.limit(10)
81-
.collect(Collectors.toList());
76+
// Get only the first 10 results, ensuring the stream is closed properly
77+
try (var objectStream = fgaClient.streamedListObjects(request).get()) {
78+
List<String> firstTen = objectStream
79+
.map(StreamedListObjectsResponse::getObject)
80+
.limit(10)
81+
.collect(Collectors.toList());
82+
}
8283
```
8384

8485
### Process as You Go
8586

8687
```java
8788
// Process each object immediately as it arrives
88-
var objectStream = fgaClient.streamedListObjects(request).get();
89-
objectStream
90-
.map(StreamedListObjectsResponse::getObject)
91-
.forEach(obj -> {
92-
// Do something with each object
93-
System.out.println("Processing: " + obj);
94-
});
89+
try (var objectStream = fgaClient.streamedListObjects(request).get()) {
90+
objectStream
91+
.map(StreamedListObjectsResponse::getObject)
92+
.forEach(obj -> {
93+
// Do something with each object
94+
System.out.println("Processing: " + obj);
95+
});
96+
}
9597
```
9698

9799
### With Options
98100

99101
```java
100102
// Use options to specify consistency preference
101103
var options = new ClientListObjectsOptions()
102-
.consistency(ConsistencyPreference.HIGHER_CONSISTENCY)
103-
.authorizationModelId("01GXSXXXXXXXXXXXXXXXX");
104+
.consistency(ConsistencyPreference.HIGHER_CONSISTENCY)
105+
.authorizationModelId("01GXSXXXXXXXXXXXXXXXX");
104106

105107
var objectStream = fgaClient.streamedListObjects(request, options).get();
106108
```

examples/streamed-list-objects/src/main/java/dev/openfga/sdk/example/streamedlistobjects/StreamedListObjectsExample.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,9 @@ private static int writeTuples(OpenFgaClient fgaClient, int quantity) throws Exc
138138
*/
139139
private static List<String> streamedListObjects(OpenFgaClient fgaClient, ClientListObjectsRequest request)
140140
throws Exception {
141-
var objectStream = fgaClient.streamedListObjects(request).get();
142-
return objectStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList());
141+
try (var objectStream = fgaClient.streamedListObjects(request).get()) {
142+
return objectStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList());
143+
}
143144
}
144145

145146
/**

src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1175,6 +1175,10 @@ public CompletableFuture<Stream<StreamedListObjectsResponse>> streamedListObject
11751175
Stream<StreamedListObjectsResponse> stream = java.util.stream.StreamSupport.stream(
11761176
((Iterable<StreamedListObjectsResponse>) () -> iterator).spliterator(), false);
11771177
return stream.onClose(() -> {
1178+
try {
1179+
iterator.close();
1180+
} catch (java.io.IOException ignore) {
1181+
}
11781182
try {
11791183
srb.close();
11801184
} catch (java.io.IOException ignore) {

src/main/java/dev/openfga/sdk/api/client/StreamedResponseIterator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* If an error is encountered in the stream (either from parsing or from an error
1717
* response), it will be thrown as a StreamingException when hasNext() or next() is called.
1818
*/
19-
public class StreamedResponseIterator implements Iterator<StreamedListObjectsResponse> {
19+
public class StreamedResponseIterator implements Iterator<StreamedListObjectsResponse>, AutoCloseable {
2020
private final BufferedReader reader;
2121
private final ObjectMapper objectMapper;
2222
private StreamedListObjectsResponse nextItem;
@@ -92,4 +92,9 @@ public StreamedListObjectsResponse next() {
9292

9393
return current;
9494
}
95+
96+
@Override
97+
public void close() throws IOException {
98+
reader.close();
99+
}
95100
}

src/test-integration/java/dev/openfga/sdk/api/client/OpenFgaClientIntegrationTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,33 @@ public void write_and_listObjects() throws Exception {
328328
assertEquals(DEFAULT_DOC, response.getObjects().get(0));
329329
}
330330

331+
@Test
332+
public void write_and_streamedListObjects() throws Exception {
333+
// Given
334+
String storeName = thisTestName();
335+
String storeId = createStore(storeName);
336+
fga.setStoreId(storeId);
337+
String authModelId = writeAuthModel(storeId);
338+
fga.setAuthorizationModelId(authModelId);
339+
ClientWriteRequest writeRequest = new ClientWriteRequest().writes(List.of(DEFAULT_TUPLE_KEY));
340+
ClientListObjectsRequest listObjectsRequest = new ClientListObjectsRequest()
341+
.user(DEFAULT_USER)
342+
.relation("reader")
343+
.type("document");
344+
345+
// When
346+
fga.write(writeRequest).get();
347+
List<String> objects;
348+
try (var stream = fga.streamedListObjects(listObjectsRequest).get()) {
349+
objects = stream.map(StreamedListObjectsResponse::getObject).collect(java.util.stream.Collectors.toList());
350+
}
351+
352+
// Then
353+
assertNotNull(objects);
354+
assertEquals(1, objects.size());
355+
assertEquals(DEFAULT_DOC, objects.get(0));
356+
}
357+
331358
@Test
332359
public void write_readAssertions() throws Exception {
333360
// Given

src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2610,10 +2610,11 @@ public void streamedListObjectsTest() throws Exception {
26102610
.user(DEFAULT_USER);
26112611

26122612
// When
2613-
Stream<StreamedListObjectsResponse> responseStream =
2614-
fga.streamedListObjects(request).get();
2615-
List<String> objects =
2616-
responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList());
2613+
List<String> objects;
2614+
try (Stream<StreamedListObjectsResponse> responseStream =
2615+
fga.streamedListObjects(request).get()) {
2616+
objects = responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList());
2617+
}
26172618

26182619
// Then
26192620
mockHttpClient.verify().post(postPath).withBody(is(expectedBody)).called(1);
@@ -2702,12 +2703,12 @@ public void streamedListObjects_errorInStream() throws Exception {
27022703
.user(DEFAULT_USER);
27032704

27042705
// When
2705-
Stream<StreamedListObjectsResponse> responseStream =
2706-
fga.streamedListObjects(request).get();
2707-
27082706
// Then - should throw when processing the stream
27092707
var exception = assertThrows(RuntimeException.class, () -> {
2710-
responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList());
2708+
try (Stream<StreamedListObjectsResponse> responseStream =
2709+
fga.streamedListObjects(request).get()) {
2710+
responseStream.map(StreamedListObjectsResponse::getObject).collect(Collectors.toList());
2711+
}
27112712
});
27122713

27132714
assertTrue(exception.getMessage().contains("Error in streaming response"));

0 commit comments

Comments
 (0)