# Agent Infrastructure

The Agent's internal machinery: worker lifecycle, command dispatch, message delivery, subscription tracking, operation suspension, protocol client management, and dual-backend store. These cross-module patterns are not visible from any single module spec.

This document covers the "big agent" (`Agent.hs` + `Agent/Client.hs`) used in client applications. The "small agent" (`SMPClientAgent`) used in routers is documented in [clients.md](../clients.md).

For per-module details: [Agent](../modules/Simplex/Messaging/Agent.md) · [Agent Client](../modules/Simplex/Messaging/Agent/Client.md) · [Store Interface](../modules/Simplex/Messaging/Agent/Store/Interface.md) · [NtfSubSupervisor](../modules/Simplex/Messaging/Agent/NtfSubSupervisor.md) · [XFTP Agent](../modules/Simplex/FileTransfer/Agent.md). For the component diagram, see [agent.md](../agent.md).

- [Worker framework](#worker-framework)
- [Async command processing](#async-command-processing)
- [Message delivery](#message-delivery)
- [Subscription tracking](#subscription-tracking)
- [Operation suspension cascade](#operation-suspension-cascade)
- [SessionVar lifecycle](#sessionvar-lifecycle)
- [Dual-backend store](#dual-backend-store)

---
## Worker framework

**Source**: [Agent/Client.hs](../../src/Simplex/Messaging/Agent/Client.hs), [Agent/Env/SQLite.hs](../../src/Simplex/Messaging/Agent/Env/SQLite.hs) (Worker type)

All agent background processing - async commands, message delivery, notification workers, XFTP workers - uses a shared worker infrastructure defined in `Agent/Client.hs`.

**Create-or-reuse**: `getAgentWorker` atomically checks a `TMap` for an existing worker keyed by the work item (connection+server, send queue address, etc.). If absent, creates a new `Worker` with a unique monotonic `workerId` from `workerSeq` and inserts it. If present and `hasWork=True`, signals the existing worker via `tryPutTMVar doWork ()`.
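
This create-or-reuse step can be sketched in STM; the simplified types and the `getOrCreateWorker` name are illustrative, and the real `getAgentWorker` also forks the worker's action after this step:

```haskell
import Control.Concurrent.STM
import Control.Monad (void, when)
import qualified Data.Map.Strict as M

data Worker = Worker
  { workerId :: Int     -- unique, monotonic (from a workerSeq-like counter)
  , doWork :: TMVar ()  -- work-available signal
  }

-- Atomically reuse the worker for this key, or insert a fresh one.
getOrCreateWorker :: Bool -> TVar Int -> TVar (M.Map String Worker) -> String -> STM Worker
getOrCreateWorker hasWork seqVar workers key = do
  ws <- readTVar workers
  case M.lookup key ws of
    Just w -> w <$ signal w  -- reuse: just wake the existing worker
    Nothing -> do
      wId <- (+ 1) <$> readTVar seqVar  -- monotonic workerId
      writeTVar seqVar wId
      w <- Worker wId <$> newEmptyTMVar
      writeTVar workers $! M.insert key w ws
      w <$ signal w
  where
    -- non-blocking: a worker that is already signalled stays signalled
    signal w = when hasWork $ void $ tryPutTMVar (doWork w) ()
```

Because the whole lookup-or-insert runs in one STM transaction, two threads racing on the same key cannot both create a worker.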

**Fork and run**: `runWorkerAsync` uses bracket on the worker's `action` TMVar. If the taken value is `Nothing`, the worker is idle - start it. If `Just _`, it's already running - put it back and return. The `action` TMVar holds `Just (Weak ThreadId)` to avoid preventing GC of the worker thread.

**Task retrieval race prevention**: `withWork` clears the `doWork` flag *before* calling `getWork` (not after). This prevents a race: query finds nothing → another thread adds work + signals → worker clears flag (losing the signal). By clearing first, any signal that arrives during the query is preserved.
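
A minimal sketch of the clear-before-query ordering, with a `TVar` list standing in for the `getWork` database query (`takeWork` is an illustrative name):

```haskell
import Control.Concurrent.STM
import Control.Monad (void)

takeWork :: TMVar () -> TVar [String] -> IO (Maybe String)
takeWork doWork tasks = do
  -- clear the signal BEFORE querying: a signal posted during or after the
  -- query survives for the next loop iteration instead of being lost
  atomically $ void $ tryTakeTMVar doWork
  atomically $ do
    ts <- readTVar tasks
    case ts of
      [] -> pure Nothing
      t : rest -> writeTVar tasks rest >> pure (Just t)
```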

**Error classification**: `withWork` distinguishes two failure modes:
- *Work-item error* (`isWorkItemError`): the task itself is broken (likely recurring). Worker stops and sends `CRITICAL False`.
- *Store error*: transient database issue. Worker re-signals `doWork` and reports `INTERNAL` (retry may succeed).

**Restart rate limiting**: On worker exit, `restartOrDelete` checks the `restarts` counter against `maxWorkerRestartsPerMin`. Under the limit: reset action, re-signal, restart. Over the limit: delete the worker from the map and send `CRITICAL True` (escalation to the application). A restart only proceeds if the `workerId` in the map still matches the current worker - a stale restart from a replaced worker is a no-op.

**Consumers**: Four families use this framework:
- Async command workers - keyed by `(ConnId, Maybe SMPServer)`, in `asyncCmdWorkers` TMap
- Delivery workers - keyed by `SndQAddr`, in `smpDeliveryWorkers` TMap, paired with a `TMVar ()` retry lock
- NTF workers - three pools (`ntfWorkers` per NTF server, `ntfSMPWorkers` per SMP server, `ntfTknDelWorkers` for token deletion) in `NtfSubSupervisor`
- XFTP workers - three worker types (rcv, snd, del) with TMVar-based connection sharing
---

## Async command processing

**Source**: [Agent.hs](../../src/Simplex/Messaging/Agent.hs), [Agent/Protocol.hs](../../src/Simplex/Messaging/Agent/Protocol.hs) (command types), [Agent/Store.hs](../../src/Simplex/Messaging/Agent/Store.hs) (internal command types)

Async commands handle state transitions that require network calls but shouldn't block the API thread: securing queues, deleting old queues during rotation, acknowledging messages. The dispatch loop `runCommandProcessing` runs one worker per `(ConnId, Maybe SMPServer)` key.

**Enqueueing**: API functions call `enqueueCommand`, which persists the command to the `commands` table (crash-safe) and spawns/wakes the worker via `getAsyncCmdWorker`. On agent startup, `resumeAllCommands` fetches all pending commands grouped by connection+server and signals their workers.

**Command types**: Two categories share the same dispatch loop:
- *Client commands* (`AClientCommand`): `NEW`, `JOIN`, `LET` (allow connection), `ACK`, `LSET`/`LGET` (set/get connection link data), `SWCH` (switch queue), `DEL`. Triggered by application API calls.
- *Internal commands* (`AInternalCommand`): `ICAck` (ack to router), `ICAckDel` (ack + delete local message), `ICAllowSecure`/`ICDuplexSecure` (secure after confirmation), `ICQSecure` (secure queue during switch), `ICQDelete` (delete old queue after switch), `ICDeleteConn` (delete connection), `ICDeleteRcvQueue` (delete specific receive queue). Generated *during* message processing to handle state transitions asynchronously.

**Retry and movement**: `tryMoveableCommand` wraps execution with `withRetryInterval`. On `temporaryOrHostError`, it retries with backoff. On cross-server errors (e.g., queue moved to different router), it updates the command's server field in the store (`CCMoved`) and retries against the new server.

**Locking**: State-sensitive commands use `tryWithLock` / `tryMoveableWithLock`, which acquire `withConnLock` before execution. This serializes operations on the same connection, preventing races between concurrent command processing and message receipt.

**Event overflow**: Events are written directly to `subQ` if there is room. When `subQ` is full, events overflow into a local `pendingCmds` list and are flushed to `subQ` after the command completes, providing backpressure handling.
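
The overflow pattern can be sketched as follows, assuming `subQ` is a bounded `TBQueue`; `notifyOrBuffer` and `flushPending` are illustrative names:

```haskell
import Control.Concurrent.STM

-- During command processing: never block on a full subQ.
notifyOrBuffer :: TBQueue String -> TVar [String] -> String -> STM ()
notifyOrBuffer subQ pending evt = do
  full <- isFullTBQueue subQ
  if full then modifyTVar' pending (evt :) else writeTBQueue subQ evt

-- After the command completes: drain the buffer in order. Blocking per event
-- is acceptable here, which is what provides the backpressure.
flushPending :: TBQueue String -> TVar [String] -> IO ()
flushPending subQ pending = do
  evts <- atomically $ swapTVar pending []
  mapM_ (atomically . writeTBQueue subQ) (reverse evts)
```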

---

## Message delivery

**Source**: [Agent.hs](../../src/Simplex/Messaging/Agent.hs), [Agent/RetryInterval.hs](../../src/Simplex/Messaging/Agent/RetryInterval.hs)

Message delivery uses a split-phase encryption design: the ratchet advances in the API thread (serialized), while the actual body encryption happens in the per-queue delivery worker (parallel). This avoids ratchet lock contention across queues.

**Phase 1 - API thread** (`enqueueMessageB`):
1. Encode the agent message with `internalSndId` + `prevMsgHash` (for the receiver's integrity chain)
2. Call `agentRatchetEncryptHeader` - advances the double ratchet, produces a message encryption key (MEK), padded length, and PQ encryption status
3. Store `SndMsg` with `SndMsgPrepData` (MEK, paddedLen, sndMsgBodyId) in the database
4. Create `SndMsgDelivery` record for each send queue
5. Increment `msgDeliveryOp.opsInProgress` (for suspension tracking)
6. Signal delivery workers via `getDeliveryWorker`

**Phase 2 - delivery worker** (`runSmpQueueMsgDelivery`):
1. `throwWhenNoDelivery` - kills the worker thread if the queue's address has been removed from `smpDeliveryWorkers` (prevents delivery to queues replaced during switch)
2. `getPendingQueueMsg` - fetches the next pending message from the store, resolving the `sndMsgBodyId` reference into the actual message body and constructing `PendingMsgPrepData`
3. Re-encode the message with `internalSndId`/`prevMsgHash`, then `rcEncryptMsg` to encrypt with the stored MEK (no ratchet access needed)
4. `sendAgentMessage` - per-queue encrypt + SEND to the router
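
The split-phase design can be illustrated with toy stand-ins - a counter for the ratchet state and XOR for body encryption; none of this is the real double-ratchet code, only the shape of the handoff:

```haskell
import Control.Concurrent.STM
import Data.Bits (xor)
import Data.Word (Word8)
import qualified Data.ByteString as B

newtype Ratchet = Ratchet Word8  -- toy chain state, NOT a real ratchet

-- Phase 1 (API thread): advance the ratchet under serialized access and
-- return the per-message key (MEK), which is then persisted with the message.
prepareMsg :: TVar Ratchet -> STM Word8
prepareMsg rv = do
  Ratchet k <- readTVar rv
  writeTVar rv $ Ratchet (k + 1)
  pure k

-- Phase 2 (delivery worker): encrypt the body with the stored MEK.
-- No ratchet access, so workers for different queues run in parallel.
encryptBody :: Word8 -> B.ByteString -> B.ByteString
encryptBody mek = B.map (xor mek)
```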

**Connection info messages** (`AM_CONN_INFO`, `AM_CONN_INFO_REPLY`) skip split-phase encryption entirely - they are sent as plaintext confirmation bodies via `sendConfirmation`.

**Retry with dual intervals**: Delivery uses `withRetryLock2`, which maintains two independent retry clocks (slow and fast). A background thread sleeps for the current interval, then signals the delivery worker via `tryPutTMVar`. When the router sends `QCONT` (queue buffer cleared), the agent calls `tryPutTMVar retryLock ()` to wake the delivery thread immediately, avoiding unnecessary delay.
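
The wake-early mechanism can be sketched like this; `waitRetry` and `wakeNow` are illustrative names, and the real `withRetryLock2` additionally tracks the two interval clocks:

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.STM
import Control.Monad (void)

-- Sleep for the interval, but return early if someone signals the lock first.
waitRetry :: Int -> TMVar () -> IO ()
waitRetry delayUs lock = do
  timer <- forkIO $ threadDelay delayUs >> void (atomically $ tryPutTMVar lock ())
  atomically $ takeTMVar lock  -- returns on timer expiry OR early wake
  killThread timer

-- Called on QCONT from the router: wake the delivery thread immediately.
wakeNow :: TMVar () -> IO ()
wakeNow lock = void . atomically $ tryPutTMVar lock ()
```

Using `tryPutTMVar` on both paths means a signal that races with the timer is simply coalesced rather than queued.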

**Error handling**:
- `SMP QUOTA` - switch to slow retry, don't penalize (backpressure from router)
- `SMP AUTH` - permanent failure: for data messages, notify and delete; for handshake messages, report connection error; for queue-switch messages, report queue error
- `temporaryOrHostError` - retry with backoff
- Other errors - report to application, delete command

---

## Subscription tracking

**Source**: [Agent/TSessionSubs.hs](../../src/Simplex/Messaging/Agent/TSessionSubs.hs), [Agent/Client.hs](../../src/Simplex/Messaging/Agent/Client.hs)

The agent tracks per-queue subscription state in `TSessionSubs` (defined in `Agent/TSessionSubs.hs`), keyed by `SMPTransportSession = (UserId, SMPServer, Maybe ByteString)` where the `ByteString` carries the entity ID in entity-session mode or `Nothing` in shared mode. Each transport session holds:

```
SessSubs
├── subsSessId :: TVar (Maybe SessionId) -- TLS session ID
├── activeSubs :: TMap RecipientId RcvQueueSub
├── pendingSubs :: TMap RecipientId RcvQueueSub
├── activeServiceSub :: TVar (Maybe ServiceSub)
└── pendingServiceSub :: TVar (Maybe ServiceSub)
```

**State machine**: Subscriptions move between three states:

- **Pending → Active**: After subscription RPC succeeds, `addActiveSub'` promotes the queue - but only if the returned session ID matches the stored TLS session ID (`Just sessId == sessId'`). On mismatch (TLS reconnected between RPC send and response), the subscription is silently added to pending instead. No exception - concurrent resubscription paths handle this naturally.

- **Active → Pending**: When `setSessionId` is called with a *different* session ID (TLS reconnect), all active subscriptions are atomically demoted to pending. Session ID is updated to the new value.

- **Pending → Removed**: `failSubscriptions` moves permanently-failed queues (non-temporary SMP errors) to `removedSubs`. The removal is tracked for diagnostic reporting via `getSubscriptions`.
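
The session-checked promotion can be sketched with simplified types (illustrative names, `Int` recipient IDs, `String` session IDs):

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as M

data SessSubs = SessSubs
  { subsSessId :: TVar (Maybe String)
  , activeSubs :: TVar (M.Map Int String)
  , pendingSubs :: TVar (M.Map Int String)
  }

-- Promote a queue to active only if the RPC's session ID still matches the
-- stored one; on mismatch (TLS reconnected mid-RPC) it stays pending for the
-- resubscription path, with no exception thrown.
addActiveSub :: SessSubs -> String -> Int -> String -> STM ()
addActiveSub ss rpcSessId rId q = do
  current <- readTVar (subsSessId ss)
  if current == Just rpcSessId
    then do
      modifyTVar' (pendingSubs ss) (M.delete rId)  -- promote
      modifyTVar' (activeSubs ss) (M.insert rId q)
    else modifyTVar' (pendingSubs ss) (M.insert rId q)  -- stale session
```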

**Service-associated queues**: Queues with `serviceAssoc=True` are *not* added to `activeSubs` individually. Instead, the service subscription's count is incremented and its `idsHash` XOR-accumulates the queue's hash. The router tracks individual queues via the service subscription; the agent only tracks the aggregate. Consequence: `hasActiveSub(rId)` returns `False` for service-associated queues - callers must check the service subscription separately.
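
A toy sketch of the aggregate bookkeeping, with `Word64` standing in for the real hash type - XOR accumulation means a queue can later be removed from the aggregate by XOR-ing its hash in again:

```haskell
import Data.Bits (xor)
import Data.Word (Word64)

data ServiceSub = ServiceSub {subCount :: Int, idsHash :: Word64} deriving (Eq, Show)

addServiceQueue, removeServiceQueue :: Word64 -> ServiceSub -> ServiceSub
addServiceQueue qHash (ServiceSub n h) = ServiceSub (n + 1) (h `xor` qHash)
-- XOR is its own inverse, so removal re-applies the same hash
removeServiceQueue qHash (ServiceSub n h) = ServiceSub (n - 1) (h `xor` qHash)
```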

**Disconnect cleanup** (`smpClientDisconnected`):
1. `removeSessVar` with CAS check (monotonic `sessionVarId` prevents stale callbacks from removing newer clients)
2. `setSubsPending` - demote active→pending, filtered by matching `SessionId` only
3. Delete proxied relay sessions created by this client
4. Fire `DISCONNECT`, `DOWN` (affected connections), `SERVICE_DOWN` (if service sub existed)
5. Release GET locks for affected queues
6. Resubscribe: either spawn `resubscribeSMPSession` worker (entity-session mode) or directly resubscribe queues and services (other modes)

**Resubscription worker**: Per-transport-session worker with exponential backoff. Loops until `pendingSubs` and `pendingServiceSub` are both empty. Uses `waitForUserNetwork` with bounded wait - proceeds even without network (prevents indefinite blocking). Worker self-cleans via `removeSessVar` on exit.

**UP event deduplication**: After a batch subscription RPC, `UP` events are emitted only for connections that were *not* already in `activeSubs` before the batch. This prevents duplicate notifications for already-subscribed connections.

---

## Operation suspension cascade

**Source**: [Agent/Client.hs](../../src/Simplex/Messaging/Agent/Client.hs)

Five `AgentOpState` TVars track in-flight operations for graceful shutdown. Each holds `{opSuspended :: Bool, opsInProgress :: Int}`.

**Cascade ordering**:
```
AONtfNetwork (independent - no cascading)

AORcvNetwork → AOMsgDelivery → AOSndNetwork → AODatabase
```

**Mechanics**: `endAgentOperation` decrements `opsInProgress`. If the count reaches zero and the operation is suspended, it calls the cascade action: `AORcvNetwork` suspends `AOMsgDelivery`, which suspends `AOSndNetwork`, which suspends `AODatabase`. At the leaf (`AODatabase`), `notifySuspended` writes `SUSPENDED` to `subQ` and sets `agentState = ASSuspended`.

**Blocking**: `beginAgentOperation` blocks (STM `retry`) while `opSuspended == True`. This means new operations of a suspended type cannot start - they wait until the operation is resumed. `agentOperationBracket` provides structured bracketing (begin on entry, end on exit).
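
The begin/end mechanics can be sketched as follows, with the cascade action passed in as a continuation (`OpState` mirrors `AgentOpState`, but the names here are illustrative):

```haskell
import Control.Concurrent.STM
import Control.Monad (when)

data OpState = OpState {opSuspended :: TVar Bool, opsInProgress :: TVar Int}

-- New operations of a suspended type block (STM retry) until resumed.
beginOp :: OpState -> STM ()
beginOp op = do
  susp <- readTVar (opSuspended op)
  if susp then retry else modifyTVar' (opsInProgress op) (+ 1)

-- When the last in-flight operation of a suspended type drains, run the
-- cascade action (e.g. suspend the next operation type in the chain).
endOp :: OpState -> STM () -> STM ()
endOp op cascade = do
  modifyTVar' (opsInProgress op) (subtract 1)
  n <- readTVar (opsInProgress op)
  susp <- readTVar (opSuspended op)
  when (n == 0 && susp) cascade
```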

**Two wait modes**:
- `waitWhileSuspended` - blocks only during `ASSuspended`, proceeds during `ASSuspending` (allows in-flight operations to complete)
- `waitUntilForeground` - blocks during both `ASSuspending` and `ASSuspended` (stricter, for operations that need full foreground)

**Usage**: `withStore` brackets all database access with `AODatabase`. Message delivery uses `AOSndNetwork` + `AOMsgDelivery`. Receive processing uses `AORcvNetwork`. This ensures that suspending receive processing cascades through delivery to database, and nothing touches the database after all operations drain.

---

## SessionVar lifecycle

**Source**: [Agent/Client.hs](../../src/Simplex/Messaging/Agent/Client.hs)

Protocol client connections (SMP, XFTP, NTF) use a lazy singleton pattern via `SessionVar` - a `TMVar` in a `TMap` keyed by transport session.

**Connection**: `getSessVar` atomically checks the TMap. Returns `Left newVar` (absent - caller must connect) or `Right existingVar` (present - wait for result). `newProtocolClient` wraps the connection attempt: on success, fills the TMVar with `Right client` and writes `CONNECT` event; on failure, fills with `Left (error, maybeRetryTime)` and re-throws.
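
The lookup can be sketched with simplified types - `Left` means "absent, the caller connects and fills the `TMVar`"; `Right` means "wait on the existing `TMVar` for the connecting thread's result":

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as M

getSessVar :: Ord k => TVar (M.Map k (TMVar v)) -> k -> STM (Either (TMVar v) (TMVar v))
getSessVar vars k = do
  m <- readTVar vars
  case M.lookup k m of
    Just v -> pure $ Right v  -- someone is connecting (or connected): wait on v
    Nothing -> do
      v <- newEmptyTMVar      -- we own the connection attempt
      writeTVar vars $! M.insert k v m
      pure $ Left v
```

Because insertion happens in the same STM transaction as the lookup, at most one caller ever receives `Left` for a given key.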

**Error caching**: Failed connections cache the error with an expiry timestamp based on `persistErrorInterval`. Future attempts during the interval immediately receive the cached error without reconnecting - this prevents connection storms when a router is down. When `persistErrorInterval == 0`, the SessionVar is removed immediately on failure (fresh connection on next attempt).

**Compare-and-swap**: Each SessionVar has a monotonic `sessionVarId` from `workerSeq`. `removeSessVar` only removes if the `sessionVarId` matches the current map entry. This prevents a stale disconnect callback (from an old client) from removing a newer client that connected after the old one disconnected.
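
A sketch of the id-checked removal (simplified types): the entry is deleted only when the stored `sessionVarId` matches the one the disconnect callback was created with:

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as M

removeSessVar :: Ord k => Int -> TVar (M.Map k (Int, v)) -> k -> STM ()
removeSessVar varId vars k = modifyTVar' vars $ \m ->
  case M.lookup k m of
    Just (i, _) | i == varId -> M.delete k m  -- still ours: remove
    _ -> m                                    -- replaced or gone: no-op
```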

**Service credential synchronization** (`updateClientService`): On SMP reconnect, the agent reconciles service credentials between client and router state - updating, creating, or removing service associations as needed. Router version downgrade (router loses service support) triggers client-side service deletion.

**XFTP special case**: `getProtocolServerClient` ignores the caller's `NetworkRequestMode` parameter for XFTP - XFTP connections always use `NRMBackground` retry timing, regardless of the caller's request.

---

## Dual-backend store

**Source**: [Agent/Store/SQLite.hs](../../src/Simplex/Messaging/Agent/Store/SQLite.hs), [Agent/Store/Postgres.hs](../../src/Simplex/Messaging/Agent/Store/Postgres.hs), [Agent/Store/AgentStore.hs](../../src/Simplex/Messaging/Agent/Store/AgentStore.hs)

The agent supports SQLite and PostgreSQL via CPP compilation flags (`#if defined(dbPostgres)`). Three wrapper modules (`Interface.hs`, `Common.hs`, `DB.hs`) re-export the appropriate backend. A single binary compiles with one active backend.

**Key behavioral differences**:

| Aspect | SQLite | PostgreSQL |
|--------|--------|------------|
| Row locking | Single-writer model (no locking needed) | `FOR UPDATE` on reads preceding writes |
| Batch queries | Per-row `forM` loops | `IN ?` with `In` wrapper |
| Constraint violations | `SQL.ErrorConstraint` pattern match | `constraintViolation` function |
| Transaction savepoints | Not needed | Used in `createWithRandomId'` (failed statement aborts entire transaction without them) |
| Busy/locked errors | `ErrorBusy`/`ErrorLocked` → `SEDatabaseBusy` → `CRITICAL True` | All SQL errors → `SEInternal` |

**Store access bracketing**: `withStore` wraps all database operations with `agentOperationBracket AODatabase`, connecting the store to the suspension cascade. `withStoreBatch` / `withStoreBatch'` run multiple operations in a single transaction with per-operation error catching.

**Known bug**: `checkConfirmedSndQueueExists_` uses `#if defined(dpPostgres)` (typo - should be `dbPostgres`), so the `FOR UPDATE` clause is never included on either backend.