Skip to content

Commit 8b52f50

Browse files
refactor to fused batching
1 parent 4b132a1 commit 8b52f50

File tree

6 files changed

+124
-160
lines changed

6 files changed

+124
-160
lines changed

plans/20260401_01_batch_queue_associations.md

Lines changed: 72 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -4,187 +4,123 @@ When a batch of SUB or NSUB commands arrives from a service client, each command
44

55
## Goal
66

7-
Reduce to 1 DB query per batch, using `UPDATE ... RETURNING recipient_id` to identify which queues were actually updated.
7+
Reduce to at most 2 DB queries per batch (one for rcv associations, one for ntf associations), using `UPDATE ... RETURNING recipient_id` to identify which queues were actually updated.
88

9-
`clntServiceId` is per-client (not per-queue), so all queues in a batch that need association changes share the same target value. The per-queue decision is binary: update or not.
9+
Also fuse message pre-fetch and association batching into a single batch preparation step with a clean contract.
1010

11-
```haskell
12-
type NeedsAssocUpdate = Bool
13-
```
14-
15-
## Current code
16-
17-
### `sharedSubscribeQueue` (Server.hs:1757-1805)
18-
19-
Called per SUB/NSUB command. Based on `clntServiceId` and `queueServiceId` from `QueueRec`:
20-
21-
- `clntServiceId == Just sId`, `queueServiceId == Just sId` (line 1763): Already associated. No DB write. STM + stats only.
22-
23-
- `clntServiceId == Just sId`, `queueServiceId /= Just sId` (line 1772): New/updated association. Calls `setQueueService q party (Just sId)` - **DB WRITE**. Then STM + stats.
24-
25-
- `clntServiceId == Nothing`, `queueServiceId == Just _` (line 1787): Removing association. Calls `setQueueService q party Nothing` - **DB WRITE**. Then STM + stats.
26-
27-
- `clntServiceId == Nothing`, `queueServiceId == Nothing` (line 1795): No service. No DB write. STM only.
28-
29-
### Where `sharedSubscribeQueue` is called from
30-
31-
Only from the `client` function's `foldrM` loop in `Server.hs` (via `processCommand` -> `subscribeQueueAndDeliver` or `subscribeNotifications`). The forwarded command handler (line 2094) only processes sender commands, never SUB/NSUB. So `prepareBatch` always runs before `sharedSubscribeQueue`.
32-
33-
### `setQueueService` for Postgres (QueueStore/Postgres.hs:484-505)
11+
## Contract
3412

35-
Per queue:
36-
1. `withQueueRec sq` - reads QueueRec TVar under queue lock, fails if deleted
37-
2. Checks if already set to target value - returns immediately if so
38-
3. `assertUpdated $ withDB' ... DB.execute "UPDATE ..."` - one DB query, asserts 1 row affected
39-
4. `atomically $ writeTVar (queueRec sq) $ Just q'` - updates in-memory QueueRec
40-
5. `withLog ... logQueueService` - writes store log entry
41-
42-
### `setQueueService` for STM (QueueStore/STM.hs:312-338)
43-
44-
Per queue:
45-
1. `atomically (readQueueRec qr $>>= setService)` - reads QueueRec, updates TVar, updates per-service queue sets
46-
2. `$>> withLog ... logQueueService` - writes store log entry
47-
48-
## Implementation (top-down)
49-
50-
### Step 1: Extend batch preparation in the `client` function (Server.hs)
51-
52-
Currently (Server.hs:1372-1381):
5313
```haskell
54-
forever $ do
55-
batch <- atomically (readTBQueue rcvQ)
56-
msgMap <- prefetchMsgs batch
57-
foldrM (process msgMap) ([], []) batch
58-
>>= ...
14+
prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))))
5915
```
6016

61-
Rename `prefetchMsgs` to `prepareBatch`. It returns an additional `Map RecipientId (Either ErrorType ())` for association results.
17+
`Left e` = batch-level failure (message pre-fetch or association query failed entirely). All SUBs/NSUBs in the batch get this error.
6218

