Skip to content

Commit 1c253dd

Browse files
simpler
1 parent 8b52f50 commit 1c253dd

File tree

4 files changed

+44
-56
lines changed

4 files changed

+44
-56
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 35 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,41 +1366,35 @@ client
13661366
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
13671367
let THandleParams {thVersion} = thParams'
13681368
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
1369-
process prepared t acc@(rs, msgs) =
1369+
process batchSubs t acc@(rs, msgs) =
13701370
(maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
1371-
<$> processCommand clntServiceId thVersion prepared t
1371+
<$> processCommand clntServiceId thVersion batchSubs t
13721372
forever $ do
13731373
batch <- atomically (readTBQueue rcvQ)
1374-
prepared <- prepareBatch clntServiceId batch
1375-
foldrM (process prepared) ([], []) batch
1374+
batchSubs <- prepareBatchSubs clntServiceId batch
1375+
foldrM (process batchSubs) ([], []) batch
13761376
>>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_)
13771377
where
1378-
prepareBatch :: Maybe ServiceId -> NonEmpty (VerifiedTransmission s) -> M s (Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))))
1379-
prepareBatch clntServiceId_ batch = do
1380-
let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldl' classify ([], [], []) $ L.toList batch
1381-
classify (!msgQs, !rcvQs, !ntfQs) = \case
1378+
prepareBatchSubs ::
1379+
Maybe ServiceId ->
1380+
NonEmpty (VerifiedTransmission s) ->
1381+
M s (Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())))
1382+
prepareBatchSubs clntServiceId_ batch = do
1383+
let (subMsgQs, rcvAssocQs, ntfAssocQs) = foldl' partitionSubs ([], [], []) batch
1384+
partitionSubs (msgQs, rcvQs, ntfQs) = \case
13821385
(Just (q, qr), (_, _, Cmd SRecipient SUB))
13831386
| clntServiceId_ /= rcvServiceId qr -> (q : msgQs, q : rcvQs, ntfQs)
13841387
| otherwise -> (q : msgQs, rcvQs, ntfQs)
13851388
(Just (q, qr), (_, _, Cmd SNotifier NSUB))
13861389
| clntServiceId_ /= (notifier qr >>= ntfServiceId) -> (msgQs, rcvQs, q : ntfQs)
1387-
| otherwise -> (msgQs, rcvQs, ntfQs)
13881390
_ -> (msgQs, rcvQs, ntfQs)
13891391
liftIO $ runExceptT $ do
1390-
msgs <- if null subMsgQs then pure M.empty else tryPeekMsgs ms subMsgQs
1391-
rcvUpdated <- if null rcvAssocQs then pure S.empty else ExceptT $ Right <$> setRcvQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_ rcvAssocQs
1392-
ntfUpdated <- if null ntfAssocQs then pure S.empty else ExceptT $ Right <$> setNtfQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_ ntfAssocQs
1393-
let rcvSet = S.fromList $ map recipientId rcvAssocQs
1394-
assocResult rId updated = if S.member rId updated then Right () else Left AUTH
1395-
mkEntry q =
1396-
let rId = recipientId q
1397-
msg_ = M.lookup rId msgs
1398-
assoc_
1399-
| S.member rId rcvSet = Just $ assocResult rId rcvUpdated
1400-
| otherwise = Nothing
1401-
in (rId, (msg_, assoc_))
1402-
mkNtfEntry q = let rId = recipientId q in (rId, (Nothing, Just $ assocResult rId ntfUpdated))
1403-
pure $ M.fromList $ map mkEntry subMsgQs <> map mkNtfEntry ntfAssocQs
1392+
rcvAssocs <- ifNotNull rcvAssocQs $ ExceptT . setRcvQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_
1393+
ntfAssocs <- ifNotNull ntfAssocQs $ ExceptT . setNtfQueueServices @(StoreQueue s) (queueStore ms) clntServiceId_
1394+
msgs <- ifNotNull subMsgQs $ tryPeekMsgs ms
1395+
pure (msgs, rcvAssocs, ntfAssocs)
1396+
where
1397+
ifNotNull qs f = if null qs then pure M.empty else f qs
14041398

