Skip to content

Commit ed5cc64

Browse files
tvaron3Copilotxinlian12
authored
[Cosmos Spark] Surface empty change feed pages to avoid end-to-end timeout (Azure#49276)
* [Cosmos Spark] Surface empty change feed pages to avoid end-to-end timeout Spark partition tasks reading change feed (or executing cross-partition queries) against a sparse workload could hit OperationCancelledException ("End-to-end timeout hit when trying to retrieve the next page") at the connector's 65-second per-operation end-to-end timeout. Root cause: with the default emptyPagesAllowed=false, ParallelDocumentQueryExecutionContext and ChangeFeedFetcher swallow empty / 304 pages internally — a single producer-side nextPage() call can keep draining many sub-feedRanges before emitting one non-empty page. For sparse workloads the cumulative time blows the per-operation timeout. Fix: * Spark ItemsPartitionReader (query path) calls setAllowEmptyPages(true) on the CosmosQueryRequestOptions so the SDK's existing emptyPagesAllowed plumbing applies. * New internal-only emptyPagesAllowed flag on CosmosChangeFeedRequestOptionsImpl (default false; behavior unchanged for all other callers) plumbed through Paginator. getChangeFeedQueryResultAsObservable into ChangeFeedFetcher. nextPageInternal. When the flag is true, both 304 branches return Mono.just(r) so empty pages bubble up to the iterator. Surfaced via new package-private bridge accessor CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages. * ChangeFeedFetcher.isFullyDrained no longer short-circuits to true on noChanges responses (it now consults only continuation.isDone()), which removes the load-bearing reEnableShouldFetchMoreForRetry() pattern that was previously needed to undo a base-class decision. * Spark ChangeFeedPartitionReader opts into the new flag via the bridge accessor. * CosmosChangeFeedRequestOptions.withCosmosPagedFluxOptions now also propagates emptyPagesAllowed when the paged-flux pull mechanism supplies a continuation token (the freshly-built impl would otherwise silently lose the flag — comment added flagging the broader drift hazard). Tests: * New ChangeFeedFetcherEmptyPagesTest (5 unit tests): exercises the isFullyDrained behavior change and asserts that nextPageInternal surfaces noChanges responses individually when the flag is true and swallows them via repeatWhenEmpty when the flag is false. * New CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest (3 unit tests): locks in the flag propagation through withCosmosPagedFluxOptions. * Extended TransientIOErrorsRetryingIteratorSpec with a regression test that drains hundreds of leading empty pages followed by data without hitting the end-to-end timeout. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Add PR link to CHANGELOG entries Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Restore noChanges short-circuit in isFullyDrained when emptyPagesAllowed=false The previous cleanup of ChangeFeedFetcher.isFullyDrained removed the noChanges short-circuit unconditionally, on the rationale that consulting continuation.isDone() was simpler. That regressed every non-Spark caller of the change-feed API: FeedRangeCompositeContinuationImpl.isDone() returns compositeContinuationTokens.size() == 0, but moveToNextToken() rotates the deque via poll() + add() and never shrinks it. So isDone() is permanently false for normal incremental change-feed iteration. For the default emptyPagesAllowed=false path: 1. The 304 arrives. 2. updateState calls isFullyDrained -> false (because isDone() is false). 3. nextPageInternal's else-branch sees handleChangeFeedNotModified return NO_RETRY (single-partition case, multi-partition cycle-complete, or the >4*(size+1) consecutive-304 defense) and falls through to Mono.just(r). 4. Paginator's generate-loop checks shouldFetchMore() -> true and calls nextPage() again -> infinite poll loop. Customer-visible impact would be: any consumer that drains queryChangeFeed(...).byPage() to completion (e.g. .toIterable().iterator(), .collectList(), .blockLast()) hangs forever once the change feed catches up. flag is true (Spark path), surface every noChanges to the caller and let the consumer decide when to stop iterating. When the flag is false (every other caller, including the SDK's public queryChangeFeed API), preserve the original termination signal. Also addressed reviewer feedback: * Drop unused org.mockito.Mockito import (would break checkstyle's UnusedImports rule). * Replace reflective field assignment in stubRequest() with direct field writes on the public fields RxDocumentServiceRequest.requestContext and .faultInjectionRequestContext. Mockito only intercepts method calls; field writes on a mock work directly. * Add a regression test isFullyDrained_noChangesResponseWithEmptyPagesAllowedFalse_returnsTrue that locks in the termination signal for the default path. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address pass-3 review: defense-in-depth on NO_RETRY + DRY cleanup * Defense-in-depth (F1): when emptyPagesAllowed=true the streaming change- feed path takes branch 2 of nextPageInternal. If handleChangeFeedNotModified returns NO_RETRY on a noChanges page (single- partition case, multi-partition full-cycle complete, or the >4*(size+1) consecutive-304 defense in FeedRangeCompositeContinuationImpl), the SDK's built-in termination signal was being silently dropped because isFullyDrained() consults only continuation.isDone() in that mode (which is permanently false for incremental change feed). Now we explicitly call disableShouldFetchMore() to preserve the defense-in-depth termination guarantee even for emptyPagesAllowed=true callers. * DRY (F4): extracted the two near-identical 'surface or swallow via repeatWhenEmpty' blocks in nextPageInternal into a private surfaceOrSwallowNoChangesPage(r) helper. The branches now read as a one-line intent instead of seven near-identical lines. * Comment density (F9): tightened the long isFullyDrained comment to a 2-line tl;dr followed by the detailed rationale. * Test (F2): new nextPage_emptyPagesAllowedTrueWithNoRetryOnNoChanges_ terminatesIteration locks in the defense-in-depth fix - asserts that after a terminal NO_RETRY noChanges page, shouldFetchMore() flips to false so Paginator stops calling nextPage(). * Test (F3): added Mockito.verify(continuation, never()).isDone() to the pass-2 regression test so a future refactor that accidentally drops the noChanges short-circuit and falls through to the (permanently-false) continuation.isDone() check fails loudly instead of silently hanging. Test results: ChangeFeedFetcherEmptyPagesTest 7/7, FetcherTest 5/5, CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest 3/3, TransientIOErrorsRetryingIteratorSpec 7/7 - all green. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address pass-4 review: lock contract on data-page non-termination * Test (F1): assert callIndex==4 after NO_RETRY termination in the pass-3 defense-in-depth test so a future regression that terminates iteration but still over-fetches is caught. * Test (F2): new nextPage_emptyPagesAllowedTrueWithDataPages_doesNotTerminate pins the production contract that the noChanges(r) guard on the disableShouldFetchMore() arm is load-bearing. In production, FeedRangeCompositeContinuationImpl.handleChangeFeedNotModified returns NO_RETRY for EVERY non-noChanges response (the early branch resets state and falls through). Without the noChanges(r) guard, every data page would silently truncate iteration after the first emission. * Comment (F5): added an inline rationale next to the noChanges(r) guard explaining why it must remain - prevents a future engineer from 'simplifying' away the guard without realizing the production truncation hazard. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Shrink bridge accessor surface; reclassify CHANGELOG entry * Drop the new CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages wrapper methods. Callers (ChangeFeedQueryImpl, Spark ChangeFeedPartitionReader, the propagation unit test) now use the already-exposed accessor.getImpl(options).{is,set}EmptyPagesAllowed() instead, keeping the bridge accessor interface at its pre-PR shape. * Move the azure-cosmos CHANGELOG entry from 'Bugs Fixed' to 'Other Changes' and reword: this PR adds an internal-only field on CosmosChangeFeedRequestOptionsImpl that pure SDK consumers cannot reach without going through getImpl(). The customer-facing fix lives in the Spark connector CHANGELOGs (which keep their 'Bugs Fixed' entries). Test results: ChangeFeedFetcherEmptyPagesTest 8/8, FetcherTest 5/5, CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest 3/3, TransientIOErrorsRetryingIteratorSpec 7/7 - all green. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address pass-5 review (Kushagra + Annie): drift hazard, test coverage, polish Major (Kushagra): * M1 — Update PR description to reflect the post-commit-34d3da4 accessor shape (getImpl() escape hatch was being misrepresented as a named accessor wrapper). * M2 — Invert the default in CosmosChangeFeedRequestOptions .withCosmosPagedFluxOptions: instead of building a fresh impl from the continuation token and copying back 4 fields, build from the token and inherit ALL non-token-encoded fields via a new CosmosChangeFeedRequestOptionsImpl.inheritNonContinuationFieldsFrom helper. This closes silent drift for endLSN, customSerializer, excludeRegions, readConsistencyStrategy, thresholds, customOptions, operationContextAndListenerTuple, keywordIdentifiers, completeAfterAllCurrentChangesRetrieved, quotaInfoEnabled, isSplitHandlingDisabled, partitionKeyDefinition, and collectionRid. The 4 token-encoded fields (continuationState, feedRangeInternal, mode, startFromInternal) remain authoritative from the parsed token. Single maintenance point for any future field. * M3 — Add timeOut = 10_000 to the 4 nextPage_* tests in ChangeFeedFetcherEmptyPagesTest so a regression that reintroduces unbounded repeatWhenEmpty drain fails fast instead of looking like CI flake (.NET parity). Minor (Kushagra + Annie): * m1+m3+m7 — Combined javadoc on CosmosChangeFeedRequestOptionsImpl .setEmptyPagesAllowed/isEmptyPagesAllowed: explains default, paging semantics impact, and the deliberate 'not surfaced on public API' decision. Replaces the PR-body claim with an in-code source of truth. * m2 = Annie #2 — Re-add the bridge accessor wrappers setAllowEmptyPages/getAllowEmptyPages on CosmosChangeFeedRequestOptionsAccessor mirroring the query-side pattern. Restores grep-discoverability and reduces refactor blast radius. The public CosmosChangeFeedRequestOptions API is unchanged (no public setter); friend-API surface only. * m4 = Annie #1 — Add 2 nextPage_endLsnSet_emptyPagesAllowed_* tests exercising branch 1 of nextPageInternal (the completeAfterAllCurrentChangesRetrieved || endLSN != null path), which is the production path Spark's ChangeFeedPartitionReader hits for bounded snapshot reads. * m5 — Add emulator-group end-to-end test CosmosContainerChangeFeedTest.changeFeedQuery_emptyPagesAllowed_ surfacesNoChangesPagesAndTerminates exercising the real FeedRangeCompositeContinuationImpl >4*(size+1) consecutive-304 defense path that mock-based unit tests can't reach (the impl class is package-private by design). * m6 — Shorten the verbose Spark + azure-cosmos CHANGELOG entries and append a brief 'one iterator callback per empty page' trade-off note for operator observability. * Annie #3 = M2 — broader drift hazard closed via the inheritNonContinuationFieldsFrom approach above. Nits: * n1 — Extract the nextPageInternal flatMap body into a private applyNoChangesDecision(FeedResponse<T>) method. Reduces nesting depth from 4 to 2 and improves readability of the contract. * n3 — Replace reflective Fetcher.isFullyDrained invocation with a direct call. The test lives in com.azure.cosmos.implementation.query, same package as Fetcher, so protected access works without reflection. Tests: * ChangeFeedFetcherEmptyPagesTest: 10 tests (was 8), all green * CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest: 6 tests (was 3 — adds endLSN, customSerializer, negative pin), all green * FetcherTest: 5/5 (no regression) * TransientIOErrorsRetryingIteratorSpec: 7/7 * CosmosContainerChangeFeedTest.changeFeedQuery_emptyPagesAllowed_*: new emulator-group test (compiles; runs in CI Test Emulator lane) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * De-flake CosmosContainerChangeFeedTest.asyncChangeFeedPrefetching The test failed intermittently on macOS / slow CI runners with 'count > 2' failing at count == 2. Root causes: * .subscribe() is fire-and-forget; Thread.sleep(3000) was the only synchronization, hoping 3s was enough for async pages to arrive * Race between the two subscriptions in FULL_FIDELITY mode: line 367 read continuation.get() synchronously without waiting for the first subscription to populate it (could feed '' to createForProcessingFromContinuation) * Slow runners need more than 3s to receive 3 pages of 100 docs at maxItemCount=10 Fix: * Replace each .subscribe() + Thread.sleep(3000) pattern with a CountDownLatch(N) for 'N pages received' + a generous 30s timeout. Deterministic ordering, no fixed sleep. * For the bounded .take(2, true) block, switch from fire-and-forget .subscribe() to .blockLast(Duration.ofSeconds(30)) so the test waits for the pipeline to complete after exactly 2 pages. * Dispose subscriptions in finally blocks to avoid leaking pages between test iterations. Test intent preserved: count > 2 on the first/resume subscriptions, count == 2 on the bounded take(2, true) subscription. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * change * add one more change * Fix asyncChangeFeedPrefetching FULL_FIDELITY: insert AFTER subscribe Previous de-flake (commit 5683b08) reordered the FULL_FIDELITY insert and the first subscription such that the insert ran BEFORE subscribing. But FULL_FIDELITY uses createForProcessingFromNow, so docs written before the subscription opens are invisible — the first subscription saw zero pages and the new CountDownLatch(3) timed out at 30s. Fix: branch by mode. INCREMENTAL keeps the pre-subscribe insert (since createForProcessingFromBeginning sees pre-existing docs). FULL_FIDELITY inserts AFTER each subscribe (first subscription, resume-from-continuation subscription, and the bounded take(2,true) subscription) so the from-now pipeline actually has writes to consume. Caught by: CosmosContainerChangeFeedTest.asyncChangeFeedPrefetching:385 [first change-feed subscription should produce at least 3 pages within 30 seconds] on Windows TCP Java8 + Java17 emulator runs, FULL_FIDELITY parameter only; INCREMENTAL passed on all platforms. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address @xinlian12 review: rename changefeed flag, fix broken test, add spark 4.x changelogs - Renamed change-feed-side flag emptyPagesAllowed -> notModifiedPagesAllowed (and bridge accessor methods setAllowEmptyPages/getAllowEmptyPages -> setAllowNotModifiedPages/getAllowNotModifiedPages) so the name reflects what it actually controls: 304/NotModified pages from sub-partitions. Query-side CosmosQueryRequestOptions.setAllowEmptyPages stays unchanged. Test file renamed via git mv (history preserved). Closes Annie's naming feedback. - Updated ChangeFeedFetcherNotModifiedPagesTest.isFullyDrained_...returnsTrue assertion to match xinlian12's merged simpler isFullyDrained (unconditional noChanges -> true). Was failing CI build 6368298 with 'Expecting value to be false but was true' across all NotFromSource_TestsOnly + EmulatorTCP jobs. - Added missing PR 49276 bugfix entries to azure-cosmos-spark_4-0_2-13 and azure-cosmos-spark_4-1_2-13 CHANGELOGs (both ship the shared azure-cosmos-spark_3 code so the fix lands there). - Stripped IcM 51000001033272 references from test docstrings (per Annie - PR link in CHANGELOG is sufficient internal traceability). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Stabilize asyncChangeFeedPrefetching on Windows EmulatorTcp CI build 6368554 failed only on windows2022_EmulatorOnlyIntegrationTestsTcpJava{8,17} with 'first change-feed subscription should produce at least 3 pages within 30 seconds' on both INCREMENTAL and FULL_FIDELITY parameter sets. Root cause: the test's TestNG method timeOut was TIMEOUT (40s), but the deterministic de-flake from a prior session uses 30s firstLatch.await + another 30s for the FF resume phase + bounded take(.blockLast(30s)). On the slow Windows EmulatorTcp runner the sum routinely exceeds the method timeout, and even within each phase the page-arrival cadence sometimes can't deliver 3 pages in 30s. Fix: - timeOut = TIMEOUT * 5 (200s) — matches sibling emulator tests on lines 200, 1121, 1167 that also need the longer budget. - awaitSeconds = 60L — doubles the per-phase wait window so the slow runner has room to deliver pages. - retryAnalyzer = FlakyTestRetryAnalyzer.class — consistent with the changeFeedQueryEndLSNHang test on line 1070; absorbs residual jitter. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Revert asyncChangeFeedPrefetching to original Thread.sleep shape + add retry The deterministic CountDownLatch + .blockLast() de-flake I introduced earlier in the session over-corrected: while it eliminates non-determinism in principle, it traded subtle race jitter for slow-runner sensitivity that became a hard failure on Windows EmulatorTcp Java 8 (build 6368807 still failed all 3 retries even with TIMEOUT * 5 and 60s per-phase awaits). The original Thread.sleep(3000) shape had been passing in CI for months and predates this PR entirely — the test is exercising Reactor's byPage prefetch behavior on the change-feed stream, which is unrelated to the notModifiedPagesAllowed work this PR ships. Reverting to that proven shape is the lowest-risk path; retryAnalyzer = FlakyTestRetryAnalyzer is the only addition (consistent with sibling change-feed tests) to absorb the residual slow-runner jitter that remains in the original. Removed the now-unused CountDownLatch + TimeUnit imports. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Remove stale TODO-style comment on surfaceOrRetryNoChangesPage The 3-line comment was xinlian12's reasoning note explaining the need for the setFeedResponseContinuationToken call directly below it. The token re-stamp already addresses the concern (re-stamps with this.changeFeedState.toString() so the surfaced empty page carries the post-rotation cursor), so the speculative wording reads as a stale TODO. Removed. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Revert change-feed half; keep query-path empty-pages fix and CF options drift fix The change-feed half of this PR (surfacing intermediate sub-feedRange 304s to Spark via a new notModifiedPagesAllowed flag) had no IcM repro and caused a regression in TransientIOErrorsRetryingIterator. Reverting all CF-specific surface: - Reverted ChangeFeedFetcher, ChangeFeedQueryImpl, Paginator, ChangeFeedPartitionReader, and the ImplementationBridgeHelpers accessor for the CF flag - Removed notModifiedPagesAllowed field/getter/setter from CosmosChangeFeedRequestOptionsImpl and CosmosChangeFeedRequestOptions - Removed ChangeFeedFetcherNotModifiedPagesTest and 3 flag-specific tests from CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest; removed the new CF flag test from CosmosContainerChangeFeedTest - Updated 6 Spark CHANGELOGs to query-only wording (allowEmptyPages) Kept the orthogonal drift fix added in pass-5 review: inheritNonContinuationFieldsFrom in CosmosChangeFeedRequestOptionsImpl now propagates endLSN, customSerializer, excludeRegions, readConsistencyStrategy, and other non-token-encoded fields when resuming via byPage(continuation). Pre-PR code only inherited maxPrefetchPageCount and throughputControlGroupName, silently dropping the rest. New 'Bugs Fixed' bullet in azure-cosmos/CHANGELOG.md for this. Net PR is now just the query-path setAllowEmptyPages(true) in ItemsPartitionReader plus the CF options drift fix. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Merge customOptions in inheritNonContinuationFieldsFrom to preserve token-mode headers When a CosmosChangeFeedRequestOptionsImpl is rebuilt from a continuation token, the constructor populates mode-derived headers (e.g., CHANGE_FEED_WIRE_FORMAT_VERSION for FULL_FIDELITY). inheritNonContinuationFieldsFrom previously did `this.customOptions = source.customOptions`, which silently dropped those required headers if the source's customOptions did not contain them. Switch to a putIfAbsent merge so token-driven headers win on key collision and source's caller-supplied headers are added otherwise. Added 2 unit tests covering the FULL_FIDELITY header preservation case and the caller-supplied + token-mode header coexistence case. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Annie Liang <xin.liang@microsoft.com>
1 parent c46043e commit ed5cc64

13 files changed

Lines changed: 334 additions & 3 deletions

File tree

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
1111
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)
12+
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries by opting into the SDK's `allowEmptyPages` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
1213