63-
```haskell
64-
forever $ do
65-
batch <- atomically (readTBQueue rcvQ)
66-
(msgMap, assocResults) <- prepareBatch batch
67-
foldrM (process msgMap assocResults) ([], []) batch
68-
>>= ...
69-
```
19+
`Right map` = per-queue results as a tuple:
20+
- `Maybe Message` - pre-fetched message for SUB queues, `Nothing` for NSUB or no message
21+
- `Maybe (Either ErrorType ())` - association result. `Nothing` = no update needed. `Just (Right ())` = update succeeded. `Just (Left e)` = update failed for this queue.
7022

71-
`assocResults` contains entries only for queues that needed an association update. Keyed by `RecipientId`. `Right ()` means the update succeeded. `Left e` means it failed.
23+
One map, one lookup per queue. `processCommand` passes both values to `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue`.
7224

73-
### Step 2: Implement `prepareBatch` (Server.hs)
25+
Queues not in the map (non-SUB/NSUB commands, failed verification) are not affected.
7426

75-
Replaces current `prefetchMsgs`. Does three things:
27+
## prepareBatch implementation
7628

77-
1. Collects SUB queues for message pre-fetch (existing `tryPeekMsgs` logic, unchanged).
29+
One accumulating fold over the batch, collecting three lists:
30+
- `subMsgQs :: [StoreQueue s]` - SUB queues for message pre-fetch
31+
- `rcvAssocQs :: [StoreQueue s]` - SUB queues needing `rcv_service_id` update (`clntServiceId /= rcvServiceId qr`)
32+
- `ntfAssocQs :: [StoreQueue s]` - NSUB queues needing `ntf_service_id` update (`clntServiceId /= ntfServiceId` from `NtfCreds`)
7833

79-
2. Classifies each SUB/NSUB queue's association need by reading `queueServiceId` from the already-loaded `QueueRec` in `VerifiedTransmission` and comparing with `clntServiceId`. Produces `NonEmpty (Either ErrorType NeedsAssocUpdate)` aligned with the batch. Error if `q_ = Nothing` for a SUB/NSUB command. `True` if the queue needs its association updated. `False` if no change needed.
34+
Classification reads from the already-loaded `QueueRec` in `VerifiedTransmission` - no extra DB query.
8035

81-
3. Collects `StoreQueue`s where classification produced `Right True`. If non-empty, calls `setQueueServices` with `clntServiceId` as target and this list. Gets back `Set RecipientId` of queues that were actually updated.
36+
Then three store calls (each skipped if its list is empty):
37+
1. `tryPeekMsgs ms subMsgQs` -> `Map RecipientId Message`
38+
2. `setRcvQueueServices (queueStore ms) clntServiceId rcvAssocQs` -> `Set RecipientId`
39+
3. `setNtfQueueServices (queueStore ms) clntServiceId ntfAssocQs` -> `Set RecipientId`
8240

83-
4. Builds `assocResults :: Map RecipientId (Either ErrorType ())`: for each queue that needed an update (`Right True`), if its `recipientId` is in the returned set then `Right ()`, otherwise `Left AUTH`.
41+
Then one pass to merge results into `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`:
42+
- For each SUB queue: `(M.lookup rId msgMap, assocResult rId rcvUpdated rcvAssocQs)`
43+
- For each NSUB queue: `(Nothing, assocResult rId ntfUpdated ntfAssocQs)`
8444

85-
### Step 3: Thread `assocResults` through `processCommand` (Server.hs:1463)
45+
Where `assocResult rId updated assocQs` = if the queue was in `assocQs` (needed update), then `Just (Right ())` if `rId` is in `updated`, else `Just (Left AUTH)`. If not in `assocQs` (no update needed), `Nothing`.
8646

87-
Add parameter:
88-
```haskell
89-
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message) -> Map RecipientId (Either ErrorType ()) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
90-
```
47+
If any of the three calls fails entirely, return `Left e`.
9148

92-
In the SUB case, pass `M.lookup entId assocResults` to `subscribeQueueAndDeliver`.
93-
In the NSUB case, pass `M.lookup entId assocResults` to `subscribeNotifications`.
94-
In the forwarded command call (line 2094), pass `M.empty`.
95-
All other commands ignore it.
49+
## Store interface
9650