14051399
processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage)
14061400
processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of
@@ -1482,8 +1476,8 @@ client
14821476
mkIncProxyStats ps psOwn own sel = do
14831477
incStat $ sel ps
14841478
when own $ incStat $ sel psOwn
1485-
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId (Maybe Message, Maybe (Either ErrorType ()))) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
1486-
processCommand clntServiceId clntVersion prepared (q_, (corrId, entId, cmd)) = case cmd of
1479+
processCommand :: Maybe ServiceId -> VersionSMP -> Either ErrorType (Map RecipientId Message, Map RecipientId (Either ErrorType ()), Map RecipientId (Either ErrorType ())) -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
1480+
processCommand clntServiceId clntVersion batchSubs (q_, (corrId, entId, cmd)) = case cmd of
14871481
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
14881482
Cmd SSender command -> case command of
14891483
SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
@@ -1495,10 +1489,8 @@ client
14951489
LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr
14961490
Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of
14971491
Just (q, QueueRec {notifier = Just ntfCreds}) ->
1498-
let assoc_ = case prepared of
1499-
Left _ -> Nothing
1500-
Right prepMap -> M.lookup entId prepMap >>= snd
1501-
in subscribeNotifications assoc_ q ntfCreds
1492+
either (pure . ERR) (\_ -> subscribeNotifications q ntfCreds)
1493+
$ batchSubs >>= sequence . M.lookup (recipientId q) . \(_, _, n) -> n
15021494
_ -> pure $ ERR INTERNAL
15031495
Cmd SNotifierService (NSUBS n idsHash) -> response . (corrId,entId,) <$> case clntServiceId of
15041496
Just serviceId -> subscribeServiceNotifications serviceId (n, idsHash)
@@ -1511,10 +1503,11 @@ client
15111503
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
15121504
Cmd SRecipient command ->
15131505
case command of
1514-
SUB -> case prepared of
1506+
SUB -> case batchSubs of
15151507
Left e -> pure $ Just (err e, Nothing)
1516-
Right prepMap -> let (msg_, assoc_) = maybe (Nothing, Nothing) id (M.lookup entId prepMap)
1517-
in withQueue' $ subscribeQueueAndDeliver msg_ assoc_
1508+
Right (msgs, rcvAssocs, _) -> case sequence $ M.lookup entId rcvAssocs of
1509+
Left e -> pure $ Just (err e, Nothing)
1510+
Right _ -> withQueue' $ subscribeQueueAndDeliver (M.lookup entId msgs)
15181511
GET -> withQueue getMessage
15191512
ACK msgId -> withQueue $ acknowledgeMsg msgId
15201513
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
@@ -1655,11 +1648,11 @@ client
16551648
suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
16561649
suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q
16571650

1658-
subscribeQueueAndDeliver :: Maybe Message -> Maybe (Either ErrorType ()) -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
1659-
subscribeQueueAndDeliver msg_ assocResult q qr@QueueRec {rcvServiceId} =
1651+
subscribeQueueAndDeliver :: Maybe Message -> StoreQueue s -> QueueRec -> M s ResponseAndMessage
1652+
subscribeQueueAndDeliver msg_ q qr@QueueRec {rcvServiceId} =
16601653
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
16611654
Nothing ->
1662-
sharedSubscribeQueue assocResult q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
1655+
sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
16631656
Left e -> pure (err e, Nothing)
16641657
Right s -> deliver s
16651658
Just s@Sub {subThread} -> do
@@ -1761,9 +1754,9 @@ client
17611754
then action q qr
17621755
else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q)
17631756

1764-
subscribeNotifications :: Maybe (Either ErrorType ()) -> StoreQueue s -> NtfCreds -> M s BrokerMsg
1765-
subscribeNotifications assocResult q NtfCreds {ntfServiceId} =
1766-
sharedSubscribeQueue assocResult q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case
1757+
subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
1758+
subscribeNotifications q NtfCreds {ntfServiceId} =
1759+
sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case
17671760
Left e -> pure $ ERR e
17681761
Right (hasSub, _) -> do
17691762
when (isNothing clntServiceId) $
@@ -1772,7 +1765,6 @@ client
17721765

