Skip to content

Commit f5b1aba

Browse files
authored
dataconnect(chore): Refactor realtime queries to use SharedFlow features (#8184)
1 parent c786a42 commit f5b1aba

18 files changed

Lines changed: 1467 additions & 471 deletions
Lines changed: 14 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,6 @@
11
# Realtime Query Subscription TODO List
22

3-
### TODO 1: Lack of Connection Health Monitoring / Reconnection
4-
5-
* **File:** RealtimeQueryManager.kt
6-
* **Severity:** `CRITICAL`
7-
8-
#### Description
9-
10-
Once `RealtimeQueryManager` successfully transitions to `State.Connected(stream)`, it remains in
11-
this state permanently. If the underlying bidirectional gRPC connection is lost or the stream is
12-
closed (due to network issues, server-side termination, or client-side close), the manager does not
13-
detect this. Any future calls to `subscribe()` will read `State.Connected` and try to use the dead
14-
stream, causing new subscriptions to silently hang or fail indefinitely without any reconnection
15-
attempts.
16-
17-
#### Recommendation
18-
19-
Add connection health monitoring or detect when the stream has completed/failed, and transition
20-
the manager's state back to `State.Disconnected` (or clean up resources) to allow subsequent
21-
subscription calls to trigger a new connection.
22-
23-
---
24-
25-
### TODO 2: Permanent Lock-out / Stuck in `Connecting` State on Connection Failure
26-
27-
* **File:** RealtimeQueryManager.kt
28-
* **Severity:** `CRITICAL`
29-
30-
#### Description
31-
32-
In `ensureConnected`, `currentState.job.await()` is called to wait for the lazy `Deferred`
33-
connection job. If the connection attempt fails (e.g., due to a temporary network issue) and
34-
throws an exception, the exception propagates out of the method. Because the exception is thrown
35-
before the state is updated, the manager's state remains permanently stuck in
36-
`State.Connecting(job)`. Any future connection attempts will call `await()` on the same failed
37-
`Deferred` job, which immediately and permanently re-throws the same cached exception, making the
38-
manager completely unusable until the app or SDK instance is restarted.
39-
40-
### TODO 3: Memory and Resource Leak of Subscription Flows in `flowByQueryId`
3+
### TODO 1: Memory and Resource Leak of Subscription Flows in `flowByQueryId`
414

425
* **File:** RealtimeQueryManager.kt
436
* **Severity:** `HIGH`
@@ -56,33 +19,26 @@ wasting bandwidth and server resources.
5619
Implement a reference-counting mechanism or a cleanup callback upon flow completion to remove the
5720
query from `flowByQueryId` once the active collector count drops to zero.
5821

59-
### TODO 4: Late Subscribers Hang Indefinitely on Completed Stream
22+
---
23+
24+
### TODO 2: Simplistic Retry Logic / Lack of Exponential Backoff and Network State Integration
6025

6126
* **File:** DataConnectBidiConnectStream.kt
6227
* **Severity:** `HIGH`
6328

6429
#### Description
6530

66-
Late subscribers to a stream that has already completed (server-side) will hang indefinitely.
67-
This occurs because `incomingResponses` is a `SharedFlow` with `replay = 0`. If the stream
68-
completes, the `IncomingResponse.Completed` signal is emitted and lost for future subscribers.
69-
These subscribers will then wait in `transformWhile` for new emissions that will never come.
70-
71-
The current implementation of `onCompletion` does not prevent the hang for late subscribers.
72-
Because `streams.incomingResponses` is a `SharedFlow` with `replay = 0`, if the stream has already
73-
completed, the `IncomingResponse.Completed` signal is lost. A new subscriber will begin collecting
74-
from `incomingResponses` and wait indefinitely for emissions that will never arrive. The
75-
`onCompletion` block is only executed *after* the flow collection completes, so it cannot resolve a
76-
hang that occurs *during* the collection process.
77-
78-
Discovered by gemini code assist:
79-
https://github.com/firebase/firebase-android-sdk/pull/8158#discussion_r3240286966
31+
The retry logic in `connectionFlow` is extremely simplistic: it retries up to 2 times with a flat
32+
1-second delay between attempts. If a connection is lost due to a sustained network outage (longer
33+
than 2 seconds), the stream will fail permanently and will not attempt to recover even after the
34+
network comes back.
8035

8136
#### Recommendation
8237

83-
You should check `streams.completedResponse` before starting the flow collection to ensure the
84-
stream is still active.
38+
Replace the simplistic retry logic with:
39+
1. **Exponential Backoff**: Increase the delay between retries exponentially (with jitter) to avoid
40+
overwhelming the server and the client.
41+
2. **Network State Integration**: Integrate with the OS network state monitoring (e.g., Android's
42+
`ConnectivityManager`) to proactively trigger a reconnection attempt as soon as the device
43+
regains internet connectivity, rather than waiting for a backoff timer.
8544

86-
To address this, you should check the state of `streams.completedResponse` *before* or *at the
87-
start* of the flow collection (e.g., inside the `flow { ... }` builder) to verify if the stream is
88-
already finished, and emit the completion signal or throw the cached exception immediately if it is.

firebase-dataconnect/docs/README.md

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@ Before modifying any significant component, starting a new feature, or refactori
2020
When you need to make a significant architectural choice (e.g., choosing a library, changing data flow, defining a new component boundary):
2121
1. Create a new file in this directory named `adr-NNNN-short-description.md`, starting from `adr-0001.md`.
2222
2. Use the template in [adr-template.md](adr-template.md).
23-
3. Fill out the sections honestly and concisely.
23+
3. Fill out the sections honestly and concisely. Prioritize explaining the **WHY** behind the
24+
decisions, the benefits, and why the choices are valid and worthwhile. **AVOID** merely repeating
25+
the technical choices without any justifications (developers can just read the git commit
26+
history for what changed; ADRs exist to capture the reasoning). If the rationale, trade-offs,
27+
or benefits are not fully known or clear, **liberally ask the user for clarification** before
28+
writing. It is of paramount importance that these details are recorded accurately and are
29+
absolutely correct.
2430
4. Update the **Decision Registry** in this file (`docs/README.md`) to list your new ADR.
2531

2632
---
@@ -45,14 +51,29 @@ When you need to make a significant architectural choice (e.g., choosing a libra
4551
> (Or `../scripts/spotlessApply.zsh` if running from within the `docs/` directory).
4652
> A task is NOT considered complete until the files have been successfully formatted by this script.
4753
54+
### 3. Focus on the "Why" (Rationale and Benefits)
55+
56+
> [!IMPORTANT]
57+
> When writing ADRs, prioritize explaining the **rationale, trade-offs, and benefits** of the
58+
> decisions. Do not just repeat the technical implementation details—explain **WHY** they were
59+
> chosen and why the solutions are valid. ADRs are design documents meant to convey historical
60+
> context and reasoning, not code diff summaries (which can be read in the git commit history).
61+
>
62+
> **CRITICAL**: If the rationale, trade-offs, or benefits of a decision are not fully known or are
63+
> unclear to you, **you MUST liberally ask the user for clarification** before writing the document.
64+
> It is of paramount importance that these architectural details are recorded accurately and are
65+
> absolutely correct. Do not make assumptions or guess.
66+
4867
---
4968
5069
## Decision Registry
5170
52-
| ID | Title | Date | Status | Tags |
53-
|:-----------------------------------------------------------------------|:-----------------------------------------------------|:-----------------------------|:---------|:--------------------------------------------------|
54-
| [0001](adr-0001-rewire-bidi-connection-to-grpcbidiflow.md) | Rewire Bidirectional gRPC Connection to GrpcBidiFlow | Fri May 15 18:33:07 EDT 2026 | accepted | network, grpc, coroutines, reactive-streams |
55-
| [0002](adr-0002-use-replay-expiration-millis-0-in-while-subscribed.md) | Use replayExpirationMillis = 0 in WhileSubscribed | Sat May 16 22:34:19 EDT 2026 | accepted | network, grpc, coroutines, flow, reactive-streams |
56-
| [0003](adr-0003-sequence-number-filtering-for-stale-replayed-data.md) | Sequence Number Filtering for Stale Replayed Data | Sun May 17 00:02:50 EDT 2026 | accepted | network, grpc, coroutines, flow, multiplexing |
71+
| ID | Title | Date | Status | Tags |
72+
|:--------------------------------------------------------------------------------------------|:------------------------------------------------------------------------|:-----------------------------|:-----------|:-----------------------------------------------------------|
73+
| [0001](adr-0001-rewire-bidi-connection-to-grpcbidiflow.md) | Rewire Bidirectional gRPC Connection to GrpcBidiFlow | Fri May 15 18:33:07 EDT 2026 | accepted | network, grpc, coroutines, reactive-streams |
74+
| [0002](adr-0002-use-replay-expiration-millis-0-in-while-subscribed.md) | Use replayExpirationMillis = 0 in WhileSubscribed | Sat May 16 22:34:19 EDT 2026 | superseded | network, grpc, coroutines, flow, reactive-streams |
75+
| [0003](adr-0003-sequence-number-filtering-for-stale-replayed-data.md) | Sequence Number Filtering for Stale Replayed Data | Sun May 17 00:02:50 EDT 2026 | superseded | network, grpc, coroutines, flow, multiplexing |
76+
| [0004](adr-0004-coordinate-multiplexed-subscriptions-using-conflatedsignal-and-replay-0.md) | Coordinate Multiplexed Subscriptions using ConflatedSignal and Replay=0 | Tue May 19 15:58:34 EDT 2026 | accepted | network, grpc, coroutines, flow, multiplexing |
77+
| [0005](adr-0005-configure-flow-control-and-conflation-for-realtime-queries.md) | Configure Flow Control and Conflation for Realtime Queries | Tue May 19 17:28:33 EDT 2026 | accepted | network, grpc, coroutines, flow, backpressure, performance |
5778
5879
*(Add new records to this table in chronological order. Refer to the status lifecycle: `proposed` -> `accepted` -> `superseded`/`deprecated`)*

firebase-dataconnect/docs/adr-0002-use-replay-expiration-millis-0-in-while-subscribed.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,25 @@ This design guarantees that the subscription request is sent exactly once, safel
7878
* **Negative/Risks:**
7979
* If a subscriber disconnects and immediately reconnects within less than a millisecond, they might not get the replay cache, but since they want a fresh reconnection anyway when the stream is closed, this is the desirable behavior.
8080

81+
---
82+
83+
## Amendment (May 19, 2026)
84+
85+
### Status: Superseded
86+
87+
The design described in this ADR has been **superseded** by the architecture documented in
88+
[ADR 0004](adr-0004-coordinate-multiplexed-subscriptions-using-conflatedsignal-and-replay-0.md).
89+
90+
**Key changes since this decision:**
91+
- The internal `sharedFlow` was changed from `replay = 1` to `replay = 0` (disabling the replay
92+
cache completely).
93+
- `SubscriptionStateManager` and `Subscriber` were removed.
94+
- A new thread-safe utility `ConflatedSignal` was introduced to coordinate `subscribe` and
95+
`resume` requests.
96+
- The connection state is now managed via `SubscriptionState` (`Disconnected`,
97+
`DisconnectedWithPendingSubscription`, `Connected`).
98+
99+
By switching to `replay = 0` and using `ConflatedSignal`, we completely eliminated the stale
100+
replay cache issues, rendering the complex `replayExpirationMillis = 0` cache clearance and the
101+
`SubscriptionStateManager` obsolete.
102+

firebase-dataconnect/docs/adr-0003-sequence-number-filtering-for-stale-replayed-data.md

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
id: 0003
44
title: Sequence Number Filtering for Stale Replayed Data
55
date: Sun May 17 00:02:50 EDT 2026
6-
status: accepted
6+
status: obsolete
77
deciders: [Jetski, dconeybe]
88
tags: [network, grpc, coroutines, flow, multiplexing]
99
-----------------------------------------------------
@@ -84,3 +84,27 @@ A potential client-side mitigation that builds upon the sequence number approach
8484
* **Negative/Risks:**
8585
* A late subscriber that starts collecting *exactly* in the small window between another subscriber's request and its response will still receive that in-flight response. This is mitigated by the fact that the data is still technically the "latest" data, but in unit tests asserting strict deterministic sequence of distinct datasets, this causes test failures.
8686

87+
---
88+
89+
## Amendment (May 19, 2026)
90+
91+
### Status: Obsolete (Implementation Retired)
92+
93+
This ADR is now **obsolete** and has been retired. The sequence number filtering mechanism was
94+
completely **removed** from `DataConnectBidiConnectStream.kt` as an implementation detail.
95+
96+
**Why it was removed:**
97+
We refactored the stream sharing policy from `replay = 1` to `replay = 0` (as documented in
98+
[ADR 0004](adr-0004-coordinate-multiplexed-subscriptions-using-conflatedsignal-and-replay-0.md)).
99+
Because the replay cache was eliminated entirely, the "stale replayed data" issue (which sequence
100+
numbers originally filtered) no longer exists.
101+
102+
**Outstanding Race Condition:**
103+
It is **IMPORTANT** to note that the **"in-flight request" race condition** described in this ADR
104+
has **NOT** been resolved by these changes. Because sequence numbers were removed and no
105+
alternative correlation mechanism was introduced, concurrently starting subscribers sharing the
106+
same query can still consume in-flight responses intended for other subscribers.
107+
108+
This remains a known, outstanding limitation that will likely require server-side protocol
109+
cooperation (e.g., transaction/correlation IDs echoed in `StreamResponse`) to fully resolve.
110+

0 commit comments

Comments
 (0)