97-
### Step 4: Thread through `subscribeQueueAndDeliver` (Server.hs:1631) and `subscribeNotifications` (Server.hs:1737)
51+
Replace the polymorphic `setQueueServices` with two plain functions in `QueueStoreClass`:
9852

99-
Both gain `assocResult :: Maybe (Either ErrorType ())` and pass it to `sharedSubscribeQueue`.
100-
101-
`subscribeQueueAndDeliver` signature becomes:
10253
```haskell
103-
subscribeQueueAndDeliver :: Maybe Message -> Maybe (Either ErrorType ()) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
54+
setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
55+
setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
10456
```
10557

106-
### Step 5: Modify `sharedSubscribeQueue` (Server.hs:1757)
58+
No `SParty p` polymorphism. Each function knows its column.
10759

108-
Gains `assocResult :: Maybe (Either ErrorType ())`.
60+
### Postgres implementation
10961

110-
Where the queue needs a new or changed association (line 1772), currently:
111-
```haskell
112-
| otherwise -> runExceptT $ do
113-
ExceptT $ setQueueService (queueStore ms) q party (Just serviceId)
114-
hasSub <- ...
62+
`setRcvQueueServices`:
63+
```sql
64+
UPDATE msg_queues SET rcv_service_id = ?
65+
WHERE recipient_id IN ? AND deleted_at IS NULL
66+
RETURNING recipient_id
11567
```
11668

117-
Becomes:
118-
```haskell
119-
| otherwise -> case assocResult of
120-
Just (Left e) -> pure $ Left e
121-
_ -> runExceptT $ do
122-
hasSub <- ...
69+
`setNtfQueueServices`:
70+
```sql
71+
UPDATE msg_queues SET ntf_service_id = ?
72+
WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL
73+
RETURNING recipient_id
12374
```
12475

125-
`Just (Left e)` means the batch update failed for this queue - return the error.
126-
`Just (Right ())` means the batch update succeeded - skip `setQueueService`, proceed with STM work.
127-
`Nothing` cannot happen here because `prepareBatch` always runs before this code and classifies every SUB/NSUB queue.
76+
After each batch query, for each queue in the returned set:
77+
1. Read QueueRec TVar, update with new serviceId
78+
2. Write store log entry
12879

129-
Same change where removing association (line 1787):
130-
```haskell
131-
Just _ -> case assocResult of
132-
Just (Left e) -> pure $ Left e
133-
_ -> runExceptT $ do
134-
liftIO $ incSrvStat srvAssocRemoved
135-
...
136-
```
80+
### STM implementation
13781

138-
Queues that are already associated correctly or have no service involvement have `assocResult = Nothing` (not in the map). These paths don't call `setQueueService` today, so nothing changes for them.
82+
Loop over queues, call existing per-item logic, collect succeeded `RecipientId`s into a Set.
13983

140-
### Step 6: Add `setQueueServices` to `QueueStoreClass` (QueueStore/Types.hs:53)
84+
## Downstream changes in Server.hs
14185

142-
```haskell
143-
setQueueServices :: (PartyI p, ServiceParty p) => s -> SParty p -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
144-
```
86+
### processCommand
14587

146-
Takes target `serviceId` and list of queues. Returns set of `RecipientId`s that were actually updated in the DB.
88+
Gains one parameter: `Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))`.
14789

148-
### Step 7: Postgres implementation (QueueStore/Postgres.hs)
90+
SUB case: `M.lookup entId prepared` gives `Just (msg_, assocResult)` or `Nothing`. Pass both to `subscribeQueueAndDeliver`.
14991

150-
For `SRecipientService`:
151-
```sql
152-
UPDATE msg_queues SET rcv_service_id = ?
153-
WHERE recipient_id IN ? AND deleted_at IS NULL
154-
RETURNING recipient_id
155-
```
92+
NSUB case: `M.lookup entId prepared` gives `Just (Nothing, assocResult)` or `Nothing`. Pass `assocResult` to `subscribeNotifications`.
15693

