Skip to content

Commit 4922812

Browse files
committed
Merge branch 'ep/fix-async-race' into ep/conc-msgs
2 parents 906da42 + 3175ae3 commit 4922812

2 files changed

Lines changed: 57 additions & 58 deletions

File tree

src/Simplex/Messaging/Agent.hs

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ module Simplex.Messaging.Agent
5050
deleteUser,
5151
setUserService,
5252
connRequestPQSupport,
53+
prepareConnectionToCreate,
5354
createConnectionAsync,
5455
setConnShortLinkAsync,
5556
getConnShortLinkAsync,
@@ -362,8 +363,14 @@ setUserService c = withAgentEnv c .: setUserService' c
362363
{-# INLINE setUserService #-}
363364

364365
-- | Create SMP agent connection (NEW command) asynchronously, synchronous response is new connection id
365-
createConnectionAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AE ConnId
366-
createConnectionAsync c userId aCorrId enableNtfs = withAgentEnv c .:. newConnAsync c userId aCorrId enableNtfs
366+
-- | Create SMP agent connection without queue (to be used with createConnectionAsync).
367+
prepareConnectionToCreate :: ConnectionModeI c => AgentClient -> UserId -> Bool -> SConnectionMode c -> PQSupport -> AE ConnId
368+
prepareConnectionToCreate c userId enableNtfs cMode pqSup = withAgentEnv c $ newConnNoQueues c userId enableNtfs cMode pqSup
369+
{-# INLINE prepareConnectionToCreate #-}
370+
371+
-- | Enqueue NEW command for a prepared connection.
372+
createConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AE ()
373+
createConnectionAsync c aCorrId connId enableNtfs = withAgentEnv c .:. newConnAsync c aCorrId connId enableNtfs
367374
{-# INLINE createConnectionAsync #-}
368375

369376
-- | Create or update user's contact connection short link (LSET command) asynchronously, no synchronous response
@@ -376,20 +383,21 @@ getConnShortLinkAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Con
376383
getConnShortLinkAsync c = withAgentEnv c .:: getConnShortLinkAsync' c
377384
{-# INLINE getConnShortLinkAsync #-}
378385

379-
-- | Join SMP agent connection (JOIN command) asynchronously, synchronous response is new connection id.
380-
-- If connId is provided (for contact URIs), it updates the existing connection record created by getConnShortLinkAsync.
381-
joinConnectionAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId
382-
joinConnectionAsync c userId aCorrId connId_ enableNtfs = withAgentEnv c .:: joinConnAsync c userId aCorrId connId_ enableNtfs
386+
-- | Enqueue JOIN command for a prepared connection.
387+
joinConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ()
388+
joinConnectionAsync c aCorrId updateConn connId enableNtfs cReqUri cInfo pqSup subMode =
389+
withAgentEnv c $ joinConnAsync c aCorrId updateConn connId enableNtfs cReqUri cInfo pqSup subMode
383390
{-# INLINE joinConnectionAsync #-}
384391

385392
-- | Allow connection to continue after CONF notification (LET command), no synchronous response
386393
allowConnectionAsync :: AgentClient -> ACorrId -> ConnId -> ConfirmationId -> ConnInfo -> AE ()
387394
allowConnectionAsync c = withAgentEnv c .:: allowConnectionAsync' c
388395
{-# INLINE allowConnectionAsync #-}
389396

390-
-- | Accept contact after REQ notification (ACPT command) asynchronously, synchronous response is new connection id
391-
acceptContactAsync :: AgentClient -> UserId -> ACorrId -> Bool -> ConfirmationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId
392-
acceptContactAsync c userId aCorrId enableNtfs = withAgentEnv c .:: acceptContactAsync' c userId aCorrId enableNtfs
397+
-- | Accept contact after REQ notification (ACPT command) asynchronously, for a prepared connection.
398+
acceptContactAsync :: AgentClient -> ACorrId -> ConnId -> Bool -> ConfirmationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ()
399+
acceptContactAsync c aCorrId connId enableNtfs invId ownConnInfo pqSupport subMode =
400+
withAgentEnv c $ acceptContactAsync' c aCorrId connId enableNtfs invId ownConnInfo pqSupport subMode
393401
{-# INLINE acceptContactAsync #-}
394402

395403
-- | Acknowledge message (ACK command) asynchronously, no synchronous response
@@ -842,11 +850,10 @@ setUserService' c userId enable = do
842850
unless ok $ throwE $ CMD PROHIBITED "setUserService"
843851
when (changed && not enable) $ withStore' c (`deleteClientServices` userId)
844852

845-
newConnAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM ConnId
846-
newConnAsync c userId corrId enableNtfs cMode pqInitKeys subMode = do
847-
connId <- newConnNoQueues c userId enableNtfs cMode (CR.connPQEncryption pqInitKeys)
853+
newConnAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM ()
854+
newConnAsync c corrId connId enableNtfs cMode pqInitKeys subMode =
848855
enqueueCommand c corrId connId Nothing $ AClientCommand $ NEW enableNtfs (ACM cMode) pqInitKeys subMode
849-
pure connId
856+
{-# INLINE newConnAsync #-}
850857

851858
newConnNoQueues :: AgentClient -> UserId -> Bool -> SConnectionMode c -> PQSupport -> AM ConnId
852859
newConnNoQueues c userId enableNtfs cMode pqSupport = do
@@ -855,36 +862,20 @@ newConnNoQueues c userId enableNtfs cMode pqSupport = do
855862
let cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport}
856863
withStore c $ \db -> createNewConn db g cData cMode
857864

858-
-- TODO [short links] TBC, but probably we will need async join for contact addresses as the contact will be created after user confirming the connection,
859-
-- and join should retry, the same as 1-time invitation joins.
860-
joinConnAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId
861-
joinConnAsync c userId corrId connId_ enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = do
862-
when (isJust connId_) $ throwE $ CMD PROHIBITED "joinConnAsync: connId not allowed for invitation URI"
863-
withInvLock c (strEncode cReqUri) "joinConnAsync" $ do
864-
lift (compatibleInvitationUri cReqUri) >>= \case
865-
Just (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible connAgentVersion) -> do
866-
g <- asks random
867-
let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion (Just v)
868-
cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport}
869-
connId <- withStore c $ \db -> createNewConn db g cData SCMInvitation
870-
enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo
871-
pure connId
872-
Nothing -> throwE $ AGENT A_VERSION
873-
joinConnAsync c userId corrId connId_ enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode = do
865+
joinConnAsync :: ConnectionModeI c => AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ()
866+
joinConnAsync c corrId updateConn connId enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode =
867+
lift (compatibleInvitationUri cReqUri) >>= \case
868+
Just (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible connAgentVersion) -> do
869+
let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion (Just v)
870+
when updateConn $ withStore' c $ \db -> updateNewConnJoin db connId connAgentVersion pqSupport enableNtfs
871+
enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo
872+
Nothing -> throwE $ AGENT A_VERSION
873+
joinConnAsync c corrId updateConn connId enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode =
874874
lift (compatibleContactUri cReqUri) >>= \case
875875
Just (_, Compatible connAgentVersion) -> do
876876
let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion Nothing
877-
connId <- case connId_ of
878-
Just cId -> do
879-
-- update connection record created by getConnShortLinkAsync
880-
withStore' c $ \db -> updateNewConnJoin db cId connAgentVersion pqSupport enableNtfs
881-
pure cId
882-
Nothing -> do
883-
g <- asks random
884-
let cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport}
885-
withStore c $ \db -> createNewConn db g cData SCMInvitation
877+
when updateConn $ withStore' c $ \db -> updateNewConnJoin db connId connAgentVersion pqSupport enableNtfs
886878
enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo
887-
pure connId
888879
Nothing -> throwE $ AGENT A_VERSION
889880

890881
allowConnectionAsync' :: AgentClient -> ACorrId -> ConnId -> ConfirmationId -> ConnInfo -> AM ()
@@ -900,11 +891,11 @@ allowConnectionAsync' c corrId connId confId ownConnInfo =
900891
-- and also it can't be triggered by user concurrently several times in a row. It could be improved similarly to
901892
-- `acceptContact` by creating a new map for invitation locks and taking lock here, and removing `unacceptInvitation`
902893
-- while marking invitation as accepted inside "lock level transaction" after successful `joinConnAsync`.
903-
acceptContactAsync' :: AgentClient -> UserId -> ACorrId -> Bool -> InvitationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId
904-
acceptContactAsync' c userId corrId enableNtfs invId ownConnInfo pqSupport subMode = do
894+
acceptContactAsync' :: AgentClient -> ACorrId -> ConnId -> Bool -> InvitationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ()
895+
acceptContactAsync' c corrId connId enableNtfs invId ownConnInfo pqSupport subMode = do
905896
Invitation {connReq} <- withStore c $ \db -> getInvitation db "acceptContactAsync'" invId
906897
withStore' c $ \db -> acceptInvitation db invId ownConnInfo
907-
joinConnAsync c userId corrId Nothing enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do
898+
joinConnAsync c corrId False connId enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do
908899
withStore' c (`unacceptInvitation` invId)
909900
throwE err
910901

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -410,13 +410,13 @@ prepareConnectionToAccept c = A.prepareConnectionToAccept (client c)
410410
allowConnectionAsync :: AgentClient -> ACorrId -> ConnId -> ConfirmationId -> ConnInfo -> AE ()
411411
allowConnectionAsync c = A.allowConnectionAsync (client c)
412412

413-
joinConnectionAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId
413+
joinConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ()
414414
joinConnectionAsync c = A.joinConnectionAsync (client c)
415415

416416
sendMessagesB :: AgentClient -> [Either AgentErrorType MsgReq] -> AE [Either AgentErrorType (AgentMsgId, PQEncryption)]
417417
sendMessagesB c = A.sendMessagesB (client c)
418418

419-
createConnectionAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AE ConnId
419+
createConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AE ()
420420
createConnectionAsync c = A.createConnectionAsync (client c)
421421

422422
testProtocolServer :: SMP.ProtocolTypeI p => AgentClient -> NetworkRequestMode -> UserId -> SMP.ProtoServerWithAuth p -> IO (Maybe ProtocolTestFailure)
@@ -428,9 +428,6 @@ getConnectionRatchetAdHash c = A.getConnectionRatchetAdHash (client c)
428428
getConnectionMessages :: AgentClient -> NonEmpty ConnMsgReq -> IO (NonEmpty (Either AgentErrorType (Maybe SMP.SMPMsgMeta)))
429429
getConnectionMessages c = A.getConnectionMessages (client c)
430430

431-
prepareConnectionLink :: AgentClient -> UserId -> C.KeyPairEd25519 -> ByteString -> Bool -> Maybe CRClientData -> AE (CreatedConnLink 'CMContact, PreparedLinkParams)
432-
prepareConnectionLink c = A.prepareConnectionLink (client c)
433-
434431
waitForUserNetwork :: AgentClient -> IO ()
435432
waitForUserNetwork = AC.waitForUserNetwork . client
436433

@@ -1539,7 +1536,8 @@ testInvitationShortLinkAsync viaProxy a b = do
15391536
connReq' `shouldBe` connReq
15401537
linkUserData connData' `shouldBe` userData
15411538
runRight $ do
1542-
aId <- A.joinConnectionAsync (client b) 1 "123" Nothing True connReq "bob's connInfo" PQSupportOn SMSubscribe
1539+
aId <- A.prepareConnectionToJoin (client b) 1 True connReq PQSupportOn
1540+
A.joinConnectionAsync (client b) "123" False aId True connReq "bob's connInfo" PQSupportOn SMSubscribe
15431541
get b =##> \case ("123", c, JOINED sndSecure) -> c == aId && sndSecure; _ -> False
15441542
("", _, CONF confId _ "bob's connInfo") <- get a
15451543
allowConnection a bId confId "alice's connInfo"
@@ -2799,10 +2797,12 @@ receiveMsg c cId msgId msg = do
27992797
testAsyncCommands :: SndQueueSecured -> AgentClient -> AgentClient -> AgentMsgId -> IO ()
28002798
testAsyncCommands sqSecured alice bob baseId =
28012799
runRight_ $ do
2802-
bobId <- createConnectionAsync alice 1 "1" True SCMInvitation IKPQOn SMSubscribe
2800+
bobId <- prepareConnectionToCreate (client alice) 1 True SCMInvitation PQSupportOn
2801+
createConnectionAsync alice "1" bobId True SCMInvitation IKPQOn SMSubscribe
28032802
("1", bobId', INV (ACR _ qInfo)) <- get alice
28042803
liftIO $ bobId' `shouldBe` bobId
2805-
aliceId <- joinConnectionAsync bob 1 "2" Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe
2804+
aliceId <- prepareConnectionToJoin (client bob) 1 True qInfo PQSupportOn
2805+
joinConnectionAsync bob "2" False aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe
28062806
("2", aliceId', JOINED sqSecured') <- get bob
28072807
liftIO $ do
28082808
aliceId' `shouldBe` aliceId
@@ -2893,8 +2893,8 @@ testGetConnShortLinkAsync ps = withAgentClients2 $ \alice bob ->
28932893
liftIO $ qInfo' `shouldBe` qInfo
28942894
liftIO $ userCtData' `shouldBe` userCtData
28952895
-- join connection async using connId from getConnShortLinkAsync
2896-
aliceId <- joinConnectionAsync bob 1 "2" (Just newId) True qInfo' "bob's connInfo" PQSupportOn SMSubscribe
2897-
liftIO $ aliceId `shouldBe` newId
2896+
joinConnectionAsync bob "2" True newId True qInfo' "bob's connInfo" PQSupportOn SMSubscribe
2897+
let aliceId = newId
28982898
("2", aliceId', JOINED False) <- get bob
28992899
liftIO $ aliceId' `shouldBe` aliceId
29002900
-- complete connection
@@ -2910,7 +2910,10 @@ testGetConnShortLinkAsync ps = withAgentClients2 $ \alice bob ->
29102910
testAsyncCommandsRestore :: (ASrvTransport, AStoreType) -> IO ()
29112911
testAsyncCommandsRestore ps = do
29122912
alice <- getSMPAgentClient' 1 agentCfg initAgentServers testDB
2913-
bobId <- runRight $ createConnectionAsync alice 1 "1" True SCMInvitation IKPQOn SMSubscribe
2913+
bobId <- runRight $ do
2914+
connId <- prepareConnectionToCreate (client alice) 1 True SCMInvitation PQSupportOn
2915+
createConnectionAsync alice "1" connId True SCMInvitation IKPQOn SMSubscribe
2916+
pure connId
29142917
liftIO $ noMessages alice "alice doesn't receive INV because server is down"
29152918
disposeAgentClient alice
29162919
withAgent 2 agentCfg initAgentServers testDB $ \alice' ->
@@ -2926,7 +2929,8 @@ testAcceptContactAsync sqSecured alice bob baseId =
29262929
(aliceId, sqSecuredJoin) <- joinConnection bob 1 True qInfo "bob's connInfo" SMSubscribe
29272930
liftIO $ sqSecuredJoin `shouldBe` False -- joining via contact address connection
29282931
("", _, REQ invId _ "bob's connInfo") <- get alice
2929-
bobId <- A.acceptContactAsync (client alice) 1 "1" True invId "alice's connInfo" PQSupportOn SMSubscribe
2932+
bobId <- prepareConnectionToAccept alice 1 True invId PQSupportOn
2933+
acceptContactAsync (client alice) "1" bobId True invId "alice's connInfo" PQSupportOn SMSubscribe
29302934
get alice =##> \case ("1", c, JOINED sqSecured') -> c == bobId && sqSecured' == sqSecured; _ -> False
29312935
("", _, CONF confId _ "alice's connInfo") <- get bob
29322936
allowConnection bob aliceId confId "bob's connInfo"
@@ -3197,10 +3201,12 @@ testJoinConnectionAsyncReplyErrorV8 ps@(t, ASType qsType _) = do
31973201
withAgent 1 cfg' initAgentServers testDB $ \a ->
31983202
withAgent 2 cfg' initAgentServersSrv2 testDB2 $ \b -> do
31993203
(aId, bId) <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3200-
bId <- createConnectionAsync a 1 "1" True SCMInvitation IKPQOn SMSubscribe
3204+
bId <- prepareConnectionToCreate (client a) 1 True SCMInvitation PQSupportOn
3205+
createConnectionAsync a "1" bId True SCMInvitation IKPQOn SMSubscribe
32013206
("1", bId', INV (ACR _ qInfo)) <- get a
32023207
liftIO $ bId' `shouldBe` bId
3203-
aId <- joinConnectionAsync b 1 "2" Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe
3208+
aId <- prepareConnectionToJoin (client b) 1 True qInfo PQSupportOn
3209+
joinConnectionAsync b "2" False aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe
32043210
liftIO $ threadDelay 500000
32053211
ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId
32063212
pure (aId, bId)
@@ -3242,10 +3248,12 @@ testJoinConnectionAsyncReplyError ps@(t, ASType qsType _) = do
32423248
withAgent 1 agentCfg initAgentServers testDB $ \a ->
32433249
withAgent 2 agentCfg initAgentServersSrv2 testDB2 $ \b -> do
32443250
(aId, bId) <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3245-
bId <- createConnectionAsync a 1 "1" True SCMInvitation IKPQOn SMSubscribe
3251+
bId <- prepareConnectionToCreate (client a) 1 True SCMInvitation PQSupportOn
3252+
createConnectionAsync a "1" bId True SCMInvitation IKPQOn SMSubscribe
32463253
("1", bId', INV (ACR _ qInfo)) <- get a
32473254
liftIO $ bId' `shouldBe` bId
3248-
aId <- joinConnectionAsync b 1 "2" Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe
3255+
aId <- prepareConnectionToJoin (client b) 1 True qInfo PQSupportOn
3256+
joinConnectionAsync b "2" False aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe
32493257
liftIO $ threadDelay 500000
32503258
ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId
32513259
pure (aId, bId)

0 commit comments

Comments
 (0)