1314
#### Other Changes
1415

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
1111
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)
12+
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries by opting into the SDK's `allowEmptyPages` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
1213

1314
#### Other Changes
1415

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
1111
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)
12+
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries by opting into the SDK's `allowEmptyPages` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
1213

1314
#### Other Changes
1415

sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
1111
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)
12+
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries by opting into the SDK's `allowEmptyPages` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
1213

1314
#### Other Changes
1415

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@ private case class ItemsPartitionReader
4545
.getCosmosQueryRequestOptionsAccessor
4646
.disallowQueryPlanRetrieval(new CosmosQueryRequestOptions())
4747

48+
// Bubble empty pages up to the iterator so the per-page end-to-end timeout
49+
// applies to each individual page rather than being exceeded by serial
50+
// empty-page drains inside ParallelDocumentQueryExecutionContext.
51+
ImplementationBridgeHelpers
52+
.CosmosQueryRequestOptionsHelper
53+
.getCosmosQueryRequestOptionsAccessor
54+
.setAllowEmptyPages(queryOptions, true)
55+
4856
private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
4957
ThroughputControlHelper.populateThroughputControlGroupName(
5058
ImplementationBridgeHelpers

sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,68 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr
180180
factoryCallCount.get shouldEqual 1
181181
}
182182

