From e3a2b149c6a049248987b6559de78b8f49c97546 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 7 May 2026 13:37:32 -0300 Subject: [PATCH 1/6] docs(realtime): add v3 API design proposal and backend questions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Greenfield Swift API redesign for the Realtime module, targeting Swift 6.2+. Captures the locked design after grill-through review: - realtime-v3.md — full API specification: explicit lifecycle, typed throws throughout, AsyncSequence as the canonical surface, register-then-subscribe pattern for postgres_changes (reflecting the Phoenix wire constraint that filters must be in the join payload), shared-by-topic channel identity, pluggable transport + clock for deterministic testing, 45+ locked decisions with rationale, and a V2 → V3 migration table. - realtime-v3-questions-for-backend.md — assumption-vs-question pairs across 16 topics (connection, channel lifecycle, broadcast WS + HTTP, replay, presence, postgres changes, auth, errors, rate limits, ordering, app lifecycle, protocol limits) for the Realtime backend team to validate before implementation begins. Tracked in Linear: https://linear.app/supabase/project/realtime-v3-idiomatic-swift-api-rfc-044c5935314f Co-Authored-By: Claude Opus 4.7 --- .../realtime-v3-questions-for-backend.md | 478 ++++++++ docs/design/realtime-v3.md | 1020 +++++++++++++++++ 2 files changed, 1498 insertions(+) create mode 100644 docs/design/realtime-v3-questions-for-backend.md create mode 100644 docs/design/realtime-v3.md diff --git a/docs/design/realtime-v3-questions-for-backend.md b/docs/design/realtime-v3-questions-for-backend.md new file mode 100644 index 000000000..6737f71b0 --- /dev/null +++ b/docs/design/realtime-v3-questions-for-backend.md @@ -0,0 +1,478 @@ +# Realtime v3 — Questions for the Realtime Backend Team + +Each section pairs **an assumption baked into the v3 Swift design** with +**the question(s) that need to be validated**. If an assumption is wrong, the +linked §§ in `realtime-v3.md` need revisiting. + +--- + +## 1. Connection / Socket + +**Assumption A1.** WebSocket auth is a single `apikey` query param / header. +No additional handshake. (§1.1, §6.1) + +- Is `apikey` the only required auth on connect, or should we also send + `Authorization: Bearer ` and/or `vsn` as a query param? +- Are there any required subprotocols (`Sec-WebSocket-Protocol`) we should + be setting? + +**Assumption A2.** `vsn=2.0.0` is the preferred wire version and is stable. +(§1.2, §11, Config.protocolVersion) + +- Is v2 (binary broadcast frames + array-encoded messages) the recommended + default for new clients? +- Any plans for v3? If so, what's the rough shape, and should we design an + escape hatch for it? +- Are there server deployments still pinned to v1 where v2 would break? + +**Assumption A3.** Default heartbeat interval 25s is safe. (§1.2, §6.4) + +- What's the server-side heartbeat timeout (after how many missed + heartbeats does the server close the socket)? +- Are there Cloudflare/LB-level idle timeouts that could close an + otherwise-healthy socket? If so, what's the max safe heartbeat interval? + +**Assumption A4.** Heartbeat RTT is exposed as `phx_reply` latency and is +the canonical "is the connection healthy" signal. (§6.4, `ConnectionStatus.latency`) + +- Is `phx_reply` the right signal, or does the server also push periodic + presence/state messages we could use? +- Is there any server-initiated "ping" the client is expected to respond to? + +--- + +## 2. Channel Join / Leave + +**Assumption B1.** A client may have at most one live subscription per +topic per socket. A second `phx_join` on the same topic while one is live +is rejected or ignored. (§2.1, §2.3) + +- Confirmed? If a second `phx_join` is sent for an already-joined topic, + what does the server do — error, overwrite, or dedupe silently? +- Does the server enforce a max number of topics per socket? What's the limit? + +**Assumption B2.** `phx_leave` is always ACKed by the server before the +server-side state is torn down. (§2.3 "await-to-ack") + +- Is `phx_leave` always ACKed? Under what conditions can it not be + (e.g., server shutdown mid-leave)? +- After ACK, is it safe to assume no further events for that topic will + arrive on this socket? +- If the socket drops mid-leave, what's the server's cleanup behavior? + (We need to know whether a reconnecting client should re-send leave + or just skip it.) + +**Assumption B3.** A `phx_join` immediately after `phx_leave` on the same +topic is valid and produces a fresh subscription. (§2.3 "pipelined re-acquire") + +- If the client sends `phx_leave` then `phx_join` back-to-back (before + leave is ACKed), does the server queue them in order, reject the join, + or race them? +- Is there a minimum cooldown between leave and rejoin on the same topic? + +**Assumption B4.** Dropping a client socket without leaving joined +channels is safe — the server GCs subscriptions within some finite window. +(§2.1 "leaked-channel warning") + +- What's the server-side cleanup delay for abandoned subscriptions? +- Are there billing/quota implications for abandoning vs leaving? + (We want to know how loud our leak warning should be.) + +--- + +## 3. Channel Join Config + +**Assumption C1.** The entire `config` object is frozen at `phx_join` time. +No way to mutate `broadcast.ack`, `self`, `replay`, `presence.key`, or +`postgres_changes` mid-subscription without leaving and rejoining. +(§2.2 "options are locked at creation") + +- Confirmed? Are any of these fields mutable mid-flight? +- If a caller needs to change `postgres_changes` filters, is the correct + pattern always leave + rejoin, or is there a `phx_update`-style event? + +**Assumption C2.** `private: true` channels go through RLS at join time +and reject if the JWT is invalid or lacks permission. (§2.2) + +- What's the exact error the server returns on unauthorized private-channel + join? (`reason` string format, so we can map to `.authenticationFailed` + vs `.channelJoinRejected`.) +- Does `private: true` have implications for broadcast and postgres_changes + behavior beyond the join check? + +--- + +## 4. Broadcast — WebSocket + +**Assumption D1.** `broadcast.ack: true` means every broadcast send gets a +`phx_reply` from the server. `ack: false` means none. (§3.2, BroadcastOptions.acknowledge) + +- Confirmed? What's the exact correlation mechanism — by `ref`? +- What's a reasonable default `broadcastAckTimeout`? (We picked 5s.) + +**Assumption D2.** `self: true` echoes broadcasts back to the sender. +`self: false` does not. This is channel-wide, not per-message. (§3.2, Decision 23) + +- Confirmed channel-wide only, no per-message override? +- Ordering guarantee: if I broadcast 3 messages with `self: true`, are the + echoes guaranteed to arrive in send order? + +**Assumption D3.** v2 protocol sends broadcast payloads as binary frames +(opcode 0x02), type byte `0x03` (client→server) / `0x04` (server→client). +Non-broadcast messages are text frames with JSON arrays. (§3.1, memory: protocol 2.0.0) + +- Confirmed? What's the exact binary framing — is the payload length + length-prefixed, or end-of-frame delimited? +- Is there a max binary frame size the server enforces? + +**Assumption D4.** Arbitrary `Data` can be broadcast as a binary payload +without JSON encoding. (§3.2, Decision 25) + +- Does the server inspect broadcast payloads, or is any byte string valid? +- Any size limits specific to binary vs JSON broadcasts? + +**Assumption D5.** Broadcast delivery is best-effort — no retry, no queue, +no ordering guarantees across topics. Within a single topic + sender, +order is preserved. (§3.1 "streams pause silently during reconnection") + +- Within-topic, within-sender order: guaranteed? (We document it as such.) +- Any cross-topic ordering guarantees we should not assume away? +- Are there rate limits? If so, what does the server return when exceeded? + +--- + +## 5. Broadcast — HTTP Endpoint + +**Assumption E1.** `POST /realtime/v1/api/broadcast` is the correct endpoint +for one-shot broadcasts without opening a WS. (§3.3 httpBroadcast) + +- Is that the canonical path? Is there a versioned alternative? +- Request body shape — batch-only (`{ messages: [...] }`) or single also + accepted? +- Response shape on success (200? 204? body?) +- Error shape — structured JSON with `code`/`message`? + +**Assumption E2.** HTTP broadcast uses the same `apikey` and JWT auth as +the WebSocket. (§3.3 "Auth uses the same `APIKeySource`") + +- Confirmed? Header names: `apikey`, `Authorization: Bearer `? +- Does HTTP broadcast honor RLS for private topics? If the JWT lacks + permission, what's the error? + +**Assumption E3.** HTTP broadcast emits the message to all WS subscribers +on that topic exactly as if a WS client had sent it. (§3.3) + +- Confirmed? Does `self: true` (if the sender happens to also have a WS + subscription to the topic) apply to HTTP-originated broadcasts? + +**Assumption E4.** HTTP broadcast has its own rate limits distinct from WS. + +- What are they? How are they communicated — `429` with `Retry-After` + header? Any per-topic limits vs per-project? + +--- + +## 6. Broadcast Replay + +**Assumption F1.** `replay.since: unix_ms` + optional `limit` is set in the +join config, and the server replays matching messages at join time before +live events start flowing. (§2.2 BroadcastOptions.replay) + +- Confirmed join-time-only? Can replay be re-triggered mid-subscription? +- What's the server-side retention window? If `since` is older than + retention, does the server return the partial window + newest first, + or return an error? +- Default `limit` if omitted? Max `limit` the server enforces? +- Does replay interact with `self: false`? (E.g., will it replay my own + messages even if self-echo is off?) +- Does replay cover private channels the same way as public? +- Ordering: are replayed messages guaranteed to arrive before any live + events after join? + +--- + +## 7. Presence + +**Assumption G1.** Phoenix presence allows multiple `track` calls from the +same socket under the same presence key, each registering a distinct meta +entry. (§4 multi-track support, Decision 16) + +- Confirmed? Or does `track` overwrite any prior meta for the same key? +- If multi-meta: is there a server-enforced max metas per key? + +**Assumption G2.** `presence.key` in join config sets this client's +presence key. If nil, the server generates one (random/per-connection). +(§4 "Presence key source", Decision 17, 45) + +- Confirmed the server generates if nil? What's the format + (UUID, random string)? +- Is the generated key stable across reconnects of the same socket, or + fresh every connect? + +**Assumption G3.** There's an explicit "untrack" mechanism (the +`presence.untrack` event, or similar). Dropping all metas requires an +explicit untrack — merely going silent does not remove presence. +(§4 PresenceHandle.cancel) + +- Confirmed? What's the wire-level untrack event? +- Is untrack ACKed? (We document await-to-ack.) +- If I have 3 tracks and want to untrack one, how does the server know + which meta to remove — meta content match, or a per-track ref? + +**Assumption G4.** On `phx_leave`, the server removes all presence metas +for that socket+topic without requiring explicit untracks. (§4 +"when `channel.leave()` is called, all outstanding tracks are implicitly +torn down server-side") + +- Confirmed? Or must we send explicit untracks before leave? + +**Assumption G5.** Presence is **not** auto-restored by the server on +rejoin. The client must re-send `track` for each live state after the +rejoin `phx_reply`. (§4 "auto re-track on reconnect", §9.2, Decision 18) + +- Confirmed the server does NOT remember presence across reconnects? +- If the server does remember: we need to either skip re-tracking + (optimal) or detect and reconcile (harder). + +**Assumption G6.** `presence_state` (snapshot) arrives once per join; +`presence_diff` arrives for every subsequent change. (§4 `observe` vs `diffs`) + +- Confirmed? Does the snapshot always arrive even when joining an empty + presence set? +- What's the payload shape — `{ [key]: { metas: [...] } }`? + +--- + +## 8. Postgres Changes + +**Assumption H1.** One `postgres_changes` entry in join config = one +server-side filter = one subscription. Multiple entries can be combined +OR-style in a single join. (§5.2, §5.3 "independent subscription") + +- Confirmed multiple entries per join are allowed? +- If two entries overlap (e.g., both match an INSERT on `messages`), does + the server emit duplicate events, deduplicate, or something else? + +**Assumption H2.** Filter wire format is `column=op.value`. Exactly one +clause per entry. No `AND`/`OR`/parenthesization. (§5.2 "single optional +clause", Decision 12) + +- Confirmed single-clause-only? Even if multiple `filter:` fields were + supplied, would only one be honored? +- Are there plans to support `AND` composition? (So we know whether to + leave room in the API.) + +**Assumption H3.** Supported operators are `eq`, `neq`, `gt`, `gte`, `lt`, +`lte`, `in`. (§5.2 Filter factories) + +- Confirmed the full list? Is `is.null` / `is.not.null` supported? +- Is `like` / `ilike` / `match` supported? +- For `in`: what's the max list length? +- Value encoding: how should UUIDs, ISO dates, numbers, booleans, NULLs + be serialized in `column=op.value`? Any escaping for commas in `in`? + +**Assumption H4.** Event filtering on `INSERT`/`UPDATE`/`DELETE`/`*` is +exact — `*` subscribes to all three; anything else subscribes to only +that one. (§5.3 PostgresChangeEvent) + +- Confirmed? Are there other event types (TRUNCATE, etc.) we should + handle? + +**Assumption H5.** For `UPDATE`, the server sends both `old_record` and +`record`. For `DELETE`, only `old_record`. For `INSERT`, only `record`. +(§5.3 `InsertAction`/`UpdateAction`/`DeleteAction`) + +- Confirmed? Is `old_record` always populated on UPDATE, or only when + `REPLICA IDENTITY FULL` is set on the table? +- If `REPLICA IDENTITY` is not `FULL`, what's returned for DELETE? (Just + PKs, or entire row?) +- Schema column order and types match what PostgREST returns for selects? + +**Assumption H6.** If the underlying publication doesn't include a table +or column, events silently don't fire — no error at join time. (§5.3) + +- Confirmed? Or does the server reject the join with an error if the + table/column doesn't exist in `supabase_realtime` publication? + +**Assumption H7.** Postgres change subscriptions are automatically +re-registered on rejoin — the client just re-sends the same join config. +(§9.2 "postgres change subscriptions are restored") + +- Confirmed? Any gaps during rejoin that could lose events? If so, is + there a replay/cursor mechanism like broadcast replay? + +--- + +## 9. Auth / Token Rotation + +**Assumption I1.** The Phoenix event name for pushing a new token is +`access_token` with `{ access_token: "..." }`. Server ACKs with `phx_reply`. +(§6.3 updateToken) + +- Confirmed event name and payload shape? +- Is the response always a `phx_reply` on the top-level socket (not + per-channel)? Or per-channel? +- What does the server do if the new token has different claims + (different `sub`, expired `exp`)? + +**Assumption I2.** On `token_expired`, the server sends a message the +client can distinguish from other errors, and the operation that triggered +it fails with a retryable error. (§6.3 "Reactive path") + +- What's the exact wire signal — a `phx_error` with `reason: "token_expired"`? + On which channel / on the socket itself? +- Does `token_expired` close the socket, close the individual channel, or + just reject the in-flight push? +- After pushing a refreshed token, is the retry on the same original + request, or do we need to resubscribe? + +**Assumption I3.** JWT `exp` is not parsed or enforced client-side — the +SDK reacts only to server-sent `token_expired`. (Decision 9 "No JWT +parsing in the SDK") + +- Is this safe, or is there meaningful latency between local expiry and + server detection that would justify proactive rotation? + +--- + +## 10. Error Taxonomy + +**Assumption J1.** All server-sent errors arrive as `phx_error` / +`phx_reply {status: "error"}` with a `reason: String` field. No structured +error codes. (§7 RealtimeError) + +- Is there a stable set of `reason` strings we can pattern-match to map + into our error cases? Example: `"unauthorized"`, `"rate_limited"`, + `"token_expired"`, `"server_error"`, etc. +- If the set is unstable: can we get a structured `code` field added? + +**Assumption J2.** Server close codes on unexpected socket close are +meaningful and distinct for auth vs transient vs policy violations. + +- What close codes does the server use, and for which scenarios? + (E.g., 4001 = auth, 4003 = rate limit, 4008 = policy, etc.) +- Any close code that means "do not reconnect" vs "reconnect with backoff"? + +--- + +## 11. Rate Limits and Quotas + +**Assumption K1.** Rate limits exist but are not surfaced in the v3 API +except via `.rateLimited(retryAfter:)`. (§7) + +- What are the default server-side limits — messages/sec per channel, + connections per project, topics per socket, presence entries per + channel, presence state size? +- When exceeded via WS: what's the wire signal? A `phx_error` with + `reason: "rate_limited"` + a `retry_after` field? Connection close? +- When exceeded via HTTP: `429` with `Retry-After` header? + +**Assumption K2.** There's no per-client connection cooldown — clients +can reconnect immediately after any close. (§9.1 ReconnectionPolicy) + +- Is there a server-side "too many reconnects" throttle? If so, what + delays does it enforce and how are they communicated? + +--- + +## 12. Ordering and Delivery + +**Assumption L1.** Within a single topic, for a single client, events +arrive in the order the server processed them. Across topics, no ordering +guarantee. (Implicit throughout) + +- Confirmed per-topic-per-client ordering? +- For postgres_changes specifically: does the server guarantee WAL order + within a table, or can concurrent transactions reorder? + +**Assumption L2.** Broadcasts and postgres_changes on the same topic +interleave arbitrarily. (§3, §5) + +- Confirmed? No implicit ordering between them? + +**Assumption L3.** Presence `diff` events and broadcast events on the +same topic interleave arbitrarily. + +- Confirmed? + +--- + +## 13. Reconnection / Resilience + +**Assumption M1.** After a client reconnect, the server has no memory of +prior subscriptions — the client must re-send all `phx_join`s. (§9.2) + +- Confirmed, no session resumption? +- If session resumption is coming in a future version, is there a + protocol hint we should leave room for? + +**Assumption M2.** The server does not emit a "you missed events while +disconnected" signal. Gaps are silent and the client cannot detect them +without broadcast replay. (§3.1 "Gaps are inherent") + +- Confirmed no gap-detection mechanism? + +--- + +## 14. App Lifecycle + +**Assumption N1.** The WebSocket can survive short iOS/macOS +background-foreground transitions without the server terminating the +connection. (§9.3 handleAppLifecycle) + +- What's the server-side idle/heartbeat timeout that determines how long + a backgrounded app can stay connected before the server closes? +- Is there a way to "pause" a connection server-side without closing it? + (Probably not, but worth asking.) + +--- + +## 15. Protocol Limits (Hard Numbers We Want to Document) + +Please confirm or correct: + +| Limit | Assumed | Source | +| ----- | ------- | ------ | +| Max topics per WebSocket | ? | | +| Max concurrent WebSockets per project | ? | | +| Max broadcast payload size (JSON) | ? | | +| Max broadcast payload size (binary) | ? | | +| Max presence metas per key | ? | | +| Max presence state bytes per channel | ? | | +| Max `postgres_changes` entries per join | ? | | +| Max `in` list length in filter | ? | | +| Broadcast replay retention window | ? | | +| Broadcast replay max limit | ? | | +| Default heartbeat timeout (server side) | ? | | +| Rate limit: broadcasts/sec per channel | ? | | +| Rate limit: joins/sec per socket | ? | | + +--- + +## 16. Open Design Questions that Depend on Backend + +These are v3 API decisions we deliberately deferred — the answer from +backend may change our preference. + +1. **Unbounded broadcast buffers.** We picked unbounded per-consumer + buffers (§3.1, Decision 7). If the server drops misbehaving subscribers + itself under backpressure, we could rely on that rather than asking + clients to opt into a drop policy later. +2. **Automatic retry on `token_expired`.** We retry once (§6.3, Decision 10). + If the server already handles token rotation idempotently (i.e., the same + request can be replayed safely), we could retry more aggressively or + never. +3. **HTTP broadcast batching.** We expose a batch form (§3.3). If the + server's batch endpoint has materially different rate limits or failure + semantics than the single form, we should document that. +4. **Presence key ownership.** We pushed presence key to channel-level + config (§4, Decision 17). If the backend plans to support per-track + presence keys natively, we'd revisit. + +--- + +## How to respond + +Ideal format: for each question, either "yes, confirmed", "no, here's the +actual behavior", or "undefined — please don't rely on it". For the +numeric limits table, fill in concrete numbers or "no hard limit". diff --git a/docs/design/realtime-v3.md b/docs/design/realtime-v3.md new file mode 100644 index 000000000..c4b3d55a3 --- /dev/null +++ b/docs/design/realtime-v3.md @@ -0,0 +1,1020 @@ +# Realtime v3 — Idiomatic Swift API Proposal + +> Status: Design locked after grill-through. Greenfield design — no consideration +> given to V2 compatibility or other Supabase SDKs. Targets Swift 6.2+. +> Breaking changes accepted. + +## Design Principles + +1. **Explicit lifecycle.** Resources are acquired and released explicitly. No + auto-cleanup on `deinit`, no magic based on reference counting. If you + joined a channel, you call `leave()` when you're done. +2. **Type‑safety through the language.** Channels, events, presences, and + Postgres tables are generic. The compiler rejects the wrong payload type. +3. **`AsyncSequence` is the canonical surface.** Closures appear only where + they unlock a behavior a sequence cannot express. +4. **Observation‑native.** Clean integration with `@Observable` and SwiftUI. +5. **Typed throws throughout.** `throws(RealtimeError)` at every boundary. +6. **Resilient by default.** Automatic reconnection with pluggable policies; + transparent re‑joining of channels and presences; token refresh. +7. **Explicit, injectable transport and clock.** Deterministic unit tests + without real sockets or real wall‑clock time. +8. **No singletons.** Multiple `Realtime` instances coexist with zero shared + state. + +--- + +## 30‑Second Tour + +```swift +import Realtime + +let realtime = Realtime( + url: URL(string: "wss://project.supabase.co/realtime/v1")!, + apiKey: .literal("anon-key") +) + +let channel = realtime.channel("room:42") + +// Broadcast — lazy join on first iteration. +Task { + for await msg in channel.broadcasts(of: ChatMessage.self, event: "chat") { + render(msg) + } +} + +// Postgres changes — register before subscribe. +let inserts = channel.inserts(into: Message.self, where: .eq(\.roomId, 42)) +let active = try await channel.subscribe() +Task { + for try await row in active.events(for: inserts) { + append(row) + } +} + +// Send on the same handle. +try await channel.broadcast(ChatMessage(text: "hi"), as: "chat") + +// Explicit release when done. +try await active.leave() +``` + +One-shot send without joining: + +```swift +try await realtime.httpBroadcast( + topic: "room:42", event: "chat", + payload: ChatMessage(text: "hi") +) +``` + +That's the mental model: + +- **Broadcast** and **presence** are lazy — iterate them and the channel joins. +- **Postgres changes** are register-then-subscribe — the wire forces it. +- A single `phx_join` carries everything pending at the moment of join. +- Tokens are reusable across `leave()` + resubscribe cycles. + +Everything below is elaboration. + +--- + +## 1. Client Construction + +```swift +public final actor Realtime: Sendable { + public init( + url: URL, + apiKey: APIKeySource, + configuration: Configuration = .default, + transport: any RealtimeTransport = URLSessionTransport() + ) +} +``` + +### 1.1 `APIKeySource` separates static keys from rotating auth + +```swift +public enum APIKeySource: Sendable { + case literal(String) + /// Called on connect and when the server rejects with `token_expired`. + /// See §6.3 for mid-session rotation. + case dynamic(@Sendable () async throws -> String) +} +``` + +### 1.2 Configuration + +```swift +public struct Configuration: Sendable { + public var heartbeat: Duration = .seconds(25) + public var joinTimeout: Duration = .seconds(10) + public var leaveTimeout: Duration = .seconds(10) + public var broadcastAckTimeout: Duration = .seconds(5) + public var reconnection: ReconnectionPolicy = .exponentialBackoff( + initial: .seconds(1), max: .seconds(30), jitter: 0.2 + ) + public var disconnectOnEmptyChannelsAfter: Duration = .seconds(50) + public var handleAppLifecycle: Bool = .automaticDefault + public var protocolVersion: RealtimeProtocolVersion = .v2 + public var clock: any Clock = ContinuousClock() + public var headers: HTTPFields = [:] + public var logger: (any RealtimeLogger)? = nil + public var decoder: JSONDecoder = .iso8601 + public var encoder: JSONEncoder = .iso8601 + + public static let `default` = Configuration() +} +``` + +`disconnectOnEmptyChannelsAfter` is an idle‑socket timeout: when the last +live channel has left, the socket stays open for this duration in case a new +channel joins, avoiding reconnect churn. `.zero` for immediate close. + +--- + +## 2. Channels + +### 2.1 Identity and lifecycle + +```swift +public extension Realtime { + /// Returns the `Channel` for `topic`. Shared by topic — two callers asking + /// for the same topic receive the same underlying actor. + /// + /// The channel joins lazily on the first subscribe. The caller must call + /// `leave()` to unsubscribe; `deinit` does NOT unsubscribe. + func channel( + _ topic: String, + configure: (inout ChannelOptions) -> Void = { _ in } + ) -> Channel +} + +public final actor Channel: Sendable { + public var topic: String { get } + public var options: ChannelOptions { get } + public var state: AsyncStream { get } + + /// Explicit join. Returns an `ActiveChannel` for postgres_changes consumption. + /// Idempotent: calling while joined returns the existing `ActiveChannel`; + /// concurrent calls before join await the same in-flight join. + /// + /// Postgres-change registrations made before this call are baked into the + /// `phx_join` payload (see §5). Broadcast and presence iteration also auto- + /// join on first use; whichever path triggers the join first captures the + /// pending postgres registrations. + public func subscribe() async throws(RealtimeError) -> ActiveChannel + + /// Equivalent to `subscribe()` but discards the active handle. Useful for + /// broadcast-only / presence-only channels. + public func join() async throws(RealtimeError) +} + +public struct ActiveChannel: Sendable { + public var channel: Channel { get } + + /// Iterate events for a previously-registered postgres change token. + /// Each call returns an independent fan-out stream; multiple iterators + /// observe every event independently. (§5) + public func events(for token: ChangeRegistration) + -> AsyncThrowingStream + + /// Explicit unsubscribe. Awaits server ACK. See §2.3. + /// After leave, this `ActiveChannel` is invalid; tokens remain reusable. + public func leave() async throws(RealtimeError) +} +``` + +Key invariants: + +- **Topic identity.** `realtime.channel("x")` always returns the same actor. + One server-side subscription per topic per `Realtime` instance. +- **No auto-unsubscribe.** Dropping a `Channel` or `ActiveChannel` reference + does nothing. Explicit `leave()` is the only way. +- **Single join, multiple paths.** `channel.subscribe()`, broadcast iteration, + and presence iteration all converge on a single `phx_join`. The first to + fire commits the join payload — including any postgres_changes tokens + registered up to that moment. +- **Postgres tokens register before join.** `channel.changes(...)`, + `channel.inserts(...)`, etc. mutate channel state and return tokens. Calling + these *after* the channel has joined throws `.cannotRegisterAfterJoin`. After + `leave()`, registration is allowed again — tokens are reusable across + subscribe cycles. (§5) +- **`channel.broadcast(...)` throws** `.channelNotJoined` if the channel hasn't + joined — for one-shot sends without joining, use `realtime.httpBroadcast`. +- **Leaked-channel warning.** When `Realtime` deinits with channels that + have been joined but never left, an `IssueReporting` warning fires in + debug builds. Release builds silently rely on server-side timeouts. + +### 2.2 Channel options are locked at creation + +```swift +public struct ChannelOptions: Sendable { + public var isPrivate: Bool = false + public var broadcast: BroadcastOptions = .init() + public var presenceKey: String? = nil +} + +public struct BroadcastOptions: Sendable { + public var acknowledge: Bool = false + public var receiveOwnBroadcasts: Bool = false + public var replay: ReplayOption? = nil +} + +public struct ReplayOption: Sendable { + public var since: Date + public var limit: Int? +} +``` + +Options are applied on the first `channel(topic)` call. **A later call with a +different `configure` closure is ignored** — the first call wins. An +`IssueReporting` warning fires in debug. The returned `Channel.options` +reflects the effective options. + +### 2.3 `leave()` semantics (shared-handle model) + +- `leave()` is **global**: it tears down the subscription for every holder of + the same topic. Other holders' active streams terminate by throwing + `RealtimeError.channelClosed(.userRequested)`. +- `leave()` is **await-to-ack**: it returns only after the server ACKs + `phx_leave`. On transport failure or timeout, it throws. +- A **pipelined re-acquire** is safe: if `realtime.channel("x")` is called + while a leave for `"x"` is in flight, the caller gets a fresh `Channel` + whose join is queued behind the pending leave. Same-topic churn is + transparent. + +> **Topic ownership convention.** Because `leave()` is global, coincidental +> sharing of the same topic by unrelated features can tear down each +> other's streams. Topics should be namespaced by feature +> (`"chat:room:42"`, not `"room:42"`), or routed through a single owner. +> Document loudly in the user guide. + +### 2.4 Channel state + +```swift +public enum ChannelState: Sendable, Equatable { + case unsubscribed + case joining + case joined + case leaving + case closed(CloseReason) +} + +public enum CloseReason: Sendable, Equatable { + case userRequested // someone called leave() + case serverClosed(code: Int, message: String?) + case timeout + case unauthorized + case policyViolation(String) + case transportFailure // reconnection policy gave up +} +``` + +--- + +## 3. Broadcast + +### 3.1 Receiving + +```swift +public extension Channel { + /// Typed event stream — decodes each message's payload to `T`. + /// Fan-out: each call returns an independent subscription; multiple + /// iterators observe every message independently. + func broadcasts( + of type: T.Type, + event: String + ) -> AsyncThrowingStream + + /// Untyped stream — raw `JSONValue` payloads across all events. + func broadcasts() -> AsyncStream +} + +public struct BroadcastMessage: Sendable { + public let event: String + public let payload: JSONValue + public let receivedAt: Date +} +``` + +Streams pause silently during reconnection and resume on rejoin. Gaps are +inherent in fire-and-forget pub/sub and not surfaced — callers who care +correlate against `channel.state`. + +Backpressure: each subscription has an **unbounded** buffer. A slow consumer +will accumulate pending messages and eventually OOM under sustained lag. A +`SlowConsumerPolicy` knob may be added later without breaking source. + +### 3.2 Sending + +```swift +public extension Channel { + /// Sends a broadcast. Behavior depends on `ChannelOptions.broadcast.acknowledge`: + /// - `false` (default): fire-and-forget; returns after the frame is queued. + /// - `true`: awaits server ack; throws on timeout (`broadcastAckTimeout`). + /// + /// Throws `.channelNotJoined` if the channel has not yet joined (use + /// `channel.join()` or subscribe first — or use `realtime.httpBroadcast` + /// for one-shot sends). + /// Throws `.disconnected` if the socket is down — no queuing. + func broadcast( + _ payload: T, + as event: String + ) async throws(RealtimeError) + + /// `Data` bypasses encoding and ships as a binary frame (Phoenix v2). + func broadcast(_ data: Data, as event: String) async throws(RealtimeError) +} +``` + +### 3.3 HTTP one-shot broadcast + +For senders that don't need a subscription, `realtime.httpBroadcast` POSTs +to the Realtime REST endpoint (`POST /realtime/v1/api/broadcast`). It does +not open the WebSocket and does not create a `Channel`. + +```swift +public extension Realtime { + /// Single-message shorthand. + func httpBroadcast( + topic: String, event: String, payload: T, + isPrivate: Bool = false + ) async throws(RealtimeError) + + /// Batch form. + func httpBroadcast(_ messages: [HttpBroadcastMessage]) async throws(RealtimeError) +} + +public struct HttpBroadcastMessage: Sendable { + public let topic: String + public let event: String + public let payload: any Encodable & Sendable + public let isPrivate: Bool +} +``` + +Auth uses the same `APIKeySource` as the WebSocket. Errors use the shared +taxonomy (`.authenticationFailed`, `.rateLimited`, `.serverError`). + +--- + +## 4. Presence + +```swift +public extension Channel { + var presence: Presence { get } +} + +public struct Presence: Sendable { + /// Begin tracking a state for this client. Multiple concurrent tracks are + /// supported — each registers a distinct meta under the channel's presence + /// key (Phoenix multi-meta semantics). + /// + /// The handle must be explicitly `cancel()`ed to untrack. Dropping the + /// handle without cancelling does NOT untrack — but when `channel.leave()` + /// is called, all outstanding tracks are implicitly torn down server-side. + /// + /// Debug warning fires if a handle is deinited without `cancel()` while + /// the channel is still joined. + public func track( + _ state: T + ) async throws(RealtimeError) -> PresenceHandle + + /// Snapshot + diff stream of all presences, keyed by presence key. + public func observe( + _ type: T.Type + ) -> AsyncStream> + + /// Incremental diffs only. + public func diffs( + _ type: T.Type + ) -> AsyncStream> +} + +public struct PresenceState: Sendable { + public let active: [PresenceKey: [T]] + public let lastDiff: PresenceDiff? +} + +public struct PresenceDiff: Sendable { + public let joined: [(PresenceKey, T)] + public let left: [(PresenceKey, T)] +} + +public final class PresenceHandle: Sendable { + /// Idempotent; awaits server ACK of the untrack. + public func cancel() async throws(RealtimeError) +} +``` + +- **Presence key source.** Set via `ChannelOptions.presenceKey` at channel + creation. If `nil`, the server generates a random key per connection — + Phoenix default behavior. +- **Auto re-track on reconnect.** The SDK remembers the last state passed + to each live `track()` and re-sends it on rejoin. Presence state is + restored transparently across transport outages. + +--- + +## 5. Postgres Changes + +### 5.1 Declare your table + +```swift +@RealtimeTable(schema: "public", table: "messages") +struct Message: Codable, Sendable, Identifiable { + var id: UUID + var roomId: UUID + var text: String + var createdAt: Date +} +``` + +`@RealtimeTable` synthesizes: + +- Conformance to `RealtimeTable` +- `static let schema: String`, `static let tableName: String` +- A `columnName(for: KeyPath) -> String` lookup, honoring + `CodingKeys` if the type customizes them + +Types the caller doesn't own can conform manually: + +```swift +extension ExternalType: RealtimeTable { + public static let schema = "public" + public static let tableName = "widgets" + public static func columnName(for kp: KeyPath) -> String { ... } +} +``` + +### 5.2 Typed filter — single optional clause + +Phoenix Realtime supports exactly one `column=op.value` per postgres_changes +subscription. The SDK reflects this constraint: a single optional `Filter` +per subscription. + +```swift +public struct Filter: Sendable { + public static func eq( + _ column: KeyPath, _ value: V + ) -> Filter + public static func neq(…) -> Filter + public static func gt(…) -> Filter + public static func gte(…) -> Filter + public static func lt(…) -> Filter + public static func lte(…) -> Filter + public static func `in`(_ column: KeyPath, _ values: [V]) -> Filter +} +``` + +Reads like an enum at call site; implemented as a struct with static +factories so `KeyPath` + `V` type binding is preserved. Passing the +wrong value type for a column (`.eq(\.roomId, 42)` when `roomId: UUID`) fails +at compile time. + +### 5.3 Register-then-subscribe + +Phoenix requires postgres_changes filters in the `phx_join` payload — they +cannot be added after join. The API reflects this: registration mutates +channel state and returns a token; `subscribe()` triggers the join with all +pending tokens; consumption happens through the returned `ActiveChannel`. + +```swift +public extension Channel { + // Variant-typed factories — token preserves event variant in its type. + func changes( + to type: T.Type, where filter: Filter? = nil + ) -> ChangeRegistration + + func inserts( + into type: T.Type, where filter: Filter? = nil + ) -> ChangeRegistration + + func updates( + of type: T.Type, where filter: Filter? = nil + ) -> ChangeRegistration + + func deletes( + from type: T.Type, where filter: Filter? = nil + ) -> ChangeRegistration +} + +public struct ChangeRegistration: Sendable { + // Opaque. Holds enough state for the channel to compose the join payload + // and route incoming events to consumers. +} + +public enum AnyEvent: ChangeEventVariant { public typealias Element = PostgresChange } +public enum Insert: ChangeEventVariant { public typealias Element = T } +public enum Update: ChangeEventVariant { public typealias Element = (old: T, new: T) } +public enum Delete: ChangeEventVariant { public typealias Element = T } + +public enum PostgresChange: Sendable { + case insert(T) + case update(old: T, new: T) + case delete(T) +} +``` + +Usage: + +```swift +// 1. Register tokens (no join yet). +let inserts = channel.inserts(into: Message.self, where: .eq(\.roomId, id)) +let allMsgs = channel.changes(to: Message.self, where: .eq(\.roomId, id)) +let roomGone = channel.deletes(from: Room.self, where: .eq(\.id, id)) + +// 2. Trigger join. All three tokens land in the same phx_join payload. +let active = try await channel.subscribe() + +// 3. Consume — element type follows the token's variant. +await withThrowingDiscardingTaskGroup { group in + group.addTask { + for try await row in active.events(for: inserts) { + // row: Message + } + } + group.addTask { + for try await event in active.events(for: allMsgs) { + // event: PostgresChange + switch event { + case .insert(let row): handle(row) + case .update(let old, let new): diff(old, new) + case .delete(let row): remove(row) + } + } + } + group.addTask { + for try await _ in active.events(for: roomGone) { close() } + } +} +``` + +**Tokens are reusable across subscribe cycles.** After `active.leave()`, the +same tokens replay on the next `channel.subscribe()`. New tokens may also be +registered between leave and resubscribe. Registering while joined throws +`.cannotRegisterAfterJoin`. + +**Fan-out per token.** Each `active.events(for: token)` call returns a fresh +stream; multiple iterators of the same token each receive every event. + +**Reconnect is transparent.** `ActiveChannel` survives silent reconnects +(§9.2); all tokens are re-registered automatically on rejoin. The handle is +invalidated only by explicit `leave()` or terminal `.transportFailure`. + +**AND composition is not available on the wire.** Callers needing multiple +clauses on the same event stream must client-side filter after receipt, or +register two tokens — each produces an independent server subscription +(events may duplicate across the two if the filters overlap, since the +server OR-s them). + +### 5.4 Untyped escape hatch + +For types without `@RealtimeTable`, the same register-then-subscribe flow +applies — only the filter and element types change. + +```swift +let token = channel.changes( + schema: "public", table: "messages", event: .delete, + filter: .eq("room_id", id) // UntypedFilter — string column + any value +) +// token: ChangeRegistration + +let active = try await channel.subscribe() + +for try await record in active.events(for: token) { + // record: JSONValue — caller decodes manually +} +``` + +--- + +## 6. Connection + +### 6.1 Lazy open + +The WebSocket opens lazily on first channel join (implicit via subscribe, or +explicit via `channel.join()`). `httpBroadcast` does not open the socket. + +Explicit `realtime.connect()` is available for callers who want to pre-warm +or surface auth errors early. Calls are idempotent — a second `connect()` +on an already-connected client returns immediately. + +### 6.2 Disconnect + +```swift +public extension Realtime { + /// Closes the socket, awaits close ACK. Does NOT evict the channel cache + /// or call leave() on any channel. Streams throw + /// `.channelClosed(.transportFailure)`; subsequent operations trigger a + /// fresh connect + rejoin. + func disconnect() async +} +``` + +After a manual `disconnect()`, the `ReconnectionPolicy` does NOT auto-reopen. +The next channel operation (subscribe, send, or explicit `connect()`) +triggers a fresh connect. + +### 6.3 Mid-session token rotation + +```swift +public extension Realtime { + /// Push a new token to the server via the Phoenix access_token event. + /// Keeps private channels authorized without rejoining. + func updateToken(_ newToken: String) async throws(RealtimeError) +} +``` + +**Reactive path.** If the server rejects an operation with `token_expired`, +the SDK calls `APIKeySource.dynamic()` once and retries. If the same token +comes back, it propagates `.authenticationFailed`. + +**If `dynamic()` throws:** propagates as `.authenticationFailed(underlying:)`. +Connection enters `.closed(.unauthorized)`. The `ReconnectionPolicy` does +NOT apply — auth recovery is caller-owned. + +**On `connect()`:** blocks on the first `dynamic()` call. Fail-fast if auth +is broken. + +### 6.4 Status + +```swift +public extension Realtime { + var status: AsyncStream { get } +} + +public struct ConnectionStatus: Sendable { + public enum State: Sendable { + case idle + case connecting(attempt: Int) + case connected + case reconnecting(attempt: Int, lastError: (any Error & Sendable)?) + case closed(CloseReason) + } + public let state: State + public let since: Date + public let latency: Duration? // last heartbeat RTT +} +``` + +--- + +## 7. Error Model + +```swift +public enum RealtimeError: Error, Sendable { + case disconnected + case transportFailure(underlying: any Error & Sendable) + case reconnectionGaveUp(lastError: any Error & Sendable) + + case channelNotJoined + case channelJoinTimeout + case channelJoinRejected(reason: String) + case channelClosed(CloseReason) + case cannotRegisterAfterJoin // postgres_changes registration after join (§5.3) + + case authenticationFailed(reason: String, underlying: (any Error & Sendable)?) + case tokenExpired + + case rateLimited(retryAfter: Duration?) + case serverError(code: Int, message: String) + + case broadcastFailed(reason: String) + case broadcastAckTimeout + + case decoding(type: String, underlying: any Error & Sendable) + case encoding(underlying: any Error & Sendable) + + case cancelled // includes task cancellation; Swift's CancellationError is folded here +} +``` + +Single flat enum. Swift's `CancellationError` is caught internally and +re-thrown as `.cancelled` so call sites exhaustively handle one type. +Underlying errors are preserved as `any Error & Sendable` for debugging. + +--- + +## 8. Transport and Testing + +### 8.1 Public transport protocol + +```swift +public protocol RealtimeTransport: Sendable { + func connect(to url: URL, headers: HTTPFields) + async throws -> any RealtimeConnection +} + +public protocol RealtimeConnection: Sendable { + var frames: AsyncThrowingStream { get } + func send(_ frame: TransportFrame) async throws + func close(code: Int, reason: String) async +} + +public enum TransportFrame: Sendable { + case text(String) + case binary(Data) +} +``` + +### 8.2 Built-in implementations + +- `URLSessionTransport` (default). Production. Accepts a custom + `URLSession` via init for proxy / header / session-config customization. +- `InMemoryTransport.pair()` — test helper in `RealtimeTestHelpers` module. + Returns `(client, server)`; the server has `send(_:)` and + `AsyncStream` of frames the client sent. Zero real I/O. + +### 8.3 Deterministic clock + +`Configuration.clock: any Clock` lets tests use `TestClock` to +advance heartbeats/timeouts synchronously. Matches existing `swift-clocks` +patterns in the codebase. + +--- + +## 9. Resilience + +### 9.1 Reconnection policies + +```swift +public struct ReconnectionPolicy: Sendable { + public var nextDelay: @Sendable ( + _ attempt: Int, + _ lastError: any Error & Sendable + ) -> Duration? // nil = give up + + public static let never: Self + public static func exponentialBackoff( + initial: Duration, max: Duration, jitter: Double = 0.2 + ) -> Self + public static func fixed(_ delay: Duration, maxAttempts: Int?) -> Self +} +``` + +### 9.2 Behavior during reconnection + +- **Streams stay open silently.** No sentinel values — events just pause + and resume. `channel.state` is the source of truth for lifecycle. +- **Presence is auto-restored.** The SDK re-sends every live `track()` + state on rejoin. Observers see the re-synced state naturally. +- **Postgres change subscriptions are restored.** Filters re-register on + join. +- **In-flight sends throw immediately.** `try await channel.broadcast(...)` + during an outage throws `.disconnected` — no queuing. +- **On give-up.** Channel streams throw `.channelClosed(.transportFailure)`, + the channel cache evicts affected entries, `channel.state` transitions + to `.closed(.transportFailure)`. This is distinct from user `leave()` — + `.transportFailure` means "server-initiated close the SDK surfaces," not + "you were supposed to call leave." + +### 9.3 App lifecycle + +```swift +public enum LifecyclePolicy: Sendable { + case manual + case automatic +} +``` + +On `automatic` (default on iOS/macOS/tvOS/visionOS), short +background/foreground cycles keep the socket alive; longer cycles or +OS-killed sockets trigger a reconnect on foreground. No caller code. + +--- + +## 10. Observability + +```swift +public protocol RealtimeLogger: Sendable { + func log(_ event: LogEvent) +} + +public struct LogEvent: Sendable { + public let level: LogLevel // .debug, .info, .warn, .error + public let category: Category // .connection, .channel, .broadcast, .presence, .postgres + public let message: String + public let metadata: [String: String] + public let timestamp: Date +} + +public enum LogLevel: Sendable { case debug, info, warn, error } +public enum Category: Sendable { case connection, channel, broadcast, presence, postgres } +``` + +Ship `OSLogLogger` and `StdoutLogger`. Metrics are logs with numeric +metadata (`heartbeat.rtt_ms`, `reconnect.attempt`, `broadcast.ack_latency_ms`) — +consumers extract as they need via their logger of choice. No swift-metrics +dependency in the core module. + +--- + +## 11. Migration Sketch (V2 → V3) + +| V2 | V3 | +| ----------------------------------------------- | --------------------------------------------------------- | +| `RealtimeClientV2(url:options:)` | `Realtime(url:apiKey:configuration:transport:)` | +| `client.channel("x")` | `realtime.channel("x")` (shared; explicit `leave()`) | +| `await channel.subscribe()` | `await channel.join()` (implicit on first subscribe) | +| `await channel.unsubscribe()` | `try await channel.leave()` (typed throws, global) | +| `channel.broadcastStream(event:)` | `channel.broadcasts(of: T.self, event:)` (typed stream) | +| `await channel.broadcast(event:message:)` | `try await channel.broadcast(payload, as: event)` | +| — (no equivalent) | `realtime.httpBroadcast(topic:event:payload:)` | +| `channel.postgresChange(.all, …)` | `let token = channel.changes(to: Message.self, …); let active = try await channel.subscribe(); active.events(for: token)` | +| `channel.presenceChange()` | `channel.presence.diffs(T.self)` / `.observe(T.self)` | +| `channel.track(...)` | `try await channel.presence.track(state)` → handle | +| `ObservationToken` / `subscription.cancel()` | `AsyncSequence` iteration ends on task cancel | +| `accessToken: () async -> String?` closure | `APIKeySource.dynamic(…)` + `realtime.updateToken(…)` | +| `any Error` | `RealtimeError` (typed throws everywhere) | +| `RealtimeClientOptions.maxRetryAttempts` etc. | `Configuration.reconnection: ReconnectionPolicy` | +| `options.vsn` | `Configuration.protocolVersion` (default `.v2`) | +| `options.handleAppLifecycle` | unchanged | + +--- + +## 12. Locked Decisions + +Everything below was resolved during design review. Kept here for reference +so implementors don't re-litigate. + +| # | Decision | Rationale | +| - | -------- | --------- | +| 1 | Channels are shared by topic within a `Realtime` instance | One server-side subscription per topic; predictable identity | +| 2 | No auto-unsubscribe on `deinit`; explicit `leave()` only | Explicit lifecycle; no ref-count magic | +| 3 | Global `leave()` — other holders' streams throw `.channelClosed(.userRequested)` | Mirrors the wire; surfaces the shared nature | +| 4 | `leave()` is `async throws`, awaits server ACK | Deterministic; consistent with the rest of the API | +| 5 | Pipelined re-acquire after `leave()` | Same-topic churn is transparent | +| 6 | Reconnect is silent in typed streams; `channel.state` is the lifecycle source of truth | Avoids leaky delivery-guarantee abstractions | +| 7 | Unbounded per-subscription buffer (for now) | Simplest; `SlowConsumerPolicy` knob can be added additively | +| 8 | Fan-out: each `broadcasts(of:event:)` call is independent | Matches pub/sub intuition; slow consumer is local | +| 9 | `APIKeySource.dynamic(_:)` for fetch; `updateToken(_:)` for push | No JWT parsing in the SDK | +| 10 | On `token_expired`: retry once, then propagate | Tolerates race between rotation and notify | +| 11 | `dynamic()` throwing does NOT trigger `ReconnectionPolicy` | Auth recovery is caller-owned | +| 12 | Single optional `Filter` per postgres_changes subscription | Reflects the Phoenix wire constraint | +| 13 | `Filter` is a struct with static factories; reads like an enum | Preserves `KeyPath` + `V` binding in generics | +| 14 | `@RealtimeTable` macro for column-name resolution; manual conformance as escape hatch | Type-safe without forcing macros on every type | +| 14a | Postgres changes are **register-then-subscribe**: `channel.changes(...)` returns a `ChangeRegistration` token; `channel.subscribe()` triggers the join with all pending tokens; consumption via `active.events(for: token)` | Phoenix requires postgres_changes filters in the join payload — the API can't pretend lazy join works for them | +| 14b | Tokens carry the event variant in their type (`Insert`/`Update`/`Delete`/`AnyEvent`); element type follows the variant | Compiler enforces the right consumer shape per token kind | +| 14c | Registering after join throws `.cannotRegisterAfterJoin`; tokens are reusable across `leave()` + resubscribe cycles | Honest about the wire; ergonomic across reconnects and cycles | +| 14d | Broadcast/presence stay lazy-join; whichever path triggers the join first (subscribe or iteration) captures the pending postgres tokens | One `phx_join` per topic; uniform "first-join wins" rule | +| 15 | `PresenceHandle` is a regular class; explicit `cancel()`; debug warning on leak | Consistent with `Channel` lifecycle rule | +| 16 | Multi-track supported (multiple metas per key) | Matches Phoenix; single-track is the trivial subset | +| 17 | Presence key is channel-level only; server-generated if nil | Simpler; per-track keys confuse more than they help | +| 18 | Auto re-track on reconnect | Presence is a best-effort synced-state abstraction | +| 19 | `withChannel` dropped entirely | Dangerous under global-leave semantics; 3-line explicit pattern is clearer | +| 20 | Flat `RealtimeError` enum; cancellation folded as `.cancelled` | Simpler call sites than grouped or union-throws | +| 21 | Underlying errors preserved as `any Error & Sendable` | Debug value outweighs Equatable/Codable loss | +| 22 | Single `broadcast` method; ack at channel-level config | Uniform call site | +| 23 | Self-broadcast is channel-level only (wire constraint) | Don't lie about the wire | +| 24 | Replay via `ChannelOptions.broadcast.replay` | Config at creation; not a separate method | +| 25 | `Data` payloads bypass encoding; ship as binary frames | Natural Swift affordance | +| 26 | `broadcast` throws `.channelNotJoined` if not joined | Joining is a commitment; one-shot sends go via HTTP | +| 27 | `realtime.httpBroadcast(...)` for one-shot sends; shares `APIKeySource` | Clear separation from WS path | +| 28 | Socket opens lazily on first channel join | Zero ceremony for common paths; explicit `connect()` still exists | +| 29 | `disconnect()` closes socket, keeps channel cache | Pause/resume, not total teardown | +| 30 | `disconnect()` is `async`, awaits close ACK | Consistent with other terminal operations | +| 31 | `connect()` is idempotent | No ceremony for retry paths | +| 32 | No auto-reconnect after manual `disconnect()` | `ReconnectionPolicy` is for unexpected closes | +| 33 | Duplicate `channel(topic)` with different options: first-call wins + debug warning | Silent drift is worse than a warning | +| 34 | `@RealtimeSchema` (typed event channels) deferred | Per-call generics cover 90% of the typing benefit; macro complexity can wait | +| 35 | Public `RealtimeTransport` protocol | Custom transports for testing and advanced networking | +| 36 | Ship `InMemoryTransport.pair()` in test helpers | Table stakes for deterministic testing | +| 37 | Inject `Clock` via `Configuration` | Deterministic timeout/heartbeat tests | +| 38 | Drop obsoleted V2 knobs (`connectOnSubscribe`, `maxRetryAttempts`, `logLevel`, `fetch`, `accessToken`, `disconnectOnSessionLoss`) | Subsumed by better abstractions | +| 39 | Keep `disconnectOnEmptyChannelsAfter` (socket idle timeout) and `protocolVersion` | Still useful | +| 40 | Per-operation timeouts: `joinTimeout`, `leaveTimeout`, `broadcastAckTimeout` | One global knob can't tune distinct round-trips | +| 41 | Logger only; no separate metrics stream; no swift-metrics dep | Metrics = logs with numeric metadata | +| 42 | No custom join payload | Unused in practice; removes surface | +| 43 | Multiple `Realtime` instances are fully independent | No singleton; no hidden coupling | +| 44 | Topic strings are not validated | Server is the source of truth for validity | +| 45 | Presence key default: server-generated when nil | Matches Phoenix | + +--- + +## Appendix A — End-to-end example + +```swift +import Realtime + +@RealtimeTable(schema: "public", table: "messages") +struct Message: Codable, Sendable, Identifiable { + var id: UUID + var roomId: UUID + var text: String + var authorId: UUID + var createdAt: Date +} + +struct UserPresence: Codable, Sendable { + let userId: UUID + let status: Status + enum Status: String, Codable, Sendable { case active, idle } +} + +@MainActor @Observable +final class ChatRoomModel { + private let realtime: Realtime + private let channel: Channel + private let roomId: UUID + private var runTask: Task? + private var active: ActiveChannel? + private var trackHandle: PresenceHandle? + + var messages: [Message] = [] + var onlineUsers: [UUID: UserPresence] = [:] + var connection: ConnectionStatus.State = .idle + + init(realtime: Realtime, roomId: UUID) { + self.realtime = realtime + self.roomId = roomId + self.channel = realtime.channel("chat:room:\(roomId)") { + $0.presenceKey = "user-\(Self.currentUserID)" + } + } + + func start(me: UUID) { + // Register postgres tokens BEFORE subscribe — they bake into phx_join. + let messageInserts = channel.inserts( + into: Message.self, where: .eq(\.roomId, roomId) + ) + + runTask = Task { [channel, realtime, roomId, messageInserts, weak self] in + do { + // Single explicit join captures the registration above. + let active = try await channel.subscribe() + await MainActor.run { self?.active = active } + + try await withThrowingDiscardingTaskGroup { group in + // Postgres inserts → append + group.addTask { + for try await row in active.events(for: messageInserts) { + await MainActor.run { self?.messages.append(row) } + } + } + // Presence observers (lazy-attaches to the joined channel) + group.addTask { + for await state in channel.presence.observe(UserPresence.self) { + let mapped = Dictionary( + uniqueKeysWithValues: state.active.values + .flatMap { $0 } + .map { ($0.userId, $0) } + ) + await MainActor.run { self?.onlineUsers = mapped } + } + } + // Track myself + group.addTask { + let handle = try await channel.presence.track( + UserPresence(userId: me, status: .active) + ) + await MainActor.run { self?.trackHandle = handle } + } + // Connection status mirror + group.addTask { + for await status in realtime.status { + await MainActor.run { self?.connection = status.state } + } + } + } + } catch is CancellationError { + // expected on view teardown + } catch let error as RealtimeError { + print("chat failed:", error) // exhaustive — compiler enforces + } + } + } + + /// Broadcast on the same channel handle the subscribers use. + /// One server-side subscription; one round-trip. + func send(_ text: String, from author: UUID) async throws(RealtimeError) { + try await channel.broadcast( + ChatMessage(authorId: author, text: text), + as: "chat" + ) + } + + func stop() async { + runTask?.cancel() + try? await trackHandle?.cancel() + try? await active?.leave() + } +} +``` + +--- + +## Appendix B — Why not Combine? + +- `AsyncSequence` is the lingua franca of new Apple frameworks. +- Combine cannot express typed throws or structured cancellation cleanly. +- Callers who want Combine can wrap any stream in `Publisher` in ~5 lines — + the reverse is lossy. + +## Appendix C — Platform requirements + +- Swift 6.2+ (typed throws, isolated deinit, macros at the required level) +- iOS 17+ / macOS 14+ / tvOS 17+ / watchOS 10+ / visionOS 1+ for + `@Observable`. A non-Observable compatibility layer could extend support + to iOS 13+ at the cost of ergonomic integration. From f72170a06fe1ccc11ce0751c962fc44276cb1374 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 7 May 2026 14:22:30 -0300 Subject: [PATCH 2/6] docs(realtime): collapse ActiveChannel into ChannelSubscription MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the two-type Channel + ActiveChannel split with a single post-join type that conforms to AsyncSequence and owns broadcast send. - subscribe() returns ChannelSubscription (was ActiveChannel) - ChannelSubscription: AsyncSequence with Element = PhoenixMessage - typed views: broadcasts(of:event:), events(for: token), presence - broadcast send moves from Channel to ChannelSubscription — type-level gate replaces the runtime .channelNotJoined error - presence accessor moves from Channel to ChannelSubscription - subscribe() is now the only join path; no iteration-driven lazy-join - 30-second tour, §2-§7, §11 migration table, §12 decisions, Appendix A all updated end-to-end The new shape collapses asymmetry: Channel exposes registration + the join verb; ChannelSubscription exposes everything else. Sending without a live subscription is unrepresentable. Co-Authored-By: Claude Opus 4.7 --- docs/design/realtime-v3.md | 257 +++++++++++++++++++++++-------------- 1 file changed, 163 insertions(+), 94 deletions(-) diff --git a/docs/design/realtime-v3.md b/docs/design/realtime-v3.md index c4b3d55a3..49c0fd108 100644 --- a/docs/design/realtime-v3.md +++ b/docs/design/realtime-v3.md @@ -36,27 +36,38 @@ let realtime = Realtime( let channel = realtime.channel("room:42") -// Broadcast — lazy join on first iteration. +// Optional: register postgres tokens BEFORE subscribe. +let inserts = channel.inserts(into: Message.self, where: .eq(\.roomId, 42)) + +// Single explicit join. +let sub = try await channel.subscribe() + +// Typed broadcast receive. Task { - for await msg in channel.broadcasts(of: ChatMessage.self, event: "chat") { + for try await msg in sub.broadcasts(of: ChatMessage.self, event: "chat") { render(msg) } } -// Postgres changes — register before subscribe. -let inserts = channel.inserts(into: Message.self, where: .eq(\.roomId, 42)) -let active = try await channel.subscribe() +// Postgres consumption. Task { - for try await row in active.events(for: inserts) { + for try await row in sub.events(for: inserts) { append(row) } } -// Send on the same handle. -try await channel.broadcast(ChatMessage(text: "hi"), as: "chat") +// Untyped raw feed (the AsyncSequence conformance). +Task { + for try await frame in sub { + // frame: PhoenixMessage — broadcast / postgres_changes / presence_diff / ... + } +} + +// Send (only available on the subscription — Channel has no broadcast method). +try await sub.broadcast(ChatMessage(text: "hi"), as: "chat") // Explicit release when done. -try await active.leave() +try await sub.leave() ``` One-shot send without joining: @@ -70,10 +81,14 @@ try await realtime.httpBroadcast( That's the mental model: -- **Broadcast** and **presence** are lazy — iterate them and the channel joins. -- **Postgres changes** are register-then-subscribe — the wire forces it. -- A single `phx_join` carries everything pending at the moment of join. -- Tokens are reusable across `leave()` + resubscribe cycles. +- **Channels are factories.** `realtime.channel(topic)` returns a handle for + registering postgres tokens and triggering `subscribe()`. Nothing else. +- **`subscribe()` returns a `ChannelSubscription`.** All consumption (typed and + untyped), all sending, and presence live on the subscription. The type + system enforces "you must subscribe before doing anything live." +- **Postgres changes are register-then-subscribe.** The Phoenix wire forces it; + the API reflects it. Tokens are reusable across `leave()` cycles. +- **One `phx_join` per topic.** All pending tokens land in that single join. Everything below is elaboration. @@ -155,53 +170,89 @@ public final actor Channel: Sendable { public var options: ChannelOptions { get } public var state: AsyncStream { get } - /// Explicit join. Returns an `ActiveChannel` for postgres_changes consumption. - /// Idempotent: calling while joined returns the existing `ActiveChannel`; - /// concurrent calls before join await the same in-flight join. + /// Explicit join. Returns a `ChannelSubscription` — the surface for all + /// post-join interaction (consumption, sending, presence). Idempotent: + /// calling while joined returns an equivalent subscription value; concurrent + /// calls before join await the same in-flight join. /// /// Postgres-change registrations made before this call are baked into the - /// `phx_join` payload (see §5). Broadcast and presence iteration also auto- - /// join on first use; whichever path triggers the join first captures the - /// pending postgres registrations. - public func subscribe() async throws(RealtimeError) -> ActiveChannel + /// `phx_join` payload (see §5). After the call returns, registration of + /// new tokens throws `.cannotRegisterAfterJoin` until the next `leave()`. + public func subscribe() async throws(RealtimeError) -> ChannelSubscription - /// Equivalent to `subscribe()` but discards the active handle. Useful for - /// broadcast-only / presence-only channels. - public func join() async throws(RealtimeError) + /// Convenience for callers who don't currently hold a subscription value + /// (e.g., a different feature on the same shared topic). Equivalent to + /// `subscribe().leave()` but does not require fetching the subscription. + public func leave() async throws(RealtimeError) } -public struct ActiveChannel: Sendable { - public var channel: Channel { get } +/// The post-join surface. Iterating directly yields the untyped Phoenix +/// message stream; methods refine into typed views for broadcasts, postgres +/// changes, and presence. Holds the only handle for sending broadcasts. +public struct ChannelSubscription: AsyncSequence, Sendable { + public typealias Element = PhoenixMessage + + /// Untyped iteration — every user-visible Phoenix message on this channel. + /// Excludes internal frames (`phx_reply`, `phx_close`). Fan-out: each + /// iteration is independent; all iterators observe every message. + public func makeAsyncIterator() -> AsyncIterator - /// Iterate events for a previously-registered postgres change token. - /// Each call returns an independent fan-out stream; multiple iterators - /// observe every event independently. (§5) + // Typed views (§3, §5, §4) — see the relevant sections for full signatures. + public func broadcasts(of type: T.Type, event: String) + -> AsyncThrowingStream public func events(for token: ChangeRegistration) -> AsyncThrowingStream + public var presence: Presence { get } - /// Explicit unsubscribe. Awaits server ACK. See §2.3. - /// After leave, this `ActiveChannel` is invalid; tokens remain reusable. + // Sending (§3.2) — only available post-subscribe; type system enforces it. + public func broadcast(_ payload: T, as event: String) + async throws(RealtimeError) + public func broadcast(_ data: Data, as event: String) async throws(RealtimeError) + + /// Explicit unsubscribe. Global (§2.3); awaits server ACK. After leave, + /// this subscription is invalidated — methods throw `.channelClosed`. + /// Tokens registered on the underlying channel remain reusable for the + /// next `subscribe()`. public func leave() async throws(RealtimeError) } + +public struct PhoenixMessage: Sendable { + /// Server-side event name. Common values: `"broadcast"`, `"postgres_changes"`, + /// `"presence_diff"`, `"presence_state"`, `"system"`. + public let event: String + + /// Raw payload as received. JSON for text frames, `Data` for binary. + public let payload: PhoenixPayload + + /// Local receipt timestamp. + public let receivedAt: Date +} + +public enum PhoenixPayload: Sendable { + case json(JSONValue) + case binary(Data) +} ``` Key invariants: - **Topic identity.** `realtime.channel("x")` always returns the same actor. One server-side subscription per topic per `Realtime` instance. -- **No auto-unsubscribe.** Dropping a `Channel` or `ActiveChannel` reference - does nothing. Explicit `leave()` is the only way. -- **Single join, multiple paths.** `channel.subscribe()`, broadcast iteration, - and presence iteration all converge on a single `phx_join`. The first to - fire commits the join payload — including any postgres_changes tokens - registered up to that moment. +- **No auto-unsubscribe.** Dropping a `Channel` or `ChannelSubscription` does + nothing. Explicit `leave()` is the only way. +- **`subscribe()` is the only join path.** No lazy-join via iteration. The + WebSocket opens lazily on the first `subscribe()` (§6.1). - **Postgres tokens register before join.** `channel.changes(...)`, `channel.inserts(...)`, etc. mutate channel state and return tokens. Calling these *after* the channel has joined throws `.cannotRegisterAfterJoin`. After `leave()`, registration is allowed again — tokens are reusable across subscribe cycles. (§5) -- **`channel.broadcast(...)` throws** `.channelNotJoined` if the channel hasn't - joined — for one-shot sends without joining, use `realtime.httpBroadcast`. +- **Sending is only available on a subscription.** Type-level gate: there is + no `Channel.broadcast(...)` method. For one-shot sends without joining, use + `realtime.httpBroadcast` (§3.3). +- **Multiple `subscribe()` calls return equivalent subscriptions.** All point + at the same backing channel state; any subscription's `leave()` ends the + channel for every holder of the topic. - **Leaked-channel warning.** When `Realtime` deinits with channels that have been joined but never left, an `IssueReporting` warning fires in debug builds. Release builds silently rely on server-side timeouts. @@ -275,27 +326,29 @@ public enum CloseReason: Sendable, Equatable { ## 3. Broadcast +All broadcast surfaces — typed receiving, typed sending, and the untyped +iteration over the raw Phoenix feed — live on `ChannelSubscription`. The +type system enforces "you must have subscribed before consuming or sending." + ### 3.1 Receiving ```swift -public extension Channel { - /// Typed event stream — decodes each message's payload to `T`. - /// Fan-out: each call returns an independent subscription; multiple - /// iterators observe every message independently. +public extension ChannelSubscription { + /// Typed event stream — decodes each broadcast message's payload to `T`, + /// filtered to a single event name. Fan-out: each call returns an + /// independent stream; multiple iterators observe every matching message. func broadcasts( of type: T.Type, event: String ) -> AsyncThrowingStream - - /// Untyped stream — raw `JSONValue` payloads across all events. - func broadcasts() -> AsyncStream } -public struct BroadcastMessage: Sendable { - public let event: String - public let payload: JSONValue - public let receivedAt: Date -} +// Untyped iteration is the AsyncSequence conformance on ChannelSubscription +// itself (§2.1). Element is `PhoenixMessage`, which spans broadcasts, +// postgres_changes, presence_diff, and other channel-level events. To filter +// to broadcasts only, match on `event == "broadcast"` and decode `payload` +// manually — but the typed `broadcasts(of:event:)` method is the recommended +// path. ``` Streams pause silently during reconnection and resume on rejoin. Gaps are @@ -309,15 +362,14 @@ will accumulate pending messages and eventually OOM under sustained lag. A ### 3.2 Sending ```swift -public extension Channel { +public extension ChannelSubscription { /// Sends a broadcast. Behavior depends on `ChannelOptions.broadcast.acknowledge`: /// - `false` (default): fire-and-forget; returns after the frame is queued. /// - `true`: awaits server ack; throws on timeout (`broadcastAckTimeout`). /// - /// Throws `.channelNotJoined` if the channel has not yet joined (use - /// `channel.join()` or subscribe first — or use `realtime.httpBroadcast` - /// for one-shot sends). - /// Throws `.disconnected` if the socket is down — no queuing. + /// Throws `.channelClosed` if `leave()` has been called on this or any + /// other holder of the topic. Throws `.disconnected` if the socket is down + /// — no queuing. func broadcast( _ payload: T, as event: String @@ -328,6 +380,10 @@ public extension Channel { } ``` +Type-level guarantee: there is no `Channel.broadcast(...)`. To send, callers +must first `await channel.subscribe()`. The previous `.channelNotJoined` +runtime error is gone — the situation is unrepresentable. + ### 3.3 HTTP one-shot broadcast For senders that don't need a subscription, `realtime.httpBroadcast` POSTs @@ -361,8 +417,12 @@ taxonomy (`.authenticationFailed`, `.rateLimited`, `.serverError`). ## 4. Presence +Presence — like broadcast consumption — is gated behind `ChannelSubscription`. +The presence key is still configured at channel creation via `ChannelOptions` +(§2.2); `track`, `observe`, and `diffs` require a live subscription. + ```swift -public extension Channel { +public extension ChannelSubscription { var presence: Presence { get } } @@ -478,7 +538,7 @@ at compile time. Phoenix requires postgres_changes filters in the `phx_join` payload — they cannot be added after join. The API reflects this: registration mutates channel state and returns a token; `subscribe()` triggers the join with all -pending tokens; consumption happens through the returned `ActiveChannel`. +pending tokens; consumption happens through the returned `ChannelSubscription`. ```swift public extension Channel { @@ -526,17 +586,17 @@ let allMsgs = channel.changes(to: Message.self, where: .eq(\.roomId, id)) let roomGone = channel.deletes(from: Room.self, where: .eq(\.id, id)) // 2. Trigger join. All three tokens land in the same phx_join payload. -let active = try await channel.subscribe() +let sub = try await channel.subscribe() // 3. Consume — element type follows the token's variant. await withThrowingDiscardingTaskGroup { group in group.addTask { - for try await row in active.events(for: inserts) { + for try await row in sub.events(for: inserts) { // row: Message } } group.addTask { - for try await event in active.events(for: allMsgs) { + for try await event in sub.events(for: allMsgs) { // event: PostgresChange switch event { case .insert(let row): handle(row) @@ -546,22 +606,22 @@ await withThrowingDiscardingTaskGroup { group in } } group.addTask { - for try await _ in active.events(for: roomGone) { close() } + for try await _ in sub.events(for: roomGone) { close() } } } ``` -**Tokens are reusable across subscribe cycles.** After `active.leave()`, the +**Tokens are reusable across subscribe cycles.** After `sub.leave()`, the same tokens replay on the next `channel.subscribe()`. New tokens may also be registered between leave and resubscribe. Registering while joined throws `.cannotRegisterAfterJoin`. -**Fan-out per token.** Each `active.events(for: token)` call returns a fresh +**Fan-out per token.** Each `sub.events(for: token)` call returns a fresh stream; multiple iterators of the same token each receive every event. -**Reconnect is transparent.** `ActiveChannel` survives silent reconnects -(§9.2); all tokens are re-registered automatically on rejoin. The handle is -invalidated only by explicit `leave()` or terminal `.transportFailure`. +**Reconnect is transparent.** `ChannelSubscription` survives silent reconnects +(§9.2); all tokens are re-registered automatically on rejoin. The subscription +is invalidated only by explicit `leave()` or terminal `.transportFailure`. **AND composition is not available on the wire.** Callers needing multiple clauses on the same event stream must client-side filter after receipt, or @@ -581,9 +641,9 @@ let token = channel.changes( ) // token: ChangeRegistration -let active = try await channel.subscribe() +let sub = try await channel.subscribe() -for try await record in active.events(for: token) { +for try await record in sub.events(for: token) { // record: JSONValue — caller decodes manually } ``` @@ -594,12 +654,14 @@ for try await record in active.events(for: token) { ### 6.1 Lazy open -The WebSocket opens lazily on first channel join (implicit via subscribe, or -explicit via `channel.join()`). `httpBroadcast` does not open the socket. +The WebSocket opens lazily on the first `channel.subscribe()` call. There is +no iteration-driven lazy-join in v3 — the only path from "no socket" to +"joined channel" is an explicit `subscribe()`. `httpBroadcast` does not open +the socket. Explicit `realtime.connect()` is available for callers who want to pre-warm -or surface auth errors early. Calls are idempotent — a second `connect()` -on an already-connected client returns immediately. +or surface auth errors early without joining a channel. Calls are idempotent +— a second `connect()` on an already-connected client returns immediately. ### 6.2 Disconnect @@ -669,7 +731,6 @@ public enum RealtimeError: Error, Sendable { case transportFailure(underlying: any Error & Sendable) case reconnectionGaveUp(lastError: any Error & Sendable) - case channelNotJoined case channelJoinTimeout case channelJoinRejected(reason: String) case channelClosed(CloseReason) @@ -817,14 +878,14 @@ dependency in the core module. | ----------------------------------------------- | --------------------------------------------------------- | | `RealtimeClientV2(url:options:)` | `Realtime(url:apiKey:configuration:transport:)` | | `client.channel("x")` | `realtime.channel("x")` (shared; explicit `leave()`) | -| `await channel.subscribe()` | `await channel.join()` (implicit on first subscribe) | -| `await channel.unsubscribe()` | `try await channel.leave()` (typed throws, global) | -| `channel.broadcastStream(event:)` | `channel.broadcasts(of: T.self, event:)` (typed stream) | -| `await channel.broadcast(event:message:)` | `try await channel.broadcast(payload, as: event)` | +| `await channel.subscribe()` | `let sub = try await channel.subscribe()` (returns `ChannelSubscription`) | +| `await channel.unsubscribe()` | `try await sub.leave()` (typed throws, global) | +| `channel.broadcastStream(event:)` | `sub.broadcasts(of: T.self, event:)` (typed stream) | +| `await channel.broadcast(event:message:)` | `try await sub.broadcast(payload, as: event)` | | — (no equivalent) | `realtime.httpBroadcast(topic:event:payload:)` | -| `channel.postgresChange(.all, …)` | `let token = channel.changes(to: Message.self, …); let active = try await channel.subscribe(); active.events(for: token)` | -| `channel.presenceChange()` | `channel.presence.diffs(T.self)` / `.observe(T.self)` | -| `channel.track(...)` | `try await channel.presence.track(state)` → handle | +| `channel.postgresChange(.all, …)` | `let token = channel.changes(to: Message.self, …); let sub = try await channel.subscribe(); sub.events(for: token)` | +| `channel.presenceChange()` | `sub.presence.diffs(T.self)` / `.observe(T.self)` | +| `channel.track(...)` | `try await sub.presence.track(state)` → handle | | `ObservationToken` / `subscription.cancel()` | `AsyncSequence` iteration ends on task cancel | | `accessToken: () async -> String?` closure | `APIKeySource.dynamic(…)` + `realtime.updateToken(…)` | | `any Error` | `RealtimeError` (typed throws everywhere) | @@ -855,10 +916,16 @@ so implementors don't re-litigate. | 12 | Single optional `Filter` per postgres_changes subscription | Reflects the Phoenix wire constraint | | 13 | `Filter` is a struct with static factories; reads like an enum | Preserves `KeyPath` + `V` binding in generics | | 14 | `@RealtimeTable` macro for column-name resolution; manual conformance as escape hatch | Type-safe without forcing macros on every type | -| 14a | Postgres changes are **register-then-subscribe**: `channel.changes(...)` returns a `ChangeRegistration` token; `channel.subscribe()` triggers the join with all pending tokens; consumption via `active.events(for: token)` | Phoenix requires postgres_changes filters in the join payload — the API can't pretend lazy join works for them | +| 14a | Postgres changes are **register-then-subscribe**: `channel.changes(...)` returns a `ChangeRegistration` token; `channel.subscribe()` triggers the join with all pending tokens; consumption via `sub.events(for: token)` | Phoenix requires postgres_changes filters in the join payload — the API can't pretend lazy join works for them | | 14b | Tokens carry the event variant in their type (`Insert`/`Update`/`Delete`/`AnyEvent`); element type follows the variant | Compiler enforces the right consumer shape per token kind | | 14c | Registering after join throws `.cannotRegisterAfterJoin`; tokens are reusable across `leave()` + resubscribe cycles | Honest about the wire; ergonomic across reconnects and cycles | -| 14d | Broadcast/presence stay lazy-join; whichever path triggers the join first (subscribe or iteration) captures the pending postgres tokens | One `phx_join` per topic; uniform "first-join wins" rule | +| 14d | `subscribe()` is the **only** join path; no iteration-driven lazy-join | One mental model; no surprises from broadcast iteration silently joining | +| 14e | `subscribe()` returns `ChannelSubscription` — the post-join surface for consumption, sending, and presence | Type-level gate: ops requiring "joined" can only be reached from a `ChannelSubscription` value | +| 14f | `ChannelSubscription` conforms to `AsyncSequence` with `Element = PhoenixMessage` | Untyped raw feed available without an extra method; typed methods refine for normal use | +| 14g | `Channel.broadcast(_:as:)` (sending) is removed; sending lives only on `ChannelSubscription` | Compile-time gate replaces the v3-draft `.channelNotJoined` runtime error | +| 14h | Multiple `subscribe()` calls return equivalent subscriptions sharing one backing state | Topic identity (Decision 1) extends to subscriptions | +| 14i | Subscription drop without `leave()` does nothing (debug warning); leave is global as in Decision 3 | Consistency with channel rules; no auto-leave footguns under topic sharing | +| 14j | `Presence` accessor moves to `ChannelSubscription` (was on `Channel` in earlier draft) | Same gate as broadcast send; track/observe require a live join | | 15 | `PresenceHandle` is a regular class; explicit `cancel()`; debug warning on leak | Consistent with `Channel` lifecycle rule | | 16 | Multi-track supported (multiple metas per key) | Matches Phoenix; single-track is the trivial subset | | 17 | Presence key is channel-level only; server-generated if nil | Simpler; per-track keys confuse more than they help | @@ -919,7 +986,7 @@ final class ChatRoomModel { private let channel: Channel private let roomId: UUID private var runTask: Task? - private var active: ActiveChannel? + private var subscription: ChannelSubscription? private var trackHandle: PresenceHandle? var messages: [Message] = [] @@ -940,22 +1007,22 @@ final class ChatRoomModel { into: Message.self, where: .eq(\.roomId, roomId) ) - runTask = Task { [channel, realtime, roomId, messageInserts, weak self] in + runTask = Task { [channel, realtime, messageInserts, weak self] in do { // Single explicit join captures the registration above. - let active = try await channel.subscribe() - await MainActor.run { self?.active = active } + let sub = try await channel.subscribe() + await MainActor.run { self?.subscription = sub } try await withThrowingDiscardingTaskGroup { group in // Postgres inserts → append group.addTask { - for try await row in active.events(for: messageInserts) { + for try await row in sub.events(for: messageInserts) { await MainActor.run { self?.messages.append(row) } } } - // Presence observers (lazy-attaches to the joined channel) + // Presence observers group.addTask { - for await state in channel.presence.observe(UserPresence.self) { + for await state in sub.presence.observe(UserPresence.self) { let mapped = Dictionary( uniqueKeysWithValues: state.active.values .flatMap { $0 } @@ -966,7 +1033,7 @@ final class ChatRoomModel { } // Track myself group.addTask { - let handle = try await channel.presence.track( + let handle = try await sub.presence.track( UserPresence(userId: me, status: .active) ) await MainActor.run { self?.trackHandle = handle } @@ -986,10 +1053,12 @@ final class ChatRoomModel { } } - /// Broadcast on the same channel handle the subscribers use. - /// One server-side subscription; one round-trip. + /// Broadcast through the active subscription. Type-level gate: cannot + /// be called before `subscription` is set. One server-side subscription; + /// one round-trip. func send(_ text: String, from author: UUID) async throws(RealtimeError) { - try await channel.broadcast( + guard let subscription else { return } + try await subscription.broadcast( ChatMessage(authorId: author, text: text), as: "chat" ) @@ -998,7 +1067,7 @@ final class ChatRoomModel { func stop() async { runTask?.cancel() try? await trackHandle?.cancel() - try? await active?.leave() + try? await subscription?.leave() } } ``` From b7da27792ac4262d62fafb1346a57b79f69727c2 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 7 May 2026 14:26:59 -0300 Subject: [PATCH 3/6] docs(realtime): make PhoenixMessage fully raw, defer subscription state accessor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PhoenixMessage gains joinRef and ref fields for request/reply correlation visibility; raw iteration now includes internal phx_reply/phx_close/phx_error frames so advanced consumers can observe everything the SDK sees. - Document raw iteration as the unfiltered escape hatch (the SDK still consumes these frames internally for ack correlation and lifecycle). - Lock decisions 14k (raw PhoenixMessage shape) and 14l (defer ChannelSubscription.isAlive / state accessor — additive if needed). - subscribe() remains async throws (no change); confirmed lock. Co-Authored-By: Claude Opus 4.7 --- docs/design/realtime-v3.md | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/docs/design/realtime-v3.md b/docs/design/realtime-v3.md index 49c0fd108..e490f157b 100644 --- a/docs/design/realtime-v3.md +++ b/docs/design/realtime-v3.md @@ -186,15 +186,18 @@ public final actor Channel: Sendable { public func leave() async throws(RealtimeError) } -/// The post-join surface. Iterating directly yields the untyped Phoenix -/// message stream; methods refine into typed views for broadcasts, postgres -/// changes, and presence. Holds the only handle for sending broadcasts. +/// The post-join surface. Iterating directly yields the raw Phoenix message +/// stream — every frame received on this channel, with no SDK-side filtering. +/// Methods refine into typed views for broadcasts, postgres changes, and +/// presence. Holds the only handle for sending broadcasts. public struct ChannelSubscription: AsyncSequence, Sendable { public typealias Element = PhoenixMessage - /// Untyped iteration — every user-visible Phoenix message on this channel. - /// Excludes internal frames (`phx_reply`, `phx_close`). Fan-out: each - /// iteration is independent; all iterators observe every message. + /// Raw iteration — every Phoenix frame on this channel, including + /// `broadcast`, `postgres_changes`, `presence_diff`, `presence_state`, + /// `system`, `phx_reply`, `phx_close`, and `phx_error`. The SDK still + /// consumes these internally (ack correlation, lifecycle); raw consumers + /// observe a copy. Fan-out: each iteration is independent. public func makeAsyncIterator() -> AsyncIterator // Typed views (§3, §5, §4) — see the relevant sections for full signatures. @@ -217,11 +220,22 @@ public struct ChannelSubscription: AsyncSequence, Sendable { } public struct PhoenixMessage: Sendable { - /// Server-side event name. Common values: `"broadcast"`, `"postgres_changes"`, - /// `"presence_diff"`, `"presence_state"`, `"system"`. + /// Phoenix join reference correlating this frame to its `phx_join`. `nil` + /// for frames that predate the current join (rare). + public let joinRef: String? + + /// Phoenix message reference for request/reply correlation. Set on + /// pushes the SDK sent and on the matching `phx_reply`. `nil` for + /// server-pushed events (`broadcast`, `postgres_changes`, etc.). + public let ref: String? + + /// Server-side event name. Includes user-level events (`"broadcast"`, + /// `"postgres_changes"`, `"presence_diff"`, `"presence_state"`, `"system"`) + /// and Phoenix internals (`"phx_reply"`, `"phx_close"`, `"phx_error"`). public let event: String - /// Raw payload as received. JSON for text frames, `Data` for binary. + /// Raw payload as received. JSON for text frames, `Data` for binary + /// (Phoenix v2 broadcast). public let payload: PhoenixPayload /// Local receipt timestamp. @@ -926,6 +940,8 @@ so implementors don't re-litigate. | 14h | Multiple `subscribe()` calls return equivalent subscriptions sharing one backing state | Topic identity (Decision 1) extends to subscriptions | | 14i | Subscription drop without `leave()` does nothing (debug warning); leave is global as in Decision 3 | Consistency with channel rules; no auto-leave footguns under topic sharing | | 14j | `Presence` accessor moves to `ChannelSubscription` (was on `Channel` in earlier draft) | Same gate as broadcast send; track/observe require a live join | +| 14k | `PhoenixMessage` is fully raw — exposes `joinRef`, `ref`, `event`, `payload` (JSON or binary). Includes internal `phx_reply`/`phx_close`/`phx_error` frames | Direct iteration is the escape hatch for advanced consumers; SDK consumes the same frames internally for correlation | +| 14l | `ChannelSubscription.isAlive` / `state` accessor **deferred** | Callers can mirror `realtime.status` or `channel.state`; can be added additively later | | 15 | `PresenceHandle` is a regular class; explicit `cancel()`; debug warning on leak | Consistent with `Channel` lifecycle rule | | 16 | Multi-track supported (multiple metas per key) | Matches Phoenix; single-track is the trivial subset | | 17 | Presence key is channel-level only; server-generated if nil | Simpler; per-track keys confuse more than they help | From 6bfdf474ecd61a329cf7dfb2cc1e559f6892e575 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 7 May 2026 14:30:27 -0300 Subject: [PATCH 4/6] docs(realtime): add topic to PhoenixMessage for cross-boundary routing Including topic on the raw frame so consumers that pass PhoenixMessage values across logging, debugging, or multi-topic aggregation surfaces keep the routing key without threading it separately. Always matches the ChannelSubscription's topic for in-iteration consumers. Confirmed locks (no spec change needed): - Socket-level RTT via ConnectionStatus.latency is sufficient (no per-channel / per-broadcast latency stream). - ChannelSubscription stays as the type name. Co-Authored-By: Claude Opus 4.7 --- docs/design/realtime-v3.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/design/realtime-v3.md b/docs/design/realtime-v3.md index e490f157b..bdacddaa4 100644 --- a/docs/design/realtime-v3.md +++ b/docs/design/realtime-v3.md @@ -229,6 +229,12 @@ public struct PhoenixMessage: Sendable { /// server-pushed events (`broadcast`, `postgres_changes`, etc.). public let ref: String? + /// Channel topic this frame belongs to. Always matches the subscription's + /// channel topic for `ChannelSubscription` iterators; included on the + /// struct so consumers that hand `PhoenixMessage` values across boundaries + /// (logging, debugging, multi-topic aggregation) keep the routing key. + public let topic: String + /// Server-side event name. Includes user-level events (`"broadcast"`, /// `"postgres_changes"`, `"presence_diff"`, `"presence_state"`, `"system"`) /// and Phoenix internals (`"phx_reply"`, `"phx_close"`, `"phx_error"`). From 0ca1f8255b6ecf0ebea3369198bf256f4276ec0d Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 7 May 2026 14:39:21 -0300 Subject: [PATCH 5/6] docs(realtime): redesign ChangeRegistration generics, lock leave-invalidates-subscription MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three improvements after consistency sweep: 1. ChangeRegistration generics simplified. - Variants (Insert/Update/Delete/AnyEvent) are themselves generic over T and conform to a ChangeEventVariant protocol declaring Element. - ChangeRegistration drops to a single generic parameter (the variant carries T): ChangeRegistration> instead of ChangeRegistration. - sub.events(for:) becomes a single overload dispatched on the variant. - Fixes a real type-system bug in the previous shape, where ChangeEventVariant.Element referenced T but T wasn't in scope. 2. Filter split into Filter + UntypedFilter. - Untyped path no longer requires JSONValue to conform to RealtimeTable. - Untyped factories (channel.changes(schema:table:filter:), etc.) return ChangeRegistration>; identical type to typed registrations just with a different variant T. Mix freely. 3. Tighten subscription lifecycle. - §2.1 invariants now state that manual leave() invalidates the subscription value; methods throw .channelClosed(.userRequested); iteration terminates; reconnects do NOT invalidate. - Decision 14m captures this; 14n/14o capture the type split. Also drops .channelNotJoined from §7 references (already removed earlier; Decision 26 still mentioned it — fixed). Co-Authored-By: Claude Opus 4.7 --- docs/design/realtime-v3.md | 129 +++++++++++++++++++++++++++++-------- 1 file changed, 101 insertions(+), 28 deletions(-) diff --git a/docs/design/realtime-v3.md b/docs/design/realtime-v3.md index bdacddaa4..8511db49d 100644 --- a/docs/design/realtime-v3.md +++ b/docs/design/realtime-v3.md @@ -203,7 +203,7 @@ public struct ChannelSubscription: AsyncSequence, Sendable { // Typed views (§3, §5, §4) — see the relevant sections for full signatures. public func broadcasts(of type: T.Type, event: String) -> AsyncThrowingStream - public func events(for token: ChangeRegistration) + public func events(for token: ChangeRegistration) -> AsyncThrowingStream public var presence: Presence { get } @@ -273,6 +273,15 @@ Key invariants: - **Multiple `subscribe()` calls return equivalent subscriptions.** All point at the same backing channel state; any subscription's `leave()` ends the channel for every holder of the topic. +- **Subscriptions are invalidated by manual `leave()`.** After + `subscription.leave()` returns (or any other holder's `leave()` ACKs), the + subscription value is dead. Calling `broadcast`, `events(for:)`, + `broadcasts(of:event:)`, or `presence` on it throws + `.channelClosed(.userRequested)`; direct iteration terminates. To re-engage, + call `channel.subscribe()` again — that returns a *new* subscription value; + do not stash subscriptions long-term across leave cycles. Reconnects do + *not* invalidate subscriptions (§9.2) — only manual leave or terminal + `.transportFailure` does. - **Leaked-channel warning.** When `Realtime` deinits with channels that have been joined but never left, an `IssueReporting` warning fires in debug builds. Release builds silently rely on server-side timeouts. @@ -528,13 +537,16 @@ extension ExternalType: RealtimeTable { } ``` -### 5.2 Typed filter — single optional clause +### 5.2 Filters — typed and untyped Phoenix Realtime supports exactly one `column=op.value` per postgres_changes -subscription. The SDK reflects this constraint: a single optional `Filter` -per subscription. +subscription. The SDK reflects this with a single optional filter per +registration. Two filter types — same wire encoding, different input shape: ```swift +/// Type-checked filter for `RealtimeTable` types. Column is a `KeyPath`; the +/// value's type must match the keypath's `Value`. `.eq(\.roomId, 42)` against +/// `var roomId: UUID` fails at compile time. public struct Filter: Sendable { public static func eq( _ column: KeyPath, _ value: V @@ -546,12 +558,26 @@ public struct Filter: Sendable { public static func lte(…) -> Filter public static func `in`(_ column: KeyPath, _ values: [V]) -> Filter } + +/// Untyped filter for cases where the row type cannot or does not conform +/// to `RealtimeTable`. Column is a raw string; values are still constrained +/// to `RealtimePostgresFilterValue` for correct wire encoding. +public struct UntypedFilter: Sendable { + public static func eq(_ column: String, + _ value: any RealtimePostgresFilterValue) -> UntypedFilter + public static func neq(…) -> UntypedFilter + public static func gt(…) -> UntypedFilter + public static func gte(…) -> UntypedFilter + public static func lt(…) -> UntypedFilter + public static func lte(…) -> UntypedFilter + public static func `in`(_ column: String, + _ values: [any RealtimePostgresFilterValue]) -> UntypedFilter +} ``` -Reads like an enum at call site; implemented as a struct with static -factories so `KeyPath` + `V` type binding is preserved. Passing the -wrong value type for a column (`.eq(\.roomId, 42)` when `roomId: UUID`) fails -at compile time. +Both serialize to the same `column=op.value` wire string. The split is +purely about call-site ergonomics — typed gets compile-time checking via +`KeyPath`, untyped pays runtime cost for not requiring conformance. ### 5.3 Register-then-subscribe @@ -562,33 +588,70 @@ pending tokens; consumption happens through the returned `ChannelSubscription`. ```swift public extension Channel { - // Variant-typed factories — token preserves event variant in its type. + // Typed factories — require RealtimeTable, return registrations whose + // variant carries the row type. Filter is a typed `Filter`. func changes( to type: T.Type, where filter: Filter? = nil - ) -> ChangeRegistration + ) -> ChangeRegistration> func inserts( into type: T.Type, where filter: Filter? = nil - ) -> ChangeRegistration + ) -> ChangeRegistration> func updates( of type: T.Type, where filter: Filter? = nil - ) -> ChangeRegistration + ) -> ChangeRegistration> func deletes( from type: T.Type, where filter: Filter? = nil - ) -> ChangeRegistration + ) -> ChangeRegistration> + + // Untyped factories — for types without RealtimeTable. Return registrations + // whose variant carries `JSONValue`. Filter is `UntypedFilter`. + func changes( + schema: String, table: String, filter: UntypedFilter? = nil + ) -> ChangeRegistration> + + func inserts( + schema: String, table: String, filter: UntypedFilter? = nil + ) -> ChangeRegistration> + + func updates( + schema: String, table: String, filter: UntypedFilter? = nil + ) -> ChangeRegistration> + + func deletes( + schema: String, table: String, filter: UntypedFilter? = nil + ) -> ChangeRegistration> } -public struct ChangeRegistration: Sendable { - // Opaque. Holds enough state for the channel to compose the join payload - // and route incoming events to consumers. +/// Variant protocol — each variant is itself generic over the row type and +/// declares the element type of `events(for:)` for that variant. +public protocol ChangeEventVariant: Sendable { + associatedtype Element: Sendable } -public enum AnyEvent: ChangeEventVariant { public typealias Element = PostgresChange } -public enum Insert: ChangeEventVariant { public typealias Element = T } -public enum Update: ChangeEventVariant { public typealias Element = (old: T, new: T) } -public enum Delete: ChangeEventVariant { public typealias Element = T } +public enum Insert: ChangeEventVariant { public typealias Element = T } +public enum Update: ChangeEventVariant { public typealias Element = (old: T, new: T) } +public enum Delete: ChangeEventVariant { public typealias Element = T } +public enum AnyEvent: ChangeEventVariant { public typealias Element = PostgresChange } + +/// Single generic over the variant — variant carries `T`, no extra type +/// parameter on the registration. Same registration type for typed and +/// untyped paths; only the variant's `T` differs. +public struct ChangeRegistration: Sendable { + // Opaque. Holds the table descriptor (typed via RealtimeTable, or raw + // schema+table strings), optional filter, event mask, and routing state. +} + +public extension ChannelSubscription { + /// Single overload, dispatched on the variant. Element type follows from + /// `E.Element` — `T` for inserts/deletes, `(old: T, new: T)` for updates, + /// `PostgresChange` for `AnyEvent`. Works identically for typed and + /// untyped registrations (`T` is `JSONValue` in the untyped case). + func events(for token: ChangeRegistration) + -> AsyncThrowingStream +} public enum PostgresChange: Sendable { case insert(T) @@ -655,19 +718,26 @@ For types without `@RealtimeTable`, the same register-then-subscribe flow applies — only the filter and element types change. ```swift -let token = channel.changes( - schema: "public", table: "messages", event: .delete, - filter: .eq("room_id", id) // UntypedFilter — string column + any value +// Use the dedicated untyped factory (per-event variant) — the schema+table +// arguments are strings; the filter is an `UntypedFilter`. +let deletes = channel.deletes( + schema: "public", table: "messages", + filter: .eq("room_id", id) ) -// token: ChangeRegistration +// deletes: ChangeRegistration> let sub = try await channel.subscribe() -for try await record in sub.events(for: token) { +for try await record in sub.events(for: deletes) { // record: JSONValue — caller decodes manually } ``` +The untyped path produces the same `ChangeRegistration` type the typed +factories return — only the variant's `T` differs (`JSONValue` instead of +your row type). Consumption via `sub.events(for:)` is identical. Tokens +from typed and untyped factories can be mixed freely on the same channel. + --- ## 6. Connection @@ -936,8 +1006,8 @@ so implementors don't re-litigate. | 12 | Single optional `Filter` per postgres_changes subscription | Reflects the Phoenix wire constraint | | 13 | `Filter` is a struct with static factories; reads like an enum | Preserves `KeyPath` + `V` binding in generics | | 14 | `@RealtimeTable` macro for column-name resolution; manual conformance as escape hatch | Type-safe without forcing macros on every type | -| 14a | Postgres changes are **register-then-subscribe**: `channel.changes(...)` returns a `ChangeRegistration` token; `channel.subscribe()` triggers the join with all pending tokens; consumption via `sub.events(for: token)` | Phoenix requires postgres_changes filters in the join payload — the API can't pretend lazy join works for them | -| 14b | Tokens carry the event variant in their type (`Insert`/`Update`/`Delete`/`AnyEvent`); element type follows the variant | Compiler enforces the right consumer shape per token kind | +| 14a | Postgres changes are **register-then-subscribe**: `channel.changes(...)` returns a `ChangeRegistration` token; `channel.subscribe()` triggers the join with all pending tokens; consumption via `sub.events(for: token)` | Phoenix requires postgres_changes filters in the join payload — the API can't pretend lazy join works for them | +| 14b | Variants are themselves generic over the row type (`Insert`/`Update`/`Delete`/`AnyEvent` conforming to `ChangeEventVariant`); registration is `ChangeRegistration` (single generic, variant carries `T`); single `events(for:)` overload dispatched on the variant | Cleaner type signatures than two-param `` and a single overload covers typed and untyped paths | | 14c | Registering after join throws `.cannotRegisterAfterJoin`; tokens are reusable across `leave()` + resubscribe cycles | Honest about the wire; ergonomic across reconnects and cycles | | 14d | `subscribe()` is the **only** join path; no iteration-driven lazy-join | One mental model; no surprises from broadcast iteration silently joining | | 14e | `subscribe()` returns `ChannelSubscription` — the post-join surface for consumption, sending, and presence | Type-level gate: ops requiring "joined" can only be reached from a `ChannelSubscription` value | @@ -948,6 +1018,9 @@ so implementors don't re-litigate. | 14j | `Presence` accessor moves to `ChannelSubscription` (was on `Channel` in earlier draft) | Same gate as broadcast send; track/observe require a live join | | 14k | `PhoenixMessage` is fully raw — exposes `joinRef`, `ref`, `event`, `payload` (JSON or binary). Includes internal `phx_reply`/`phx_close`/`phx_error` frames | Direct iteration is the escape hatch for advanced consumers; SDK consumes the same frames internally for correlation | | 14l | `ChannelSubscription.isAlive` / `state` accessor **deferred** | Callers can mirror `realtime.status` or `channel.state`; can be added additively later | +| 14m | After manual `leave()`, the `ChannelSubscription` value is dead — methods throw `.channelClosed(.userRequested)`; iteration terminates. Reconnects do *not* invalidate subscriptions; only manual leave or `.transportFailure` does | Tightens the lifecycle contract; prevents stashed subscriptions from silently no-op-ing across cycles | +| 14n | Filters split into two types: `Filter` (KeyPath-based, compile-time-checked) and `UntypedFilter` (raw column strings + `any RealtimePostgresFilterValue`). Both serialize to the same `column=op.value` wire string | Untyped path needs raw column strings; typed path needs `RealtimeTable` for `columnName(for:)`; one type can't be both | +| 14o | Untyped factories (`channel.changes(schema:table:filter:)`, `inserts/updates/deletes(schema:table:filter:)`) return `ChangeRegistration>`. Tokens from typed and untyped factories interoperate — same registration type, different variant `T` | Single consumption surface; mix freely on one channel | | 15 | `PresenceHandle` is a regular class; explicit `cancel()`; debug warning on leak | Consistent with `Channel` lifecycle rule | | 16 | Multi-track supported (multiple metas per key) | Matches Phoenix; single-track is the trivial subset | | 17 | Presence key is channel-level only; server-generated if nil | Simpler; per-track keys confuse more than they help | @@ -959,7 +1032,7 @@ so implementors don't re-litigate. | 23 | Self-broadcast is channel-level only (wire constraint) | Don't lie about the wire | | 24 | Replay via `ChannelOptions.broadcast.replay` | Config at creation; not a separate method | | 25 | `Data` payloads bypass encoding; ship as binary frames | Natural Swift affordance | -| 26 | `broadcast` throws `.channelNotJoined` if not joined | Joining is a commitment; one-shot sends go via HTTP | +| 26 | Broadcast send is reachable only from `ChannelSubscription` (compile-time gate); one-shot sends without a subscription go via `realtime.httpBroadcast` | Joining is a commitment; the prior `.channelNotJoined` runtime error is unrepresentable | | 27 | `realtime.httpBroadcast(...)` for one-shot sends; shares `APIKeySource` | Clear separation from WS path | | 28 | Socket opens lazily on first channel join | Zero ceremony for common paths; explicit `connect()` still exists | | 29 | `disconnect()` closes socket, keeps channel cache | Pause/resume, not total teardown | From 9148c70ce5372f2657f0e7b7ef8febe02050857d Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Thu, 7 May 2026 14:47:53 -0300 Subject: [PATCH 6/6] =?UTF-8?q?docs(realtime):=20final=20review=20pass=20?= =?UTF-8?q?=E2=80=94=20fix=20typing=20bugs=20and=20stale=20references?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bugs fixed: - §1.2 Configuration: replace fictional .iso8601 / .automaticDefault statics with proper SDK-provided constants (JSONDecoder.realtimeDefault, LifecyclePolicy.automaticDefault). Add `& Sendable` to the clock type. - §1.2: rename `handleAppLifecycle: Bool` to `lifecycle: LifecyclePolicy` so the §9.3 enum is actually wired in (was dead code before). - §6.1: declare realtime.connect() — was referenced but never declared. - §4: define PresenceKey (typealias for String). - §5.2: add RealtimePostgresFilterValue constraint to .in factory. - Appendix A: pass `me: UUID` through init (Self.currentUserID was undefined); rename ChatMessage to ChatBroadcast and define the type. Inconsistencies fixed: - §2.1: drop "joins lazily on first subscribe" (subscribe IS the join). - §2.3: pipelined re-acquire returns the same Channel actor in unsubscribed state, not a "fresh Channel" (was contradicting Decision 1). - §4: docstring referenced channel.leave(); generalize to "leave on any holder of the topic". - §9.2: stale channel.broadcast(...) → sub.broadcast(...). - §6.2: split disconnect()'s close reason from policy giveup — add CloseReason.clientDisconnected. Reconnection policy applies only to UNEXPECTED closes. - §12: refresh Decisions 12, 13, 22 to reflect the typed/untyped filter split and the broadcast Data overload. Decision 14g merged into 26. Ambiguities tightened: - §6.3: clarify "same token" = byte-equal returned string, no rotation attempted; propagate .authenticationFailed. - §6.4: document `since` (state-entry timestamp) and `latency` semantics. - §7: explain .disconnected vs .channelClosed distinction. - §7: add .unknownToken for events(for:) called with cross-channel tokens. - §2.1 PhoenixMessage.joinRef: document v1 (always nil) vs v2 behavior. No semantic changes — all locked decisions stand. Co-Authored-By: Claude Opus 4.7 --- docs/design/realtime-v3.md | 146 +++++++++++++++++++++++++++---------- 1 file changed, 109 insertions(+), 37 deletions(-) diff --git a/docs/design/realtime-v3.md b/docs/design/realtime-v3.md index 8511db49d..115405bd6 100644 --- a/docs/design/realtime-v3.md +++ b/docs/design/realtime-v3.md @@ -44,7 +44,7 @@ let sub = try await channel.subscribe() // Typed broadcast receive. Task { - for try await msg in sub.broadcasts(of: ChatMessage.self, event: "chat") { + for try await msg in sub.broadcasts(of: ChatBroadcast.self, event: "chat") { render(msg) } } @@ -64,7 +64,8 @@ Task { } // Send (only available on the subscription — Channel has no broadcast method). -try await sub.broadcast(ChatMessage(text: "hi"), as: "chat") +let payload = ChatBroadcast(...) // any Encodable & Sendable +try await sub.broadcast(payload, as: "chat") // Explicit release when done. try await sub.leave() @@ -75,7 +76,7 @@ One-shot send without joining: ```swift try await realtime.httpBroadcast( topic: "room:42", event: "chat", - payload: ChatMessage(text: "hi") + payload: ChatBroadcast(...) ) ``` @@ -130,16 +131,34 @@ public struct Configuration: Sendable { initial: .seconds(1), max: .seconds(30), jitter: 0.2 ) public var disconnectOnEmptyChannelsAfter: Duration = .seconds(50) - public var handleAppLifecycle: Bool = .automaticDefault + public var lifecycle: LifecyclePolicy = .automaticDefault public var protocolVersion: RealtimeProtocolVersion = .v2 - public var clock: any Clock = ContinuousClock() + public var clock: any Clock & Sendable = ContinuousClock() public var headers: HTTPFields = [:] public var logger: (any RealtimeLogger)? = nil - public var decoder: JSONDecoder = .iso8601 - public var encoder: JSONEncoder = .iso8601 + public var decoder: JSONDecoder = .realtimeDefault // ISO 8601 dates + public var encoder: JSONEncoder = .realtimeDefault // ISO 8601 dates public static let `default` = Configuration() } + +extension LifecyclePolicy { + /// `.automatic` on iOS/macOS/tvOS/visionOS; `.manual` elsewhere + /// (including watchOS and Linux, where lifecycle observation is + /// not supported). + public static let automaticDefault: LifecyclePolicy +} + +extension JSONDecoder { + /// SDK-provided decoder configured with `.iso8601` date strategy. + /// Replace via `Configuration.decoder` for custom needs. + public static let realtimeDefault: JSONDecoder +} + +extension JSONEncoder { + /// SDK-provided encoder configured with `.iso8601` date strategy. + public static let realtimeDefault: JSONEncoder +} ``` `disconnectOnEmptyChannelsAfter` is an idle‑socket timeout: when the last @@ -157,8 +176,8 @@ public extension Realtime { /// Returns the `Channel` for `topic`. Shared by topic — two callers asking /// for the same topic receive the same underlying actor. /// - /// The channel joins lazily on the first subscribe. The caller must call - /// `leave()` to unsubscribe; `deinit` does NOT unsubscribe. + /// The channel does not join the server until `subscribe()` is called. The + /// caller must call `leave()` to unsubscribe; `deinit` does NOT unsubscribe. func channel( _ topic: String, configure: (inout ChannelOptions) -> Void = { _ in } @@ -220,8 +239,10 @@ public struct ChannelSubscription: AsyncSequence, Sendable { } public struct PhoenixMessage: Sendable { - /// Phoenix join reference correlating this frame to its `phx_join`. `nil` - /// for frames that predate the current join (rare). + /// Phoenix join reference correlating this frame to its `phx_join`. Always + /// `nil` when the channel is configured for protocol v1 (4-tuple frames + /// have no joinRef field). Under v2: `nil` for frames that predate the + /// current join (rare). public let joinRef: String? /// Phoenix message reference for request/reply correlation. Set on @@ -320,8 +341,9 @@ reflects the effective options. - `leave()` is **await-to-ack**: it returns only after the server ACKs `phx_leave`. On transport failure or timeout, it throws. - A **pipelined re-acquire** is safe: if `realtime.channel("x")` is called - while a leave for `"x"` is in flight, the caller gets a fresh `Channel` - whose join is queued behind the pending leave. Same-topic churn is + while a leave for `"x"` is in flight, the caller gets the same `Channel` + actor (topic identity, Decision 1) — now in `unsubscribed` state — and the + next `subscribe()` is queued behind the pending leave. Same-topic churn is transparent. > **Topic ownership convention.** Because `leave()` is global, coincidental @@ -343,6 +365,7 @@ public enum ChannelState: Sendable, Equatable { public enum CloseReason: Sendable, Equatable { case userRequested // someone called leave() + case clientDisconnected // someone called realtime.disconnect() case serverClosed(code: Int, message: String?) case timeout case unauthorized @@ -461,8 +484,9 @@ public struct Presence: Sendable { /// key (Phoenix multi-meta semantics). /// /// The handle must be explicitly `cancel()`ed to untrack. Dropping the - /// handle without cancelling does NOT untrack — but when `channel.leave()` - /// is called, all outstanding tracks are implicitly torn down server-side. + /// handle without cancelling does NOT untrack — but when `leave()` is + /// called on any holder of the topic, all outstanding tracks are + /// implicitly torn down server-side. /// /// Debug warning fires if a handle is deinited without `cancel()` while /// the channel is still joined. @@ -481,6 +505,10 @@ public struct Presence: Sendable { ) -> AsyncStream> } +/// The presence key string the server attaches to each meta. Comes from +/// `ChannelOptions.presenceKey` if set, otherwise server-generated. +public typealias PresenceKey = String + public struct PresenceState: Sendable { public let active: [PresenceKey: [T]] public let lastDiff: PresenceDiff? @@ -556,7 +584,9 @@ public struct Filter: Sendable { public static func gte(…) -> Filter public static func lt(…) -> Filter public static func lte(…) -> Filter - public static func `in`(_ column: KeyPath, _ values: [V]) -> Filter + public static func `in`( + _ column: KeyPath, _ values: [V] + ) -> Filter } /// Untyped filter for cases where the row type cannot or does not conform @@ -649,6 +679,10 @@ public extension ChannelSubscription { /// `E.Element` — `T` for inserts/deletes, `(old: T, new: T)` for updates, /// `PostgresChange` for `AnyEvent`. Works identically for typed and /// untyped registrations (`T` is `JSONValue` in the untyped case). + /// + /// Passing a token that was created on a different channel is a + /// programmer error: the iterator throws `.unknownToken` on first + /// iteration. (`Channel` actor identity is captured in the token.) func events(for token: ChangeRegistration) -> AsyncThrowingStream } @@ -744,14 +778,23 @@ from typed and untyped factories can be mixed freely on the same channel. ### 6.1 Lazy open +```swift +public extension Realtime { + /// Opens the WebSocket without joining any channel. Useful for pre-warming + /// or surfacing auth errors before the first `subscribe()`. Idempotent: + /// calling on an already-connected client returns immediately. Concurrent + /// calls coalesce around a single in-flight connect. + func connect() async throws(RealtimeError) +} +``` + The WebSocket opens lazily on the first `channel.subscribe()` call. There is no iteration-driven lazy-join in v3 — the only path from "no socket" to "joined channel" is an explicit `subscribe()`. `httpBroadcast` does not open the socket. -Explicit `realtime.connect()` is available for callers who want to pre-warm -or surface auth errors early without joining a channel. Calls are idempotent -— a second `connect()` on an already-connected client returns immediately. +`realtime.connect()` is the explicit pre-warm path; it does not join any +channel. ### 6.2 Disconnect @@ -759,15 +802,16 @@ or surface auth errors early without joining a channel. Calls are idempotent public extension Realtime { /// Closes the socket, awaits close ACK. Does NOT evict the channel cache /// or call leave() on any channel. Streams throw - /// `.channelClosed(.transportFailure)`; subsequent operations trigger a + /// `.channelClosed(.clientDisconnected)`; subsequent operations trigger a /// fresh connect + rejoin. func disconnect() async } ``` -After a manual `disconnect()`, the `ReconnectionPolicy` does NOT auto-reopen. -The next channel operation (subscribe, send, or explicit `connect()`) -triggers a fresh connect. +After a manual `disconnect()`, the `ReconnectionPolicy` does NOT auto-reopen +— the policy applies only to *unexpected* closes (transport failure, server +hangup). The next channel operation (`subscribe()`, send via a re-acquired +subscription, or explicit `connect()`) triggers a fresh connect. ### 6.3 Mid-session token rotation @@ -780,8 +824,10 @@ public extension Realtime { ``` **Reactive path.** If the server rejects an operation with `token_expired`, -the SDK calls `APIKeySource.dynamic()` once and retries. If the same token -comes back, it propagates `.authenticationFailed`. +the SDK calls `APIKeySource.dynamic()` once and retries. If the returned +string is byte-equal to the one that was just rejected, the SDK assumes +the caller has not actually rotated and propagates `.authenticationFailed` +without retrying. **If `dynamic()` throws:** propagates as `.authenticationFailed(underlying:)`. Connection enters `.closed(.unauthorized)`. The `ReconnectionPolicy` does @@ -806,8 +852,11 @@ public struct ConnectionStatus: Sendable { case closed(CloseReason) } public let state: State + /// When the *current* `state` was entered. Reset on every state transition. public let since: Date - public let latency: Duration? // last heartbeat RTT + /// Last successful heartbeat round-trip time, if any. `nil` before the + /// first heartbeat reply or after the connection drops. + public let latency: Duration? } ``` @@ -825,6 +874,7 @@ public enum RealtimeError: Error, Sendable { case channelJoinRejected(reason: String) case channelClosed(CloseReason) case cannotRegisterAfterJoin // postgres_changes registration after join (§5.3) + case unknownToken // events(for:) called with a token from another channel (§5.3) case authenticationFailed(reason: String, underlying: (any Error & Sendable)?) case tokenExpired @@ -846,6 +896,19 @@ Single flat enum. Swift's `CancellationError` is caught internally and re-thrown as `.cancelled` so call sites exhaustively handle one type. Underlying errors are preserved as `any Error & Sendable` for debugging. +**`.disconnected` vs `.channelClosed`.** `.disconnected` is thrown by +*operations attempted while the socket is down* — sends, broadcast acks, +explicit `connect()` failures during reconnect. The channel itself may +still be subscribed in the SDK; just not reachable on the wire right now. +`.channelClosed(reason)` is thrown by *streams whose channel has actually +terminated* — manual leave, server-initiated close, transport giveup. +Once a stream throws `.channelClosed`, it ends; `.disconnected` is +recoverable on reconnect. + +**`.cannotRegisterAfterJoin`.** Thrown by `channel.changes(...)`, +`channel.inserts(...)`, etc. when the channel has already joined. Tokens +must be registered before the first `subscribe()` returns. + --- ## 8. Transport and Testing @@ -913,7 +976,7 @@ public struct ReconnectionPolicy: Sendable { state on rejoin. Observers see the re-synced state naturally. - **Postgres change subscriptions are restored.** Filters re-register on join. -- **In-flight sends throw immediately.** `try await channel.broadcast(...)` +- **In-flight sends throw immediately.** `try await sub.broadcast(...)` during an outage throws `.disconnected` — no queuing. - **On give-up.** Channel streams throw `.channelClosed(.transportFailure)`, the channel cache evicts affected entries, `channel.state` transitions @@ -1003,8 +1066,8 @@ so implementors don't re-litigate. | 9 | `APIKeySource.dynamic(_:)` for fetch; `updateToken(_:)` for push | No JWT parsing in the SDK | | 10 | On `token_expired`: retry once, then propagate | Tolerates race between rotation and notify | | 11 | `dynamic()` throwing does NOT trigger `ReconnectionPolicy` | Auth recovery is caller-owned | -| 12 | Single optional `Filter` per postgres_changes subscription | Reflects the Phoenix wire constraint | -| 13 | `Filter` is a struct with static factories; reads like an enum | Preserves `KeyPath` + `V` binding in generics | +| 12 | Single optional filter per postgres_changes subscription (typed `Filter` or `UntypedFilter`); see Decision 14n for the type split | Reflects the Phoenix wire constraint (one `column=op.value` per entry) | +| 13 | Both `Filter` and `UntypedFilter` are structs with static factories; read like enums at call site | Typed path preserves `KeyPath` + `V` binding; untyped path is a sibling type for raw column strings | | 14 | `@RealtimeTable` macro for column-name resolution; manual conformance as escape hatch | Type-safe without forcing macros on every type | | 14a | Postgres changes are **register-then-subscribe**: `channel.changes(...)` returns a `ChangeRegistration` token; `channel.subscribe()` triggers the join with all pending tokens; consumption via `sub.events(for: token)` | Phoenix requires postgres_changes filters in the join payload — the API can't pretend lazy join works for them | | 14b | Variants are themselves generic over the row type (`Insert`/`Update`/`Delete`/`AnyEvent` conforming to `ChangeEventVariant`); registration is `ChangeRegistration` (single generic, variant carries `T`); single `events(for:)` overload dispatched on the variant | Cleaner type signatures than two-param `` and a single overload covers typed and untyped paths | @@ -1012,7 +1075,7 @@ so implementors don't re-litigate. | 14d | `subscribe()` is the **only** join path; no iteration-driven lazy-join | One mental model; no surprises from broadcast iteration silently joining | | 14e | `subscribe()` returns `ChannelSubscription` — the post-join surface for consumption, sending, and presence | Type-level gate: ops requiring "joined" can only be reached from a `ChannelSubscription` value | | 14f | `ChannelSubscription` conforms to `AsyncSequence` with `Element = PhoenixMessage` | Untyped raw feed available without an extra method; typed methods refine for normal use | -| 14g | `Channel.broadcast(_:as:)` (sending) is removed; sending lives only on `ChannelSubscription` | Compile-time gate replaces the v3-draft `.channelNotJoined` runtime error | +| 14g | (merged into Decision 26) | — | | 14h | Multiple `subscribe()` calls return equivalent subscriptions sharing one backing state | Topic identity (Decision 1) extends to subscriptions | | 14i | Subscription drop without `leave()` does nothing (debug warning); leave is global as in Decision 3 | Consistency with channel rules; no auto-leave footguns under topic sharing | | 14j | `Presence` accessor moves to `ChannelSubscription` (was on `Channel` in earlier draft) | Same gate as broadcast send; track/observe require a live join | @@ -1028,7 +1091,7 @@ so implementors don't re-litigate. | 19 | `withChannel` dropped entirely | Dangerous under global-leave semantics; 3-line explicit pattern is clearer | | 20 | Flat `RealtimeError` enum; cancellation folded as `.cancelled` | Simpler call sites than grouped or union-throws | | 21 | Underlying errors preserved as `any Error & Sendable` | Debug value outweighs Equatable/Codable loss | -| 22 | Single `broadcast` method; ack at channel-level config | Uniform call site | +| 22 | Single `broadcast` call site (with a `Data` overload for binary payloads, Decision 25); ack at channel-level config | Uniform call site | | 23 | Self-broadcast is channel-level only (wire constraint) | Don't lie about the wire | | 24 | Replay via `ChannelOptions.broadcast.replay` | Config at creation; not a separate method | | 25 | `Data` payloads bypass encoding; ship as binary frames | Natural Swift affordance | @@ -1069,6 +1132,13 @@ struct Message: Codable, Sendable, Identifiable { var createdAt: Date } +/// Wire payload for the "chat" broadcast event — distinct from `Message`, +/// which is the persisted postgres row consumed via `events(for:)`. +struct ChatBroadcast: Codable, Sendable { + let authorId: UUID + let text: String +} + struct UserPresence: Codable, Sendable { let userId: UUID let status: Status @@ -1080,6 +1150,7 @@ final class ChatRoomModel { private let realtime: Realtime private let channel: Channel private let roomId: UUID + private let me: UUID private var runTask: Task? private var subscription: ChannelSubscription? private var trackHandle: PresenceHandle? @@ -1088,21 +1159,22 @@ final class ChatRoomModel { var onlineUsers: [UUID: UserPresence] = [:] var connection: ConnectionStatus.State = .idle - init(realtime: Realtime, roomId: UUID) { + init(realtime: Realtime, roomId: UUID, me: UUID) { self.realtime = realtime self.roomId = roomId + self.me = me self.channel = realtime.channel("chat:room:\(roomId)") { - $0.presenceKey = "user-\(Self.currentUserID)" + $0.presenceKey = "user-\(me)" } } - func start(me: UUID) { + func start() { // Register postgres tokens BEFORE subscribe — they bake into phx_join. let messageInserts = channel.inserts( into: Message.self, where: .eq(\.roomId, roomId) ) - runTask = Task { [channel, realtime, messageInserts, weak self] in + runTask = Task { [channel, realtime, messageInserts, me, weak self] in do { // Single explicit join captures the registration above. let sub = try await channel.subscribe() @@ -1151,10 +1223,10 @@ final class ChatRoomModel { /// Broadcast through the active subscription. Type-level gate: cannot /// be called before `subscription` is set. One server-side subscription; /// one round-trip. - func send(_ text: String, from author: UUID) async throws(RealtimeError) { + func send(_ text: String) async throws(RealtimeError) { guard let subscription else { return } try await subscription.broadcast( - ChatMessage(authorId: author, text: text), + ChatBroadcast(authorId: me, text: text), as: "chat" ) }