157-
For `SNotifierService`:
158-
```sql
159-
UPDATE msg_queues SET ntf_service_id = ?
160-
WHERE recipient_id IN ? AND notifier_id IS NOT NULL AND deleted_at IS NULL
161-
RETURNING recipient_id
162-
```
94+
Forwarded commands: pass `M.empty`.
16395

164-
Build `Set RecipientId` from RETURNING rows.
96+
### subscribeQueueAndDeliver
16597

166-
After the batch query, for each queue whose `recipientId` is in the returned set:
167-
1. Read `QueueRec` from TVar, update with new `serviceId` (same as `updateQueueRec` at line 502-504)
168-
2. Write store log entry (same as `withLog` at line 505)
98+
Takes `Maybe Message` and `Maybe (Either ErrorType ())` as before. No change in how it uses them.
16999

170-
Queues not in the returned set are not updated (deleted between verification and UPDATE). The caller sees them absent from the set and produces `Left AUTH`.
100+
### sharedSubscribeQueue
171101

172-
No per-queue lock needed: the batch UPDATE is a single SQL statement (Postgres handles row-level locking internally), and SUB/NSUB processing is single-threaded per connected client.
102+
Takes `Maybe (Either ErrorType ())`. On paths needing association update:
103+
- `Just (Left e)` -> return error
104+
- `Just (Right ())` -> skip `setQueueService`, proceed with STM work
105+
- `Nothing` -> no update needed, proceed with existing logic
173106

174-
### Step 8: STM implementation (QueueStore/STM.hs)
107+
## Implementation order (top-down)
175108

176-
Loop over queues. For each:
177-
1. Run existing `setService` STM logic from `setQueueService` (line 319-334): read QueueRec, update TVar, update per-service queue sets
178-
2. If succeeded, add `recipientId` to result set
179-
3. Write store log entry
109+
1. Define the `prepareBatch` contract and thread one map through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` (Server.hs)
110+
2. Implement `prepareBatch` with the fold, three calls, and merge (Server.hs)
111+
3. Add `setRcvQueueServices` and `setNtfQueueServices` to `QueueStoreClass` (Types.hs)
112+
4. Implement for Postgres with batch `UPDATE ... RETURNING` (Postgres.hs)
113+
5. Implement for STM as loop (STM.hs)
114+
6. Implement for Journal as delegation (Journal.hs)
180115

181-
Return accumulated `Set RecipientId`.
116+
At step 2, store functions can initially be stubs returning empty sets. Steps 3-6 fill in the real implementations.
182117

183118
## Files changed
184119

185120
| File | Change |
186121
|---|---|
187-
| `src/Simplex/Messaging/Server.hs` | Rename `prefetchMsgs` to `prepareBatch` adding classification and `setQueueServices` call. Thread `assocResults` through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue`. Replace `setQueueService` calls with `assocResult` check. |
188-
| `src/Simplex/Messaging/Server/QueueStore/Types.hs` | Add `setQueueServices` to `QueueStoreClass` |
189-
| `src/Simplex/Messaging/Server/QueueStore/Postgres.hs` | Implement `setQueueServices` with batch `UPDATE ... RETURNING` + per-item TVar and store log updates |
190-
| `src/Simplex/Messaging/Server/QueueStore/STM.hs` | Implement `setQueueServices` as loop over existing STM logic |
122+
| `src/Simplex/Messaging/Server.hs` | `prepareBatch` with fold + merge; one map parameter through `processCommand` -> `subscribeQueueAndDeliver` / `subscribeNotifications` -> `sharedSubscribeQueue` |
123+
| `src/Simplex/Messaging/Server/QueueStore/Types.hs` | Add `setRcvQueueServices`, `setNtfQueueServices` to `QueueStoreClass` |
124+
| `src/Simplex/Messaging/Server/QueueStore/Postgres.hs` | Implement with batch `UPDATE ... RETURNING` + per-item TVar/log updates |
125+
| `src/Simplex/Messaging/Server/QueueStore/STM.hs` | Implement as loop |
126+
| `src/Simplex/Messaging/Server/MsgStore/Journal.hs` | Delegate to underlying store |

0 commit comments

Comments
 (0)