183+
"TransientIOErrors" should "drain long runs of empty pages without hitting the end-to-end timeout" in {
184+
// Regression test for the empty-page drain scenario: when the SDK is configured with
185+
// emptyPagesAllowed=true the iterator must surface many consecutive empty
186+
// pages without busy-waiting beyond the per-page end-to-end timeout. Even
187+
// with hundreds of empty pages followed by real data, the iterator should
188+
// return all real rows.
189+
val emptyLeadingPages = 200
190+
val realPages = 5
191+
val totalPages = emptyLeadingPages + realPages
192+
val iterator = new TransientIOErrorsRetryingIterator(
193+
continuationToken => generateMockedCosmosPagedFluxWithEmptyPrefix(
194+
continuationToken, totalPages, emptyLeadingPages),
195+
pageSize,
196+
1,
197+
None,
198+
None
199+
)
200+
iterator.maxRetryIntervalInMs = 5
201+
202+
// 2 producers (Left/Right) each emit realPages * pageSize rows
203+
iterator.count(_ => true) shouldEqual (realPages * pageSize * 2)
204+
}
205+
206+
private def generateMockedCosmosPagedFluxWithEmptyPrefix
207+
(
208+
continuationToken: String,
209+
initialPageCount: Int,
210+
leadingEmptyPageCount: Int
211+
) = {
212+
213+
val leftProducer = generateFeedResponseFluxWithEmptyPrefix(
214+
"Left", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
215+
val rightProducer = generateFeedResponseFluxWithEmptyPrefix(
216+
"Right", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
217+
val toBeMerged = Array(leftProducer, rightProducer).toIterable.asJava
218+
val mergedFlux = Flux.mergeSequential(toBeMerged, 1, 2)
219+
UtilBridgeInternal.createCosmosPagedFlux(_ => mergedFlux)
220+
}
221+
222+
private def generateFeedResponseFluxWithEmptyPrefix
223+
(
224+
prefix: String,
225+
pageCount: Int,
226+
leadingEmptyPageCount: Int,
227+
requestContinuationToken: Option[String]
228+
): Flux[FeedResponse[SparkRowItem]] = {
229+
230+
// generateFeedResponse uses documentStartIndex=-1 as the "emit an empty page" sentinel.
231+
val emptyPageSentinel = -1
232+
val firstDataPageStartIndex = 1
233+
234+
val responses = Array.range(1, pageCount + 1)
235+
.map(i => generateFeedResponse(
236+
prefix,
237+
i,
238+
if (i <= leadingEmptyPageCount) emptyPageSentinel else firstDataPageStartIndex))
239+
.filter(response => requestContinuationToken.isEmpty ||
240+
requestContinuationToken.get < response.getContinuationToken)
241+
242+
Flux.fromArray(responses)
243+
}
244+
183245
private val objectMapper = new ObjectMapper
184246

185247
@throws[JsonProcessingException]

sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
1111
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)
12+
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries by opting into the SDK's `allowEmptyPages` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
1213

1314
#### Other Changes
1415

sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#### Bugs Fixed
1010
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
1111
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)
12+
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries by opting into the SDK's `allowEmptyPages` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
1213