17731766
sharedSubscribeQueue ::
17741767
(PartyI p, ServiceParty p) =>
1775-
Maybe (Either ErrorType ()) ->
17761768
StoreQueue s ->
17771769
SParty p ->
17781770
Maybe ServiceId ->
@@ -1782,7 +1774,7 @@ client
17821774
STM sub ->
17831775
(ServerStats -> ServiceStats) ->
17841776
M s (Either ErrorType (Bool, Maybe sub))
1785-
sharedSubscribeQueue assocResult q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
1777+
sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
17861778
stats <- asks serverStats
17871779
let incSrvStat sel = incStat $ sel $ servicesSel stats
17881780
writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId)
@@ -1797,9 +1789,7 @@ client
17971789
incSrvStat srvSubQueues
17981790
incSrvStat srvAssocDuplicate
17991791
pure $ Right (hasSub, Nothing)
1800-
| otherwise -> case assocResult of
1801-
Just (Left e) -> pure $ Left e
1802-
_ -> runExceptT $ do
1792+
| otherwise -> runExceptT $ do
18031793
-- association already done in prepareBatch
18041794
hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub
18051795
atomically writeSub
@@ -1813,9 +1803,7 @@ client
18131803
-- This function is used when queue association with the service is created.
18141804
incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) $ addServiceSubs (1, queueIdHash (recipientId q)) -- service count and IDS hash
18151805
Nothing -> case queueServiceId of
1816-
Just _ -> case assocResult of
1817-
Just (Left e) -> pure $ Left e
1818-
_ -> runExceptT $ do
1806+
Just _ -> runExceptT $ do
18191807
-- unassociation already done in prepareBatch
18201808
liftIO $ incSrvStat srvAssocRemoved
18211809
-- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions.
@@ -2125,7 +2113,7 @@ client
21252113
-- rejectOrVerify filters allowed commands, no need to repeat it here.
21262114
-- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
21272115
-- `fst` removes empty message that is only returned for `SUB` command
2128-
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right M.empty) t'')
2116+
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion (Right (M.empty, M.empty, M.empty)) t'')
21292117
-- encode response
21302118
r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
21312119
[] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -504,10 +504,10 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
504504
atomically $ writeTVar (queueRec sq) $ Just q'
505505
withLog "setQueueService" st $ \sl -> logQueueService sl rId party serviceId
506506

507-
setRcvQueueServices _ _ [] = pure S.empty
508-
setRcvQueueServices _ _ _ = pure S.empty -- TODO batch implementation
509-
setNtfQueueServices _ _ [] = pure S.empty
510-
setNtfQueueServices _ _ _ = pure S.empty -- TODO batch implementation
507+
setRcvQueueServices _ _ [] = pure $ Right M.empty
508+
setRcvQueueServices _ _ _ = pure $ Right M.empty -- TODO batch implementation
509+
setNtfQueueServices _ _ [] = pure $ Right M.empty
510+
setNtfQueueServices _ _ _ = pure $ Right M.empty -- TODO batch implementation
511511

512512
getQueueNtfServices :: PostgresQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
513513
getQueueNtfServices st ntfs = E.uninterruptibleMask_ $ runExceptT $ do

src/Simplex/Messaging/Server/QueueStore/STM.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,8 +337,8 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
337337
mapM_ (removeServiceQueue st serviceSel qId) prevSrvId
338338
mapM_ (addServiceQueue st serviceSel qId) serviceId
339339

340-
setRcvQueueServices _ _ _ = pure S.empty -- TODO loop implementation
341-
setNtfQueueServices _ _ _ = pure S.empty -- TODO loop implementation
340+
setRcvQueueServices _ _ _ = pure $ Right M.empty -- TODO loop implementation
341+
setNtfQueueServices _ _ _ = pure $ Right M.empty -- TODO loop implementation
342342

343343
getQueueNtfServices :: STMQueueStore q -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
344344
getQueueNtfServices st ntfs = do

src/Simplex/Messaging/Server/QueueStore/Types.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import Control.Concurrent.STM
1616
import Control.Monad
1717
import Data.Int (Int64)
1818
import Data.List.NonEmpty (NonEmpty)
19-
import Data.Set (Set)
19+
import Data.Map.Strict (Map)
2020
import Data.Text (Text)
2121
import Simplex.Messaging.Protocol
2222
import Simplex.Messaging.Server.QueueStore
@@ -52,8 +52,8 @@ class StoreQueueClass q => QueueStoreClass q s where
5252
deleteStoreQueue :: s -> q -> IO (Either ErrorType QueueRec)
5353
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
5454
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
55-
setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
56-
setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Set RecipientId)
55+
setRcvQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Either ErrorType (Map RecipientId (Either ErrorType ())))
56+
setNtfQueueServices :: s -> Maybe ServiceId -> [q] -> IO (Either ErrorType (Map RecipientId (Either ErrorType ())))
5757
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
5858
getServiceQueueCountHash :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType (Int64, IdsHash))
5959

0 commit comments

Comments
 (0)