2121import org .reactivestreams .Publisher ;
2222import reactor .core .publisher .Flux ;
2323import reactor .core .publisher .Mono ;
24- import reactor .ipc . netty .NettyContext ;
25- import reactor . ipc . netty . NettyInbound ;
26- import reactor . ipc . netty . NettyOutbound ;
24+ import reactor .netty .Connection ;
25+
26+ import java . util . Objects ;
2727
2828/** An implementation of {@link DuplexConnection} that connects via TCP. */
2929public final class TcpDuplexConnection implements DuplexConnection {
3030
31- private final NettyContext context ;
32-
33- private final NettyInbound in ;
34-
35- private final NettyOutbound out ;
31+ private final Connection connection ;
3632
3733 /**
3834 * Creates a new instance
3935 *
40- * @param in the {@link NettyInbound} to listen on
41- * @param out the {@link NettyOutbound} to send with
42- * @param context the {@link NettyContext} to for managing the server
36+ * @param connection the {@link Connection} to for managing the server
4337 */
44- public TcpDuplexConnection (NettyInbound in , NettyOutbound out , NettyContext context ) {
45- this .in = in ;
46- this .out = out ;
47- this .context = context ;
38+ public TcpDuplexConnection (Connection connection ) {
39+ this .connection = Objects .requireNonNull (connection , "connection must not be null" );
4840 }
4941
5042 @ Override
5143 public void dispose () {
52- context .dispose ();
44+ connection .dispose ();
5345 }
5446
5547 @ Override
5648 public boolean isDisposed () {
57- return context .isDisposed ();
49+ return connection .isDisposed ();
5850 }
5951
6052 @ Override
6153 public Mono <Void > onClose () {
62- return context . onClose ();
54+ return connection . onDispose ();
6355 }
6456
6557 @ Override
6658 public Flux <Frame > receive () {
67- return in .receive ().map (buf -> Frame .from (buf .retain ()));
59+ return connection . inbound () .receive ().map (buf -> Frame .from (buf .retain ()));
6860 }
6961
7062 @ Override
@@ -74,6 +66,6 @@ public Mono<Void> send(Publisher<Frame> frames) {
7466
7567 @ Override
7668 public Mono <Void > sendOne (Frame frame ) {
77- return out .sendObject (frame .content ()).then ();
69+ return connection . outbound () .sendObject (frame .content ()).then ();
7870 }
7971}
0 commit comments