Skip to content

Commit 0f09ae9

Browse files
authored
feat: add a single-pass cursor extractor for CursorPaginationStrategy (#98)
PR: #98
1 parent 0efad1e commit 0f09ae9

7 files changed

Lines changed: 259 additions & 76 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ See [docs/pipelines.md](docs/pipelines.md) for the step-author walkthrough.
256256
| `http.auth` | `Credential` sealed hierarchy (`KeyCredential`, `NamedKeyCredential`, `BearerToken`), `BearerTokenProvider`, `AuthScheme`, `AuthMetadata`, RFC 7235 challenge parser, `BasicChallengeHandler`, `DigestChallengeHandler`, `CompositeChallengeHandler`. |
257257
| `http.sse` | `ServerSentEventReader` (WHATWG spec), `ServerSentEvent`, `ServerSentEventListener`, `BufferedSource.readServerSentEvents()`. |
258258
| `http.paging` | `PagedIterable<T>`, `PagedResponse<T>`, `PagingOptions` with `byPage()` and `stream()` accessors. |
259-
| `pagination` | `Paginator<T>` (with a `maxPages` safety cap) over cursor / page-number / link-header `PaginationStrategy` implementations, plus `Page<T>` / `SimplePage<T>`. |
259+
| `pagination` | `Paginator<T>` (with a `maxPages` safety cap) over cursor / page-number / link-header `PaginationStrategy` implementations, plus `Page<T>` / `SimplePage<T>`. Token-style APIs use `CursorPaginationStrategy` with the query-param name set (e.g. `"page_token"`). |
260260
| `pipeline` | Recovery-aware primitives: `RequestPipeline`, `ResponsePipeline`, `ExecutionPipeline` over a sealed `ResponseOutcome`, with steps (`pipeline.step`, `pipeline.step.retry`) like `RetryStep`, `ResponseRecoveryStep`, `IdempotencyKeyStep`, `ClientIdentityStep`. |
261261
| `serde` | `Serde`, `Serializer`, `Deserializer` abstractions, `Tristate<T>` (absent / null / present), and `SerdeException` (the unchecked failure adapters translate codec errors into). |
262262
| `io` | `Source`, `Sink`, `Buffer`, `BufferedSource`, `BufferedSink`, `IoProvider`, `Io`, `TeeSink`. |

sdk-core/api/sdk-core.api

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1960,12 +1960,25 @@ public abstract interface class org/dexpace/sdk/core/io/Source : java/io/Closeab
19601960
}
19611961