1314
#### Other Changes
1415

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,14 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro
332332
}
333333
}
334334

335-
@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", timeOut = TIMEOUT)
335+
@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider",
336+
timeOut = TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class)
336337
public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exception {
338+
// Note on shape: this test verifies Reactor's prefetch behavior on the change-feed
339+
// byPage stream. The two fire-and-forget `.subscribe()` calls + `Thread.sleep(3000)`
340+
// are intentional — they exercise the prefetch path without backpressure-bounded
341+
// collection. retryAnalyzer = FlakyTestRetryAnalyzer absorbs occasional slow-runner
342+
// jitter (Windows EmulatorTcp Java 8 can take >3s to deliver the first 3 pages).
337343
this.createContainer(
338344
(cp) -> {
339345
if (changeFeedMode.equals(ChangeFeedMode.INCREMENTAL)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation;
5+
6+
import com.azure.cosmos.CosmosItemSerializer;
7+
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
8+
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
9+
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
10+
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
11+
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
12+
import com.azure.cosmos.models.ModelBridgeInternal;
13+
import org.testng.annotations.Test;
14+
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
17+
/**
18+
* Unit tests for the paged-flux pull continuation path on
19+
* {@link CosmosChangeFeedRequestOptions#withCosmosPagedFluxOptions(CosmosPagedFluxOptions)} (package-visible via
20+
* {@link ModelBridgeInternal#getEffectiveChangeFeedRequestOptions(CosmosChangeFeedRequestOptions, CosmosPagedFluxOptions)}).
21+
*
22+
* <p>That method silently builds a brand-new {@code CosmosChangeFeedRequestOptionsImpl} when the caller supplies a
23+
* continuation token via {@link CosmosPagedFluxOptions}, so any field NOT explicitly copied is dropped. These tests
24+
* lock in the propagation of fields whose loss would silently break a feature.
25+
*/
26+
public class CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest {
27+
28+
@Test(groups = { "unit" })
29+
public void endLSN_isPropagated_whenContinuationTokenSupplied() {
30+
// Locks in the bounded-change-feed contract across a byPage(savedContinuation) round-trip:
31+
// a caller who set endLSN=42 must continue to see iteration bounded by LSN 42 after resume.
32+
// Before the inheritNonContinuationFieldsFrom fix, endLSN was silently dropped on the rebuild
33+
// path, turning a bounded change feed into an unbounded one.
34+
CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
35+
.createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
36+
ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
37+
.getCosmosChangeFeedRequestOptionsAccessor()
38+
.setEndLSN(src, 42L);
39+
40+
CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
41+
pagedFluxOptions.setRequestContinuation(buildContinuationToken());
42+
43+
CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
44+
.getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
45+
46+
assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
47+
.getCosmosChangeFeedRequestOptionsAccessor()
48+
.getEndLSN(effective))
49+
.describedAs("endLSN must survive the paged-flux pull continuation rebuild")
50+
.isEqualTo(42L);
51+
}
52+
53+
@Test(groups = { "unit" })
54+
public void customSerializer_isPropagated_whenContinuationTokenSupplied() {
55+
// Locks in custom-serializer preservation across a byPage(savedContinuation) round-trip:
56+
// a caller who registered a custom CosmosItemSerializer must continue to see items
57+
// deserialized through that serializer after resume. Before the inheritNonContinuationFieldsFrom
58+
// fix, the customSerializer was silently dropped on the rebuild path, falling back to the
59+
// SDK's internal default serializer and potentially producing wrong field values.
60+
CosmosItemSerializer sentinel = new CosmosItemSerializer() {
61+
@Override
62+
public <T> java.util.Map<String, Object> serialize(T item) { return null; }
63+
64+
@Override
65+
public <T> T deserialize(java.util.Map<String, Object> jsonNodeMap, Class<T> classType) { return null; }
66+
};
67+
CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
68+
.createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
69+
src.setCustomItemSerializer(sentinel);
70+
71+
CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
72+
pagedFluxOptions.setRequestContinuation(buildContinuationToken());
73+
74+
CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
75+
.getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
76+
77+
assertThat(effective.getCustomItemSerializer())
78+
.describedAs("customSerializer must survive the paged-flux pull continuation rebuild")
79+
.isSameAs(sentinel);
80+
}
81+
82+
@Test(groups = { "unit" })
83+
public void tokenEncodedFields_overrideCallerSuppliedValues_whenContinuationTokenSupplied() {
84+
// Negative pin: the four token-encoded fields (continuationState, feedRangeInternal, mode,
85+
// startFromInternal) MUST come from the token, not from the caller's pre-resume options.
86+
// The caller's options here have continuationState=null (createForProcessingFromBeginning),
87+
// but the resulting effective options must have a non-null continuationState parsed from
88+
// the supplied token. If a future refactor accidentally inherits the token-encoded fields
89+
// from the source impl (e.g. moving them into inheritNonContinuationFieldsFrom), this test
90+
// catches the regression because the source's continuationState would clobber the token's.
91+
CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
92+
.createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
93+
94+
CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
95+
pagedFluxOptions.setRequestContinuation(buildContinuationToken());
96+
97+
CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
98+
.getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
99+
100+
assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
101+
.getCosmosChangeFeedRequestOptionsAccessor()
102+
.getImpl(effective)
103+
.getContinuation())
104+
.describedAs("continuationState is encoded in the token and MUST override the caller's pre-resume value")
105+
.isNotNull();
106+
}
107+
108+
@Test(groups = { "unit" })
109+
public void fullFidelityWireFormatHeader_isPreserved_whenSourceHasNoCustomHeaders() {
110+
// Reviewer-found bug: inheritNonContinuationFieldsFrom used to do
111+
// `this.customOptions = source.customOptions`, which clobbered the
112+
// CHANGE_FEED_WIRE_FORMAT_VERSION header set by the constructor when mode==FULL_FIDELITY.
113+
// If the source's customOptions is null (typical for callers who only set high-level
114+
// options), the resume would produce a FULL_FIDELITY request WITHOUT the required wire
115+
// format header. The merge-don't-clobber fix preserves token-mode-derived headers.
116+
CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
117+
.createForProcessingFromNow(FeedRangeEpkImpl.forFullRange());
118+
119+
CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
120+
pagedFluxOptions.setRequestContinuation(buildFullFidelityContinuationToken());
121+
122+
CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
123+
.getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
124+
125+
java.util.Map<String, String> headers = ImplementationBridgeHelpers
126+
.CosmosChangeFeedRequestOptionsHelper
127+
.getCosmosChangeFeedRequestOptionsAccessor()
128+
.getHeaders(effective);
129+
130+
assertThat(headers)
131+
.describedAs("token-derived FULL_FIDELITY wire format header must survive the rebuild")
132+
.isNotNull()
133+
.containsEntry(
134+
HttpConstants.HttpHeaders.CHANGE_FEED_WIRE_FORMAT_VERSION,
135+
HttpConstants.ChangeFeedWireFormatVersions.SEPARATE_METADATA_WITH_CRTS);
136+
}
137+
138+
@Test(groups = { "unit" })
139+
public void callerSuppliedCustomHeaders_areMergedWith_tokenDerivedHeaders() {
140+
// Companion to the above: when the source HAS its own custom headers AND the token's
141+
// mode triggers constructor-set headers, both must coexist after inherit. The merge
142+
// semantics: token-mode headers win on key collision; source headers are added otherwise.
143+
CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
144+
.createForProcessingFromNow(FeedRangeEpkImpl.forFullRange());
145+
ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
146+
.getCosmosChangeFeedRequestOptionsAccessor()
147+
.setHeader(src, "X-Caller-Header", "caller-value");
148+
149+
CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
150+
pagedFluxOptions.setRequestContinuation(buildFullFidelityContinuationToken());
151+
152+
CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
153+
.getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
154+
155+
java.util.Map<String, String> headers = ImplementationBridgeHelpers
156+
.CosmosChangeFeedRequestOptionsHelper
157+
.getCosmosChangeFeedRequestOptionsAccessor()
158+
.getHeaders(effective);
159+
160+
assertThat(headers)
161+
.describedAs("both caller-supplied and token-mode-derived headers must be present")
162+
.containsEntry("X-Caller-Header", "caller-value")
163+
.containsEntry(
164+
HttpConstants.HttpHeaders.CHANGE_FEED_WIRE_FORMAT_VERSION,
165+
HttpConstants.ChangeFeedWireFormatVersions.SEPARATE_METADATA_WITH_CRTS);
166+
}
167+
168+
private static String buildContinuationToken() {
169+
// Build a real ChangeFeedState so we can serialize a valid (base64-encoded) continuation token.
170+
// We use the state's own toString() which round-trips through createForProcessingFromContinuation.
171+
ChangeFeedStateV1 state = new ChangeFeedStateV1(
172+
"someContainerRid",
173+
FeedRangeEpkImpl.forFullRange(),
174+
ChangeFeedMode.INCREMENTAL,
175+
ChangeFeedStartFromInternal.createFromBeginning(),
176+
null);
177+
return state.toString();
178+
}
179+
180+
private static String buildFullFidelityContinuationToken() {
181+
ChangeFeedStateV1 state = new ChangeFeedStateV1(
182+
"someContainerRid",
183+
FeedRangeEpkImpl.forFullRange(),
184+
ChangeFeedMode.FULL_FIDELITY,
185+
ChangeFeedStartFromInternal.createFromNow(),
186+
null);
187+
return state.toString();
188+
}
189+
}

0 commit comments

Comments
 (0)