@@ -50,6 +50,7 @@ module Simplex.Messaging.Agent
5050 setUserService ,
5151 connRequestPQSupport ,
5252 createConnectionAsync ,
53+ setConnShortLinkAsync ,
5354 joinConnectionAsync ,
5455 allowConnectionAsync ,
5556 acceptContactAsync ,
@@ -356,6 +357,11 @@ createConnectionAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -
356357createConnectionAsync c userId aCorrId enableNtfs = withAgentEnv c .:. newConnAsync c userId aCorrId enableNtfs
357358{-# INLINE createConnectionAsync #-}
358359
360+ -- | Create or update user's contact connection short link (LSET command) asynchronously, no synchronous response
361+ setConnShortLinkAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> SConnectionMode c -> UserConnLinkData c -> Maybe CRClientData -> AE ()
362+ setConnShortLinkAsync c = withAgentEnv c .::. setConnShortLinkAsync' c
363+ {-# INLINE setConnShortLinkAsync #-}
364+
359365-- | Join SMP agent connection (JOIN command) asynchronously, synchronous response is new connection id
360366joinConnectionAsync :: AgentClient -> UserId -> ACorrId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId
361367joinConnectionAsync c userId aCorrId enableNtfs = withAgentEnv c .:: joinConnAsync c userId aCorrId enableNtfs
@@ -926,6 +932,16 @@ checkClientNotices AgentClient {clientNotices, presetServers} (ProtoServerWithAu
926932 when (maybe True (ts < ) expires_) $
927933 throwError NOTICE {server = safeDecodeUtf8 $ strEncode $ L. head host, preset = isNothing srvKey, expiresAt = roundedToUTCTime <$> expires_}
928934
935+ setConnShortLinkAsync' :: forall c . ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> SConnectionMode c -> UserConnLinkData c -> Maybe CRClientData -> AM ()
936+ setConnShortLinkAsync' c corrId connId cMode userLinkData clientData =
937+ withConnLock c connId " setConnShortLinkAsync" $ do
938+ SomeConn _ conn <- withStore c (`getConn` connId)
939+ srv <- case (conn, cMode, userLinkData) of
940+ (ContactConnection _ RcvQueue {server}, SCMContact , UserContactLinkData {}) -> pure server
941+ (RcvConnection _ RcvQueue {server}, SCMInvitation , UserInvLinkData {}) -> pure server
942+ _ -> throwE $ CMD PROHIBITED " setConnShortLinkAsync: invalid connection or mode"
943+ enqueueCommand c corrId connId (Just srv) $ AClientCommand $ LSET (AUCLD cMode userLinkData) clientData
944+
929945setConnShortLink' :: AgentClient -> NetworkRequestMode -> ConnId -> SConnectionMode c -> UserConnLinkData c -> Maybe CRClientData -> AM (ConnShortLink c )
930946setConnShortLink' c nm connId cMode userLinkData clientData =
931947 withConnLock c connId " setConnShortLink" $ do
@@ -1169,7 +1185,8 @@ startJoinInvitation c userId connId sq_ enableNtfs cReqUri pqSup =
11691185 let cData = ConnData {userId, connId, connAgentVersion, enableNtfs, lastExternalSndId = 0 , deleted = False , ratchetSyncState = RSOk , pqSupport}
11701186 case sq_ of
11711187 Just sq@ SndQueue {e2ePubKey = Just _k} -> do
1172- e2eSndParams <- withStore c $ \ db ->
1188+ e2eSndParams <- withStore c $ \ db -> do
1189+ lockConnForUpdate db connId
11731190 getSndRatchet db connId v >>= \ case
11741191 Right r -> pure $ Right $ snd r
11751192 Left e -> do
@@ -1183,6 +1200,7 @@ startJoinInvitation c userId connId sq_ enableNtfs cReqUri pqSup =
11831200 sndKey_ = snd <$> invLink_
11841201 (q, _) <- lift $ newSndQueue userId " " qInfo sndKey_
11851202 withStore c $ \ db -> runExceptT $ do
1203+ liftIO $ lockConnForUpdate db connId
11861204 e2eSndParams <- createRatchet_ db g maxSupported pqSupport e2eRcvParams
11871205 sq' <- maybe (ExceptT $ updateNewConnSnd db connId q) pure sq_
11881206 pure (cData, sq', e2eSndParams, lnkId_)
@@ -1261,7 +1279,8 @@ joinConnSrv c nm userId connId enableNtfs cReqUri@CRContactUri {} cInfo pqSup su
12611279 AgentConfig {smpClientVRange = vr, smpAgentVRange, e2eEncryptVRange = e2eVR} <- asks config
12621280 let qUri = SMPQueueUri vr $ (rcvSMPQueueAddress rq) {queueMode = Just QMMessaging }
12631281 crData = ConnReqUriData SSSimplex smpAgentVRange [qUri] Nothing
1264- e2eRcvParams <- withStore' c $ \ db ->
1282+ e2eRcvParams <- withStore' c $ \ db -> do
1283+ lockConnForUpdate db connId
12651284 getRatchetX3dhKeys db connId >>= \ case
12661285 Right keys -> pure $ CR. mkRcvE2ERatchetParams (maxVersion e2eVR) keys
12671286 Left e -> do
@@ -1727,6 +1746,10 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
17271746 tryCommand . withNextSrv c userId storageSrvs triedHosts [] $ \ srv -> do
17281747 CCLink cReq _ <- newRcvConnSrv c NRMBackground userId connId enableNtfs cMode Nothing Nothing pqEnc subMode srv
17291748 notify $ INV (ACR cMode cReq)
1749+ LSET auData@ (AUCLD cMode userLinkData) clientData ->
1750+ withServer' . tryCommand $ do
1751+ link <- setConnShortLink' c NRMBackground connId cMode userLinkData clientData
1752+ notify $ LINK (ACSL cMode link) auData
17301753 JOIN enableNtfs (ACR _ cReq@ (CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _)) pqEnc subMode connInfo -> noServer $ do
17311754 triedHosts <- newTVarIO S. empty
17321755 tryCommand . withNextSrv c userId storageSrvs triedHosts [qServer q] $ \ srv -> do
@@ -2007,7 +2030,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
20072030 withRetryLock2 ri' qLock $ \ riState loop -> do
20082031 liftIO $ waitWhileSuspended c
20092032 liftIO $ waitForUserNetwork c
2010- resp <- tryError $ case msgType of
2033+ resp <- tryAllErrors $ case msgType of
20112034 AM_CONN_INFO -> sendConfirmation c NRMBackground sq msgBody
20122035 AM_CONN_INFO_REPLY -> sendConfirmation c NRMBackground sq msgBody
20132036 _ -> case pendingMsgPrepData_ of
@@ -2147,10 +2170,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
21472170 notifyDelMsgs :: InternalId -> AgentErrorType -> UTCTime -> AM ()
21482171 notifyDelMsgs msgId err expireTs = do
21492172 notifyDel msgId $ MERR (unId msgId) err
2150- msgIds_ <- withStore' c $ \ db -> getExpiredSndMessages db connId sq expireTs
2173+ msgIds_ <- withStore' c $ \ db -> do
2174+ msgIds_ <- getExpiredSndMessages db connId sq expireTs
2175+ forM_ msgIds_ $ \ msgId' -> deleteSndMsgDelivery db connId sq msgId' False `catchAll_` pure ()
2176+ pure msgIds_
21512177 forM_ (L. nonEmpty msgIds_) $ \ msgIds -> do
21522178 notify $ MERRS (L. map unId msgIds) err
2153- withStore' c $ \ db -> forM_ msgIds $ \ msgId' -> deleteSndMsgDelivery db connId sq msgId' False `catchAll_` pure ()
21542179 atomically $ incSMPServerStat' c userId server sentExpiredErrs (length msgIds_ + 1 )
21552180 delMsg :: InternalId -> AM ()
21562181 delMsg = delMsgKeep False
@@ -2798,15 +2823,14 @@ subscriber c@AgentClient {msgQ} = forever $ do
27982823
27992824cleanupManager :: AgentClient -> AM' ()
28002825cleanupManager c@ AgentClient {subQ} = do
2801- delay <- asks (initialCleanupDelay . config)
2802- liftIO $ threadDelay' delay
2803- int <- asks (cleanupInterval . config)
2804- ttl <- asks $ storedMsgDataTTL . config
2826+ AgentConfig {initialCleanupDelay, cleanupInterval = int, storedMsgDataTTL = ttl, cleanupBatchSize = limit} <-
2827+ asks config
2828+ liftIO $ threadDelay' initialCleanupDelay
28052829 forever $ waitActive $ do
28062830 run ERR deleteConns
2807- run ERR $ withStore' c ( ` deleteRcvMsgHashesExpired` ttl)
2808- run ERR $ withStore' c ( ` deleteSndMsgsExpired` ttl)
2809- run ERR $ withStore' c ( ` deleteRatchetKeyHashesExpired` ttl)
2831+ run ERR $ withStore' c $ \ db -> deleteRcvMsgHashesExpired db ttl limit
2832+ run ERR $ withStore' c $ \ db -> deleteSndMsgsExpired db ttl limit
2833+ run ERR $ withStore' c $ \ db -> deleteRatchetKeyHashesExpired db ttl limit
28102834 run ERR $ withStore' c (`deleteExpiredNtfTokensToDelete` ttl)
28112835 run RFERR deleteRcvFilesExpired
28122836 run RFERR deleteRcvFilesDeleted
@@ -3084,7 +3108,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
30843108 throwE e
30853109 agentClientMsg :: TVar ChaChaDRG -> ByteString -> AM (Maybe (InternalId , MsgMeta , AMessage , CR. RatchetX448 ))
30863110 agentClientMsg g encryptedMsgHash = withStore c $ \ db -> runExceptT $ do
3087- rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY
3111+ liftIO $ lockConnForUpdate db connId
3112+ rc <- ExceptT $ getRatchetForUpdate db connId -- ratchet state pre-decryption - required for processing EREADY
30883113 (agentMsgBody, pqEncryption) <- agentRatchetDecrypt' g db connId rc encAgentMessage
30893114 liftEither (parse smpP (SEAgentError $ AGENT A_MESSAGE ) agentMsgBody) >>= \ case
30903115 agentMsg@ (AgentMessage APrivHeader {sndMsgId, prevMsgHash} aMessage) -> do
@@ -3331,6 +3356,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
33313356 Just sqs' -> do
33323357 (sq_@ SndQueue {sndPrivateKey}, dhPublicKey) <- lift $ newSndQueue userId connId qInfo Nothing
33333358 sq2 <- withStore c $ \ db -> do
3359+ lockConnForUpdate db connId
33343360 liftIO $ mapM_ (deleteConnSndQueue db connId) delSqs
33353361 addConnSndQueue db connId (sq_ :: NewSndQueue ) {primary = True , dbReplaceQueueId = Just dbQueueId}
33363362 logServer " <--" c srv rId $ " MSG <QADD>:" <> logSecret' srvMsgId <> " " <> logSecret (senderId queueAddress)
@@ -3635,7 +3661,7 @@ agentRatchetEncrypt db cData msg getPaddedLen pqEnc_ currentE2EVersion = do
36353661
36363662agentRatchetEncryptHeader :: DB. Connection -> ConnData -> (VersionSMPA -> PQSupport -> Int ) -> Maybe PQEncryption -> CR. VersionE2E -> ExceptT StoreError IO (CR. MsgEncryptKeyX448 , Int , PQEncryption )
36373663agentRatchetEncryptHeader db ConnData {connId, connAgentVersion = v, pqSupport} getPaddedLen pqEnc_ currentE2EVersion = do
3638- rc <- ExceptT $ getRatchet db connId
3664+ rc <- ExceptT $ getRatchetForUpdate db connId
36393665 let paddedLen = getPaddedLen v pqSupport
36403666 (mek, rc') <- withExceptT (SEAgentError . cryptoError) $ CR. rcEncryptHeader rc pqEnc_ currentE2EVersion
36413667 liftIO $ updateRatchet db connId rc' CR. SMDNoChange
@@ -3644,7 +3670,7 @@ agentRatchetEncryptHeader db ConnData {connId, connAgentVersion = v, pqSupport}
36443670-- encoded EncAgentMessage -> encoded AgentMessage
36453671agentRatchetDecrypt :: TVar ChaChaDRG -> DB. Connection -> ConnId -> ByteString -> ExceptT StoreError IO (ByteString , PQEncryption )
36463672agentRatchetDecrypt g db connId encAgentMsg = do
3647- rc <- ExceptT $ getRatchet db connId
3673+ rc <- ExceptT $ getRatchetForUpdate db connId
36483674 agentRatchetDecrypt' g db connId rc encAgentMsg
36493675
36503676agentRatchetDecrypt' :: TVar ChaChaDRG -> DB. Connection -> ConnId -> CR. RatchetX448 -> ByteString -> ExceptT StoreError IO (ByteString , PQEncryption )
0 commit comments