19621962
public final class org/dexpace/sdk/core/pagination/CursorPaginationStrategy : org/dexpace/sdk/core/pagination/PaginationStrategy {
1963-
public fun <init> (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)V
1964-
public fun <init> (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V
1965-
public synthetic fun <init> (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
1963+
public fun <init> (Lkotlin/jvm/functions/Function1;)V
1964+
public fun <init> (Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V
1965+
public synthetic fun <init> (Lkotlin/jvm/functions/Function1;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
19661966
public fun parse (Lorg/dexpace/sdk/core/http/response/Response;Lorg/dexpace/sdk/core/http/request/Request;)Lorg/dexpace/sdk/core/pagination/Page;
19671967
}
19681968

1969+
public final class org/dexpace/sdk/core/pagination/CursorResult {
1970+
public fun <init> (Ljava/util/List;Ljava/lang/String;)V
1971+
public final fun component1 ()Ljava/util/List;
1972+
public final fun component2 ()Ljava/lang/String;
1973+
public final fun copy (Ljava/util/List;Ljava/lang/String;)Lorg/dexpace/sdk/core/pagination/CursorResult;
1974+
public static synthetic fun copy$default (Lorg/dexpace/sdk/core/pagination/CursorResult;Ljava/util/List;Ljava/lang/String;ILjava/lang/Object;)Lorg/dexpace/sdk/core/pagination/CursorResult;
1975+
public fun equals (Ljava/lang/Object;)Z
1976+
public final fun getItems ()Ljava/util/List;
1977+
public final fun getNextCursor ()Ljava/lang/String;
1978+
public fun hashCode ()I
1979+
public fun toString ()Ljava/lang/String;
1980+
}
1981+
19691982
public final class org/dexpace/sdk/core/pagination/LinkHeaderPaginationStrategy : org/dexpace/sdk/core/pagination/PaginationStrategy {
19701983
public fun <init> (Lkotlin/jvm/functions/Function1;)V
19711984
public fun <init> (Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V

sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationStrategy.kt

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,39 +21,45 @@ import org.dexpace.sdk.core.http.response.Response
2121
* - Page-N response: arbitrary body containing both the items and the next cursor.
2222
* - End of stream: response carries a `null` or absent next cursor.
2323
*
24-
* The strategy is **stateless**: it relies on [itemsExtractor] and [cursorExtractor]
25-
* lambdas to pull data out of the response, and on [RequestRebuilder] to mutate the
26-
* initial request's URL with the new cursor query parameter.
24+
* The strategy is **stateless**: it relies on a single [extractor] to pull both the items
25+
* and the next cursor out of the response in one pass — see [CursorResult] — and rewrites the
26+
* initial request's URL, setting the cursor query parameter to the new value, for the next
27+
* page.
28+
*
29+
* ## Single read
30+
*
31+
* A response body is single-use. Because cursor APIs carry the items and the next cursor in
32+
* the same payload, the extractor reads the body **once** and returns both via a
33+
* [CursorResult]. This avoids the double-drain trap of pulling items and cursor with two
34+
* independent `(Response) -> …` lambdas, which forces either a failed second read or a
35+
* per-response cache to work around it.
2736
*
2837
* @param T Element type extracted from the response.
29-
* @property itemsExtractor Reads the list of items from the response. Called once per
30-
* page; must drain the response body synchronously.
31-
* @property cursorExtractor Reads the next cursor from the response, or returns `null`
32-
* if there are no more pages.
38+
* @property extractor Reads the items and next cursor from the response in a single pass.
39+
* Called once per page; must drain the response body synchronously and return both pieces.
3340
* @property cursorQueryParam Query parameter name used to send the cursor (default
3441
* `"cursor"`). Servers vary; common alternatives include `"after"`, `"next"`,
3542
* `"pageCursor"`.
3643
*/
3744
public class CursorPaginationStrategy<T>
3845
@JvmOverloads
3946
constructor(
40-
private val itemsExtractor: (Response) -> List<T>,
41-
private val cursorExtractor: (Response) -> String?,
47+
private val extractor: (Response) -> CursorResult<T>,
4248
private val cursorQueryParam: String = "cursor",
4349
) : PaginationStrategy<T> {
4450
override fun parse(
4551
response: Response,
4652
initialRequest: Request,
4753
): Page<T> {
48-
val items: List<T> = itemsExtractor(response)
49-
val nextCursor: String? = cursorExtractor(response)
54+
val result: CursorResult<T> = extractor(response)
55+
val nextCursor: String? = result.nextCursor
5056
val hasNext: Boolean = !nextCursor.isNullOrEmpty()
5157
val nextRequest: Request? =
5258
if (hasNext) {
5359
RequestRebuilder.withQueryParam(initialRequest, cursorQueryParam, nextCursor)
5460
} else {
5561
null
5662
}
57-
return SimplePage(items = items, hasNext = hasNext, nextRequest = nextRequest)
63+
return SimplePage(items = result.items, hasNext = hasNext, nextRequest = nextRequest)
5864
}
5965
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2026 dexpace and Omar Aljarrah
3+
*
4+
* Licensed under the MIT License. See LICENSE in the project root.
5+
* SPDX-License-Identifier: MIT
6+
*/
7+
8+
package org.dexpace.sdk.core.pagination
9+
10+
import org.dexpace.sdk.core.http.response.Response
11+
12+
/**
13+
* Result of a single-pass cursor extraction: the items on a page together with the cursor
14+
* for the next page, pulled from one read of the [Response].
15+
*
16+
* Cursor APIs return both the items and the next cursor in the same payload, so reading
17+
* them with two separate `(Response) -> …` lambdas means draining the body twice. A response
18+
* body is single-use, so the second drain either fails or — at best — is worked around with
19+
* a per-response cache. `CursorResult` lets a [CursorPaginationStrategy] extractor parse the
20+
* response once and hand back both pieces, keeping the public API honest about the
21+
* single-read constraint.
22+
*
23+
* ## Thread-safety
24+
*
25+
* The [items] list passed at construction is stored by reference, so a caller that retains a
26+
* mutable list can mutate it afterwards and the change will be visible through this result.
27+
* The bundled extractor path always builds a fresh list per page, so values produced by the
28+
* SDK are effectively immutable; hand-built results that share a mutable list should copy it
29+
* (`items.toList()`) before constructing.
30+
*
31+
* @param T Element type carried in [items].
32+
* @property items Items on the page, in server-defined order. Never `null`; may be empty.
33+
* @property nextCursor Opaque cursor to send on the next request, or `null` (or empty) when
34+
* the response signals end-of-stream.
35+
*/
36+
public data class CursorResult<out T>(
37+
public val items: List<T>,
38+
public val nextCursor: String?,
39+
)

sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/CursorPaginationTest.kt

Lines changed: 14 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ package org.dexpace.sdk.core.pagination
1010
import org.dexpace.sdk.core.http.request.Method
1111
import org.dexpace.sdk.core.http.request.Request
1212
import org.dexpace.sdk.core.http.response.Response
13-
import java.util.IdentityHashMap
1413
import java.util.stream.Collectors
1514
import kotlin.test.BeforeTest
1615
import kotlin.test.Test
@@ -29,34 +28,18 @@ class CursorPaginationTest {
2928
.build()
3029

3130
/**
32-
* Parses the body format `items=<csv>\ncursor=<next-or-empty>` into a (items, cursor)
33-
* pair. Reads the body exactly once.
31+
* Single-pass extractor for the body format `items=<csv>\ncursor=<next-or-empty>`. Reads
32+
* the body exactly once and returns both the items and the next cursor as a
33+
* [CursorResult], so there is no double-drain of the single-use response body.
3434
*/
35-
private fun parsePayload(resp: Response): Pair<List<String>, String?> {
35+
private val extractor: (Response) -> CursorResult<String> = { resp ->
3636
val body = resp.body!!.source().use { it.readUtf8() }
3737
val itemsLine = body.lineSequence().firstOrNull { it.startsWith("items=") } ?: "items="
3838
val cursorLine = body.lineSequence().firstOrNull { it.startsWith("cursor=") } ?: "cursor="
3939
val itemsRaw = itemsLine.removePrefix("items=")
4040
val cursorRaw = cursorLine.removePrefix("cursor=")
4141
val items = if (itemsRaw.isEmpty()) emptyList() else itemsRaw.split(",")
42-
val cursor: String? = cursorRaw.ifEmpty { null }
43-
return Pair(items, cursor)
44-
}
45-
46-
/**
47-
* Pair of extractors that share a per-Response identity-keyed cache so the
48-
* single-use body is read exactly once per page even though the strategy's contract
49-
* splits items + cursor into two calls.
50-
*/
51-
private fun buildCachedExtractors(): Pair<(Response) -> List<String>, (Response) -> String?> {
52-
val cache: MutableMap<Response, Pair<List<String>, String?>> = IdentityHashMap()
53-
val items: (Response) -> List<String> = { r ->
54-
cache.getOrPut(r) { parsePayload(r) }.first
55-
}
56-
val cursor: (Response) -> String? = { r ->
57-
cache.getOrPut(r) { parsePayload(r) }.second
58-
}
59-
return Pair(items, cursor)
42+
CursorResult(items, cursorRaw.ifEmpty { null })
6043
}
6144

6245
@Test
@@ -72,8 +55,7 @@ class CursorPaginationTest {
7255
textResponse(req, "items=g,h,i\ncursor=")
7356
}
7457

75-
val (items, cursor) = buildCachedExtractors()
76-
val strategy = CursorPaginationStrategy(items, cursor, cursorQueryParam = "cursor")
58+
val strategy = CursorPaginationStrategy(extractor, cursorQueryParam = "cursor")
7759
val paginator = Paginator(client, initialRequest(), strategy)
7860

7961
val collected: List<String> = paginator.iterateAll().toList()
@@ -96,8 +78,7 @@ class CursorPaginationTest {
9678
client.on("https://api.example.com/items") { req ->
9779
textResponse(req, "items=only-1,only-2\ncursor=")
9880
}
99-
val (items, cursor) = buildCachedExtractors()
100-
val strategy = CursorPaginationStrategy(items, cursor, "cursor")
81+
val strategy = CursorPaginationStrategy(extractor, "cursor")
10182
val paginator = Paginator(client, initialRequest(), strategy)
10283
assertEquals(listOf("only-1", "only-2"), paginator.iterateAll().toList())
10384
assertEquals(1, client.callCount)
@@ -123,8 +104,7 @@ class CursorPaginationTest {
123104
.addHeader("Authorization", "Bearer xyz")
124105
.build()
125106

126-
val (items, cursor) = buildCachedExtractors()
127-
val strategy = CursorPaginationStrategy(items, cursor)
107+
val strategy = CursorPaginationStrategy(extractor)
128108
val paginator = Paginator(client, authRequest, strategy)
129109
assertEquals(listOf("a", "b"), paginator.iterateAll().toList())
130110
}
@@ -139,8 +119,7 @@ class CursorPaginationTest {
139119
textResponse(req, "items=3,4\ncursor=")
140120
}
141121

142-
val (items, cursor) = buildCachedExtractors()
143-
val strategy = CursorPaginationStrategy(items, cursor)
122+
val strategy = CursorPaginationStrategy(extractor)
144123
val paginator = Paginator(client, initialRequest(), strategy)
145124
val streamed: List<String> = paginator.streamAll().collect(Collectors.toList())
146125
assertEquals(listOf("1", "2", "3", "4"), streamed)
@@ -149,50 +128,25 @@ class CursorPaginationTest {
149128
@Test
150129
fun `cursor with special characters is URL encoded in next request`() {
151130
// Opaque cursors may contain `=` `+` `/` characters (base64) — the rebuilder must
152-
// URL-encode them so the server sees the original value unmangled.
131+
// URL-encode them so the server sees the original value unmangled. A custom query
132+
// param name (e.g. `page_token`) covers token-style APIs that reuse this strategy.
153133
val rawCursor = "a+b/c="
154134
val encoded = "a%2Bb%2Fc%3D"
155135
val client = StubHttpClient()
156136
client.on("https://api.example.com/items") { req ->
157137
textResponse(req, "items=one\ncursor=$rawCursor")
158138
}
159-
client.on("https://api.example.com/items?cursor=$encoded") { req ->
160-
textResponse(req, "items=two\ncursor=")
161-
}
162-
163-
val (items, cursor) = buildCachedExtractors()
164-
val strategy = CursorPaginationStrategy(items, cursor)
165-
val paginator = Paginator(client, initialRequest(), strategy)
166-
assertEquals(listOf("one", "two"), paginator.iterateAll().toList())
167-
assertEquals(
168-
listOf(
169-
"https://api.example.com/items",
170-
"https://api.example.com/items?cursor=$encoded",
171-
),
172-
client.receivedUrls,
173-
)
174-
}
175-
176-
@Test
177-
fun `custom query-param name is used for the next-page cursor`() {
178-
// Token-style APIs (next_page_token, pageToken, …) are served by setting
179-
// cursorQueryParam; the next request must carry the cursor under that name.
180-
val client = StubHttpClient()
181-
client.on("https://api.example.com/items") { req ->
182-
textResponse(req, "items=one\ncursor=tok1")
183-
}
184-
client.on("https://api.example.com/items?page_token=tok1") { req ->
139+
client.on("https://api.example.com/items?page_token=$encoded") { req ->
185140
textResponse(req, "items=two\ncursor=")
186141
}
187142

188-
val (items, cursor) = buildCachedExtractors()
189-
val strategy = CursorPaginationStrategy(items, cursor, cursorQueryParam = "page_token")
143+
val strategy = CursorPaginationStrategy(extractor, cursorQueryParam = "page_token")
190144
val paginator = Paginator(client, initialRequest(), strategy)
191145
assertEquals(listOf("one", "two"), paginator.iterateAll().toList())
192146
assertEquals(
193147
listOf(
194148
"https://api.example.com/items",
195-
"https://api.example.com/items?page_token=tok1",
149+
"https://api.example.com/items?page_token=$encoded",
196150
),
197151
client.receivedUrls,
198152
)

0 commit comments

Comments
 (0)