Skip to content

Commit b720966

Browse files
committed
Remove modeling of client connection ID in the QUIC scheme
1 parent b125ad1 commit b720966

5 files changed

Lines changed: 67 additions & 104 deletions

File tree

packages/network-transport-quic/src/Network/Transport/QUIC/Internal.hs

Lines changed: 29 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,11 @@ import Control.Monad (unless, when)
3434
import Data.Bifunctor (Bifunctor (first))
3535
import Data.Binary qualified as Binary (decodeOrFail)
3636
import Data.ByteString (ByteString, fromStrict)
37-
import Data.Foldable (forM_)
3837
import Data.Function ((&))
3938
import Data.Functor ((<&>))
4039
import Data.IORef (newIORef, readIORef, writeIORef)
4140
import Data.List.NonEmpty (NonEmpty)
4241
import Data.Map.Strict qualified as Map
43-
import Data.Maybe (isNothing)
4442
import Lens.Micro.Platform ((+~))
4543
import Network.QUIC qualified as QUIC
4644
import Network.TLS (Credential)
@@ -62,8 +60,7 @@ import Network.Transport
6260
)
6361
import Network.Transport.QUIC.Internal.Configuration (credentialLoadX509)
6462
import Network.Transport.QUIC.Internal.Messaging
65-
( ClientConnId,
66-
MessageReceived (..),
63+
( MessageReceived (..),
6764
createConnectionId,
6865
decodeMessage,
6966
encodeMessage,
@@ -101,7 +98,6 @@ import Network.Transport.QUIC.Internal.QUICTransport
10198
nextSelfConnOutId,
10299
remoteEndPointAddress,
103100
remoteEndPointState,
104-
remoteIncoming,
105101
remoteServerConnId,
106102
remoteStream,
107103
transportConfig,
@@ -182,17 +178,15 @@ handleNewStream quicTransport stream = do
182178
(remoteEndPoint, _) <- either throwIO pure =<< createRemoteEndPoint ourEndPoint remoteAddress Incoming
183179
doneMVar <- newEmptyMVar
184180

185-
clientConnId <- either (throwIO . userError) (pure . fromIntegral) =<< recvWord32 stream
186181
let serverConnId = remoteServerConnId remoteEndPoint
187-
connectionId = createConnectionId serverConnId clientConnId
182+
-- One logical connection per stream; clientConnId is always 0.
183+
connectionId = createConnectionId serverConnId 0
188184

189185
let st =
190186
RemoteEndPointValid $
191187
ValidRemoteEndPointState
192188
{ _remoteStream = stream,
193-
_remoteStreamIsClosed = doneMVar,
194-
_remoteIncoming = Just clientConnId,
195-
_remoteNextConnOutId = 0
189+
_remoteStreamIsClosed = doneMVar
196190
}
197191
modifyMVar_
198192
(remoteEndPoint ^. remoteEndPointState)
@@ -253,12 +247,8 @@ handleIncomingMessages ourEndPoint remoteEndPoint =
253247
release (Left err) = closeRemoteEndPoint Incoming remoteEndPoint >> prematureExit err
254248
release (Right _) = closeRemoteEndPoint Incoming remoteEndPoint
255249

256-
connectionId = createConnectionId serverConnId
257-
258-
writeConnectionClosedSTM connId =
259-
writeTQueue
260-
ourQueue
261-
(ConnectionClosed (connectionId connId))
250+
-- One logical connection per stream; clientConnId is always 0.
251+
connectionId = createConnectionId serverConnId 0
262252

263253
go = either prematureExit loop
264254

@@ -268,34 +258,31 @@ handleIncomingMessages ourEndPoint remoteEndPoint =
268258
Left errmsg -> do
269259
-- Throwing will trigger 'prematureExit'
270260
throwIO $ userError $ "(handleIncomingMessages) Failed with: " <> errmsg
271-
Right (Message connId bytes) -> handleMessage connId bytes >> loop stream
261+
Right (Message bytes) -> handleMessage bytes >> loop stream
272262
Right StreamClosed -> throwIO $ userError "(handleIncomingMessages) Stream closed"
273-
Right (CloseConnection connId) -> do
274-
atomically (writeConnectionClosedSTM connId)
263+
Right CloseConnection -> do
264+
atomically (writeTQueue ourQueue (ConnectionClosed connectionId))
275265
mAct <- modifyMVar (remoteEndPoint ^. remoteEndPointState) $ \case
276266
RemoteEndPointInit -> pure (RemoteEndPointClosed, Nothing)
277267
RemoteEndPointClosed -> pure (RemoteEndPointClosed, Nothing)
278-
RemoteEndPointValid (ValidRemoteEndPointState _ isClosed _ _) -> do
268+
RemoteEndPointValid (ValidRemoteEndPointState _ isClosed) -> do
279269
pure (RemoteEndPointClosed, Just $ putMVar isClosed ())
280270
case mAct of
281271
Nothing -> pure ()
282272
Just cleanup -> cleanup
283273
Right CloseEndPoint -> do
284-
connIds <- modifyMVar (remoteEndPoint ^. remoteEndPointState) $ \case
285-
RemoteEndPointValid vst -> do
286-
pure (RemoteEndPointClosed, vst ^. remoteIncoming)
287-
other -> pure (other, Nothing)
288-
unless
289-
(isNothing connIds)
290-
( atomically $
291-
forM_
292-
connIds
293-
(writeTQueue ourQueue . ConnectionClosed . connectionId)
294-
)
274+
-- handleIncomingMessages only runs on incoming remote endpoints, so if
275+
-- the state was still Valid there is exactly one logical connection to
276+
-- surface as closed.
277+
wasValid <- modifyMVar (remoteEndPoint ^. remoteEndPointState) $ \case
278+
RemoteEndPointValid _ -> pure (RemoteEndPointClosed, True)
279+
other -> pure (other, False)
280+
when wasValid $
281+
atomically $ writeTQueue ourQueue (ConnectionClosed connectionId)
295282

296-
handleMessage :: ClientConnId -> [ByteString] -> IO ()
297-
handleMessage clientConnId payload =
298-
atomically (writeTQueue ourQueue (Received (connectionId clientConnId) payload))
283+
handleMessage :: [ByteString] -> IO ()
284+
handleMessage payload =
285+
atomically (writeTQueue ourQueue (Received connectionId payload))
299286

300287
prematureExit :: IOException -> IO ()
301288
prematureExit exc = do
@@ -363,24 +350,24 @@ newConnection ourEndPoint creds validateCreds remoteAddress _reliability _connec
363350
else
364351
createConnectionTo creds validateCreds ourEndPoint remoteAddress >>= \case
365352
Left err -> pure $ Left err
366-
Right (remoteEndPoint, connId) -> do
353+
Right remoteEndPoint -> do
367354
connAlive <- newIORef True
368355
pure
369356
. Right
370357
$ Connection
371-
{ send = sendConn remoteEndPoint connAlive connId,
372-
close = closeConn remoteEndPoint connAlive connId
358+
{ send = sendConn remoteEndPoint connAlive,
359+
close = closeConn remoteEndPoint connAlive
373360
}
374361
where
375362
ourAddress = ourEndPoint ^. localAddress
376-
sendConn remoteEndPoint connAlive connId packets =
363+
sendConn remoteEndPoint connAlive packets =
377364
readMVar (remoteEndPoint ^. remoteEndPointState) >>= \case
378365
RemoteEndPointInit -> undefined
379366
RemoteEndPointValid vst ->
380367
readIORef connAlive >>= \case
381368
False -> pure . Left $ TransportError SendClosed "Connection closed"
382369
True ->
383-
sendMessage (vst ^. remoteStream) connId packets
370+
sendMessage (vst ^. remoteStream) packets
384371
<&> first (TransportError SendFailed . show)
385372
RemoteEndPointClosed -> do
386373
readIORef connAlive >>= \case
@@ -390,9 +377,9 @@ newConnection ourEndPoint creds validateCreds remoteAddress _reliability _connec
390377
-- 'connAlive' IORefs.
391378
False -> pure . Left $ TransportError SendClosed "Connection closed"
392379
True -> pure . Left $ TransportError SendFailed "Remote endpoint closed"
393-
closeConn remoteEndPoint connAlive connId = do
380+
closeConn remoteEndPoint connAlive = do
394381
mCleanup <- modifyMVar (remoteEndPoint ^. remoteEndPointState) $ \case
395-
RemoteEndPointValid vst@(ValidRemoteEndPointState stream isClosed _ _) -> do
382+
RemoteEndPointValid vst@(ValidRemoteEndPointState stream isClosed) -> do
396383
readIORef connAlive >>= \case
397384
False -> pure (RemoteEndPointValid vst, Nothing)
398385
True -> do
@@ -401,7 +388,7 @@ newConnection ourEndPoint creds validateCreds remoteAddress _reliability _connec
401388
-- safe against races with the finally in streamToEndpoint that can
402389
-- also signal isClosed on QUIC.Client.run exit.
403390
let cleanup = do
404-
_ <- sendCloseConnection connId stream
391+
_ <- sendCloseConnection stream
405392
_ <- tryPutMVar isClosed ()
406393
pure ()
407394
pure (RemoteEndPointClosed, Just cleanup)

packages/network-transport-quic/src/Network/Transport/QUIC/Internal/Client.hs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import Network.QUIC qualified as QUIC
1717
import Network.QUIC.Client qualified as QUIC.Client
1818
import Network.Transport (ConnectErrorCode (ConnectNotFound), EndPointAddress, TransportError (..))
1919
import Network.Transport.QUIC.Internal.Configuration (Credential, mkClientConfig)
20-
import Network.Transport.QUIC.Internal.Messaging (ClientConnId, MessageReceived (..), handshake, receiveMessage)
20+
import Network.Transport.QUIC.Internal.Messaging (MessageReceived (..), handshake, receiveMessage)
2121
import Network.Transport.QUIC.Internal.QUICAddr (QUICAddr (QUICAddr), decodeQUICAddr)
2222

2323
streamToEndpoint ::
@@ -28,8 +28,6 @@ streamToEndpoint ::
2828
EndPointAddress ->
2929
-- | Their address
3030
EndPointAddress ->
31-
-- | Client-allocated connection id to send as part of the handshake
32-
ClientConnId ->
3331
-- | Called when the QUIC connection or stream ends without us having initiated the
3432
-- close. Must be idempotent (the caller typically gates on remote endpoint state so
3533
-- that repeated invocations are safe) — this handler is invoked from multiple sites
@@ -44,7 +42,7 @@ streamToEndpoint ::
4442
QUIC.Stream
4543
)
4644
)
47-
streamToEndpoint creds validateCreds ourAddress theirAddress clientConnId onConnLoss =
45+
streamToEndpoint creds validateCreds ourAddress theirAddress onConnLoss =
4846
case decodeQUICAddr theirAddress of
4947
Left errmsg -> pure $ Left (TransportError ConnectNotFound errmsg)
5048
Right (QUICAddr hostname servicename _) -> do
@@ -58,7 +56,7 @@ streamToEndpoint creds validateCreds ourAddress theirAddress clientConnId onConn
5856
QUIC.waitEstablished conn
5957
restore $
6058
bracket (QUIC.stream conn) QUIC.closeStream $ \stream -> do
61-
handshake (ourAddress, theirAddress) clientConnId stream
59+
handshake (ourAddress, theirAddress) stream
6260
>>= either
6361
(\_ -> putMVar streamMVar (Left $ TransportError ConnectNotFound "handshake failed"))
6462
(\_ -> putMVar streamMVar (Right stream))
@@ -104,7 +102,7 @@ streamToEndpoint creds validateCreds ourAddress theirAddress clientConnId onConn
104102
Right StreamClosed -> mask_ $ do
105103
_ <- tryPutMVar doneMVar ()
106104
onConnLoss
107-
Right (CloseConnection _) ->
105+
Right CloseConnection ->
108106
-- Peer closed the logical connection cleanly; no ErrorEvent.
109107
() <$ tryPutMVar doneMVar ()
110108
Right CloseEndPoint -> mask_ $ do

packages/network-transport-quic/src/Network/Transport/QUIC/Internal/Messaging.hs

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,19 @@ import Network.Transport (ConnectionId, EndPointAddress)
5050
import Network.Transport.Internal (decodeWord32, encodeWord32)
5151
import Network.Transport.QUIC.Internal.QUICAddr (QUICAddr (QUICAddr), decodeQUICAddr)
5252

53-
-- | Send a message to a remote endpoint ID
53+
-- | Send a message on the stream.
5454
--
5555
-- This function is thread-safe; while the data is sending, asynchronous
5656
-- exceptions are masked, to be rethrown after the data is sent.
5757
sendMessage ::
5858
Stream ->
59-
ClientConnId ->
6059
[ByteString] ->
6160
IO (Either QUIC.QUICException ())
62-
sendMessage stream connId messages =
61+
sendMessage stream messages =
6362
try
6463
( QUIC.sendStreamMany
6564
stream
66-
(encodeMessage connId messages)
65+
(encodeMessage messages)
6766
)
6867

6968
-- | Receive a message, including its local destination endpoint ID
@@ -87,18 +86,15 @@ receiveMessage stream = mask $ \restore ->
8786
-- The encoding is composed of a header, and the payloads.
8887
-- The message header is composed of:
8988
-- 1. A control byte, to determine how the message should be parsed.
90-
-- 2. A 32-bit word that encodes the endpoint ID of the destination endpoint;
91-
-- 3. A 32-bit word that encodes the number of frames in the message
89+
-- 2. A 32-bit word that encodes the number of frames in the message
9290
--
9391
-- The payload frames are each prepended with the length of the frame.
9492
encodeMessage ::
95-
ClientConnId ->
9693
[ByteString] ->
9794
[ByteString]
98-
encodeMessage connId messages =
95+
encodeMessage messages =
9996
BS.concat
10097
[ BS.singleton messageControlByte,
101-
encodeWord32 (fromIntegral connId),
10298
encodeWord32 (fromIntegral $ length messages)
10399
]
104100
: [encodeWord32 (fromIntegral $ BS.length message) <> message | message <- messages]
@@ -116,13 +112,12 @@ decodeMessage get =
116112
where
117113
go ctrl
118114
| ctrl == closeEndPointControlByte = pure $ Right CloseEndPoint
119-
| ctrl == closeConnectionControlByte = Right . CloseConnection . fromIntegral <$> getWord32
115+
| ctrl == closeConnectionControlByte = pure $ Right CloseConnection
120116
| ctrl == messageControlByte = do
121-
connId <- getWord32
122117
numMessages <- getWord32
123118
messages <- replicateM (fromIntegral numMessages) $ do
124119
getWord32 >>= get . fromIntegral
125-
pure . Right $ Message (fromIntegral connId) messages
120+
pure . Right $ Message messages
126121
| otherwise = pure $ Left $ "Unsupported control byte: " <> show ctrl
127122
getWord32 = get 4 <&> decodeWord32
128123

@@ -146,10 +141,8 @@ getAllBytes get n = go n mempty
146141
else go (m - BS.length bytes) (acc <> [bytes])
147142

148143
data MessageReceived
149-
= Message
150-
{-# UNPACK #-} !ClientConnId
151-
{-# UNPACK #-} ![ByteString]
152-
| CloseConnection !ClientConnId
144+
= Message {-# UNPACK #-} ![ByteString]
145+
| CloseConnection
153146
| CloseEndPoint
154147
| StreamClosed
155148
deriving (Show, Eq)
@@ -220,13 +213,12 @@ closeConnectionControlByte :: ControlByte
220213
closeConnectionControlByte = 255
221214

222215
-- | Send a message to close the connection.
223-
sendCloseConnection :: ClientConnId -> Stream -> IO (Either QUIC.QUICException ())
224-
sendCloseConnection connId stream =
216+
sendCloseConnection :: Stream -> IO (Either QUIC.QUICException ())
217+
sendCloseConnection stream =
225218
try
226219
( QUIC.sendStream
227220
stream
228-
( BS.concat [BS.singleton closeConnectionControlByte, encodeWord32 (fromIntegral connId)]
229-
)
221+
(BS.singleton closeConnectionControlByte)
230222
)
231223

232224
-- | Send a message to close the connection.
@@ -240,23 +232,21 @@ sendCloseEndPoint stream =
240232
)
241233

242234
-- | Handshake protocol that a client, connecting to a remote endpoint,
243-
-- has to perform. Two round-trips:
235+
-- has to perform:
244236
--
245237
-- 1. client -> server: address payload
246238
-- 2. server -> client: ack1 (handshake payload accepted)
247-
-- 3. client -> server: clientConnId
248-
-- 4. server -> client: ack2 (ConnectionOpened has been enqueued on server's endpoint)
239+
-- 3. server -> client: ack2 (ConnectionOpened has been enqueued on server's endpoint)
249240
--
250241
-- The ack2 step is load-bearing: when this function returns, the server has
251242
-- already written ConnectionOpened to its local queue. Without it, the server's
252243
-- @connect@ would return before the peer's queue has the ConnectionOpened event,
253244
-- which races with subsequent sends on other connections.
254245
handshake ::
255246
(EndPointAddress, EndPointAddress) ->
256-
ClientConnId ->
257247
Stream ->
258248
IO (Either () ())
259-
handshake (ourAddress, theirAddress) clientConnId stream =
249+
handshake (ourAddress, theirAddress) stream =
260250
case decodeQUICAddr theirAddress of
261251
Left errmsg -> throwIO $ userError ("Could not decode QUIC address: " <> errmsg)
262252
Right (QUICAddr _ _ serverEndPointId) -> do
@@ -273,12 +263,7 @@ handshake (ourAddress, theirAddress) clientConnId stream =
273263
Right _ ->
274264
recvAck stream >>= \case
275265
Left () -> pure $ Left ()
276-
Right () ->
277-
try @SomeException
278-
(QUIC.sendStream stream (BS.toStrict (Binary.encode clientConnId)))
279-
>>= \case
280-
Left _ -> pure $ Left ()
281-
Right () -> recvAck stream
266+
Right () -> recvAck stream
282267

283268
-- | Part of the connection ID that is client-allocated.
284269
newtype ClientConnId = ClientConnId Word32

0 commit comments

Comments
 (0)