Skip to content

Commit 0741583

Browse files
agent: read queues in batches for subscriptions (#1758)
* agent: read queues in batches for subscriptions * resubscribe in batches too --------- Co-authored-by: Evgeny @ SimpleX Chat <259188159+evgeny-simplex@users.noreply.github.com>
1 parent f8f172f commit 0741583

File tree

4 files changed

+27
-24
lines changed

4 files changed

+27
-24
lines changed

src/Simplex/Messaging/Agent.hs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1564,12 +1564,11 @@ subscribeAllConnections' :: AgentClient -> Bool -> Maybe UserId -> AM ()
15641564
subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
15651565
userSrvs <- withStore' c (`getSubscriptionServers` onlyNeeded)
15661566
unless (null userSrvs) $ do
1567-
maxPending <- asks $ maxPendingSubscriptions . config
1568-
currPending <- newTVarIO 0
1567+
batchSize <- asks $ subsBatchSize . config
15691568
let userSrvs' = case activeUserId_ of
15701569
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
15711570
Nothing -> userSrvs
1572-
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs'
1571+
rs <- lift $ mapConcurrently (subscribeUserServer batchSize) userSrvs'
15731572
let (errs, oks) = partitionEithers rs
15741573
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
15751574
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
@@ -1578,18 +1577,16 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
15781577
resumeAllCommands c
15791578
where
15801579
handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e)
1581-
subscribeUserServer :: Int -> TVar Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int)
1582-
subscribeUserServer maxPending currPending (userId, srv) = do
1583-
atomically $ whenM ((maxPending <=) <$> readTVar currPending) retry
1584-
tryAllErrors' $ do
1585-
qs <- withStore' c $ \db -> do
1586-
qs <- getUserServerRcvQueueSubs db userId srv onlyNeeded
1587-
unless (null qs) $ atomically $ modifyTVar' currPending (+ length qs) -- update before leaving transaction
1588-
pure qs
1589-
let n = length qs
1590-
unless (null qs) $ lift $ subscribe qs `E.finally` atomically (modifyTVar' currPending $ subtract n)
1591-
pure n
1580+
subscribeUserServer :: Int -> (UserId, SMPServer) -> AM' (Either AgentErrorType Int)
1581+
subscribeUserServer batchSize (userId, srv) = tryAllErrors' $ loop 0 Nothing
15921582
where
1583+
loop !n cursor_ = do
1584+
qs <- withStore' c $ \db -> getUserServerRcvQueueSubs db userId srv onlyNeeded batchSize cursor_
1585+
if null qs then pure n else do
1586+
lift $ subscribe qs
1587+
let n' = n + length qs
1588+
lastRcvId = Just $ queueId $ last qs
1589+
if length qs < batchSize then pure n' else loop n' lastRcvId
15931590
subscribe qs = do
15941591
rs <- subscribeUserServerQueues c userId srv qs
15951592
-- TODO [certs rcv] storeClientServiceAssocs store associations of queues with client service ID

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1598,9 +1598,15 @@ checkQueues c = fmap partitionEithers . mapM checkQueue
15981598
-- and that they are already added to pending subscriptions.
15991599
resubscribeSessQueues :: AgentClient -> SMPTransportSession -> [RcvQueueSub] -> AM' ()
16001600
resubscribeSessQueues c tSess qs = do
1601+
batchSize <- asks $ subsBatchSize . config
16011602
(errs, qs_) <- checkQueues c qs
1602-
forM_ (L.nonEmpty qs_) $ \qs' -> void $ subscribeSessQueues_ c True (tSess, qs')
1603+
subscribeChunks $ toChunks batchSize qs_
16031604
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (first qConnId)
1605+
where
1606+
subscribeChunks [] = pure ()
1607+
subscribeChunks (qs' : rest) = do
1608+
(_, active) <- subscribeSessQueues_ c True (tSess, qs')
1609+
when active $ subscribeChunks rest
16041610

16051611
subscribeSessQueues_ :: AgentClient -> Bool -> (SMPTransportSession, NonEmpty RcvQueueSub) -> AM' (BatchResponses RcvQueueSub AgentErrorType (Maybe ServiceId), Bool)
16061612
subscribeSessQueues_ c withEvents qs = sendClientBatch_ "SUB" False subscribe_ c NRMBackground qs

src/Simplex/Messaging/Agent/Env/SQLite.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ data AgentConfig = AgentConfig
168168
ntfBatchSize :: Int,
169169
ntfSubFirstCheckInterval :: NominalDiffTime,
170170
ntfSubCheckInterval :: NominalDiffTime,
171-
maxPendingSubscriptions :: Int,
171+
subsBatchSize :: Int,
172172
caCertificateFile :: FilePath,
173173
privateKeyFile :: FilePath,
174174
certificateFile :: FilePath,
@@ -241,7 +241,7 @@ defaultAgentConfig =
241241
ntfBatchSize = 150,
242242
ntfSubFirstCheckInterval = nominalDay,
243243
ntfSubCheckInterval = 3 * nominalDay,
244-
maxPendingSubscriptions = 35000,
244+
subsBatchSize = 1350,
245245
-- CA certificate private key is not needed for initialization
246246
-- ! we do not generate these
247247
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2211,14 +2211,14 @@ getSubscriptionServers db onlyNeeded =
22112211
toUserServer :: (UserId, NonEmpty TransportHost, ServiceName, C.KeyHash) -> (UserId, SMPServer)
22122212
toUserServer (userId, host, port, keyHash) = (userId, SMPServer host port keyHash)
22132213
2214-
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> IO [RcvQueueSub]
2215-
getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded =
2216-
map toRcvQueueSub
2217-
<$> DB.query
2218-
db
2219-
(rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ?")
2220-
(userId, h, p, kh)
2214+
getUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> Bool -> Int -> Maybe SMP.RecipientId -> IO [RcvQueueSub]
2215+
getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded limit cursor_ =
2216+
map toRcvQueueSub <$> case cursor_ of
2217+
Nothing -> DB.query db (q <> orderLimit) (userId, h, p, kh, limit)
2218+
Just cursor -> DB.query db (q <> " AND q.rcv_id > ? " <> orderLimit) (userId, h, p, kh, cursor, limit)
22212219
where
2220+
q = rcvQueueSubQuery <> toSubscribe <> " c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ?"
2221+
orderLimit = " ORDER BY q.rcv_id LIMIT ?"
22222222
toSubscribe
22232223
| onlyNeeded = " WHERE q.to_subscribe = 1 AND "
22242224
| otherwise = " WHERE "

0 commit comments

Comments
 (0)