3434import static io .rsocket .frame .FrameType .REQUEST_N ;
3535import static io .rsocket .frame .FrameType .REQUEST_RESPONSE ;
3636import static io .rsocket .frame .FrameType .REQUEST_STREAM ;
37- import static org .hamcrest .MatcherAssert .assertThat ;
38- import static org .hamcrest .Matchers .anyOf ;
39- import static org .hamcrest .Matchers .empty ;
40- import static org .hamcrest .Matchers .is ;
37+ import static org .assertj .core .api .Assertions .assertThat ;
4138
4239import io .netty .buffer .ByteBuf ;
4340import io .netty .buffer .ByteBufAllocator ;
7673import java .util .concurrent .atomic .AtomicBoolean ;
7774import java .util .concurrent .atomic .AtomicReference ;
7875import java .util .stream .Stream ;
79- import org .assertj .core .api .Assertions ;
8076import org .assertj .core .api .Assumptions ;
8177import org .junit .jupiter .api .AfterEach ;
8278import org .junit .jupiter .api .BeforeEach ;
@@ -126,12 +122,13 @@ public void testHandleKeepAlive() {
126122 rule .connection .addToReceivedBuffer (
127123 KeepAliveFrameCodec .encode (rule .alloc (), true , 0 , Unpooled .EMPTY_BUFFER ));
128124 ByteBuf sent = rule .connection .awaitFrame ();
129- assertThat ("Unexpected frame sent." , frameType (sent ), is (FrameType .KEEPALIVE ));
125+ assertThat (frameType (sent ))
126+ .describedAs ("Unexpected frame sent." )
127+ .isEqualTo (FrameType .KEEPALIVE );
130128 /*Keep alive ack must not have respond flag else, it will result in infinite ping-pong of keep alive frames.*/
131- assertThat (
132- "Unexpected keep-alive frame respond flag." ,
133- KeepAliveFrameCodec .respondFlag (sent ),
134- is (false ));
129+ assertThat (KeepAliveFrameCodec .respondFlag (sent ))
130+ .describedAs ("Unexpected keep-alive frame respond flag." )
131+ .isEqualTo (false );
135132 }
136133
137134 @ Test
@@ -149,10 +146,9 @@ public Mono<Payload> requestResponse(Payload payload) {
149146 });
150147 rule .sendRequest (streamId , FrameType .REQUEST_RESPONSE );
151148 testPublisher .complete ();
152- assertThat (
153- "Unexpected frame sent." ,
154- frameType (rule .connection .awaitFrame ()),
155- anyOf (is (FrameType .COMPLETE ), is (FrameType .NEXT_COMPLETE )));
149+ assertThat (frameType (rule .connection .awaitFrame ()))
150+ .describedAs ("Unexpected frame sent." )
151+ .isIn (FrameType .COMPLETE , FrameType .NEXT_COMPLETE );
156152 testPublisher .assertWasNotCancelled ();
157153 }
158154
@@ -162,8 +158,9 @@ public void testHandlerEmitsError() {
162158 final int streamId = 4 ;
163159 rule .prefetch = 1 ;
164160 rule .sendRequest (streamId , FrameType .REQUEST_STREAM );
165- assertThat (
166- "Unexpected frame sent." , frameType (rule .connection .awaitFrame ()), is (FrameType .ERROR ));
161+ assertThat (frameType (rule .connection .awaitFrame ()))
162+ .describedAs ("Unexpected frame sent." )
163+ .isEqualTo (FrameType .ERROR );
167164 }
168165
169166 @ Test
@@ -182,12 +179,12 @@ public Mono<Payload> requestResponse(Payload payload) {
182179 });
183180 rule .sendRequest (streamId , FrameType .REQUEST_RESPONSE );
184181
185- assertThat ("Unexpected frame sent." , rule . connection . getSent (), is ( empty ()) );
182+ assertThat (rule . connection . getSent ()). describedAs ( "Unexpected frame sent." ). isEmpty ( );
186183
187184 rule .connection .addToReceivedBuffer (CancelFrameCodec .encode (allocator , streamId ));
188185
189- assertThat ("Unexpected frame sent." , rule . connection . getSent (), is ( empty ()) );
190- assertThat ("Subscription not cancelled." , cancelled . get (), is ( true ) );
186+ assertThat (rule . connection . getSent ()). describedAs ( "Unexpected frame sent." ). isEmpty ( );
187+ assertThat (cancelled . get ()). describedAs ( "Subscription not cancelled." ). isTrue ( );
191188 rule .assertHasNoLeaks ();
192189 }
193190
@@ -243,7 +240,7 @@ protected void hookOnSubscribe(Subscription subscription) {
243240 for (Runnable runnable : runnables ) {
244241 rule .connection .clearSendReceiveBuffers ();
245242 runnable .run ();
246- Assertions . assertThat (rule .connection .getSent ())
243+ assertThat (rule .connection .getSent ())
247244 .hasSize (1 )
248245 .first ()
249246 .matches (bb -> FrameHeaderCodec .frameType (bb ) == FrameType .ERROR )
@@ -253,7 +250,7 @@ protected void hookOnSubscribe(Subscription subscription) {
253250 .contains (String .format (INVALID_PAYLOAD_ERROR_MESSAGE , maxFrameLength )))
254251 .matches (ReferenceCounted ::release );
255252
256- assertThat ("Subscription not cancelled." , cancelled . get (), is ( true ) );
253+ assertThat (cancelled . get ()). describedAs ( "Subscription not cancelled." ). isTrue ( );
257254 }
258255
259256 rule .assertHasNoLeaks ();
@@ -308,9 +305,9 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
308305 sink .tryEmitEmpty ();
309306 });
310307
311- Assertions . assertThat (assertSubscriber .values ()).allMatch (ReferenceCounted ::release );
308+ assertThat (assertSubscriber .values ()).allMatch (ReferenceCounted ::release );
312309
313- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
310+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
314311
315312 rule .assertHasNoLeaks ();
316313 testRequestInterceptor .expectOnStart (1 , REQUEST_CHANNEL ).expectOnComplete (1 ).expectNothing ();
@@ -353,7 +350,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
353350 sink .complete ();
354351 });
355352
356- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
353+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
357354
358355 rule .assertHasNoLeaks ();
359356 testRequestInterceptor .expectOnStart (1 , REQUEST_CHANNEL ).expectOnCancel (1 ).expectNothing ();
@@ -398,7 +395,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
398395 sink .complete ();
399396 });
400397
401- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
398+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
402399 testRequestInterceptor .expectOnStart (1 , REQUEST_CHANNEL ).expectOnCancel (1 ).expectNothing ();
403400 rule .assertHasNoLeaks ();
404401 }
@@ -483,13 +480,13 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
483480 sink .error (new RuntimeException ());
484481 });
485482
486- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
483+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
487484
488485 assertSubscriber
489486 .assertTerminated ()
490487 .assertError (CancellationException .class )
491488 .assertErrorMessage ("Outbound has terminated with an error" );
492- Assertions . assertThat (assertSubscriber .values ())
489+ assertThat (assertSubscriber .values ())
493490 .allMatch (
494491 msg -> {
495492 ReferenceCountUtil .safeRelease (msg );
@@ -531,7 +528,7 @@ public Flux<Payload> requestStream(Payload payload) {
531528 sink .next (ByteBufPayload .create ("d3" , "m3" ));
532529 });
533530
534- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
531+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
535532
536533 rule .assertHasNoLeaks ();
537534
@@ -573,15 +570,15 @@ public void subscribe(CoreSubscriber<? super Payload> actual) {
573570 sources [0 ].complete (ByteBufPayload .create ("d1" , "m1" ));
574571 });
575572
576- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
573+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
577574
578575 rule .assertHasNoLeaks ();
579576
580577 testRequestInterceptor
581578 .expectOnStart (1 , REQUEST_RESPONSE )
582579 .assertNext (
583580 e ->
584- Assertions . assertThat (e .eventType )
581+ assertThat (e .eventType )
585582 .isIn (
586583 TestRequestInterceptor .EventType .ON_COMPLETE ,
587584 TestRequestInterceptor .EventType .ON_CANCEL ))
@@ -614,7 +611,7 @@ public Flux<Payload> requestStream(Payload payload) {
614611 sink .next (ByteBufPayload .create ("d3" , "m3" ));
615612 rule .connection .addToReceivedBuffer (cancelFrame );
616613
617- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
614+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
618615
619616 rule .assertHasNoLeaks ();
620617 }
@@ -660,7 +657,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
660657
661658 rule .connection .addToReceivedBuffer (cancelFrame );
662659
663- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
660+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
664661
665662 rule .assertHasNoLeaks ();
666663 }
@@ -730,17 +727,15 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
730727 }
731728
732729 if (responsesCnt > 0 ) {
733- Assertions .assertThat (
734- rule .connection .getSent ().stream ().filter (bb -> frameType (bb ) != REQUEST_N ))
730+ assertThat (rule .connection .getSent ().stream ().filter (bb -> frameType (bb ) != REQUEST_N ))
735731 .describedAs (
736732 "Interaction Type :[%s]. Expected to observe %s frames sent" , frameType , responsesCnt )
737733 .hasSize (responsesCnt )
738734 .allMatch (bb -> !FrameHeaderCodec .hasMetadata (bb ));
739735 }
740736
741737 if (framesCnt > 1 ) {
742- Assertions .assertThat (
743- rule .connection .getSent ().stream ().filter (bb -> frameType (bb ) == REQUEST_N ))
738+ assertThat (rule .connection .getSent ().stream ().filter (bb -> frameType (bb ) == REQUEST_N ))
744739 .describedAs (
745740 "Interaction Type :[%s]. Expected to observe single RequestN(%s) frame" ,
746741 frameType , framesCnt - 1 )
@@ -749,9 +744,9 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
749744 .matches (bb -> RequestNFrameCodec .requestN (bb ) == (framesCnt - 1 ));
750745 }
751746
752- Assertions . assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
747+ assertThat (rule .connection .getSent ()).allMatch (ReferenceCounted ::release );
753748
754- Assertions . assertThat (assertSubscriber .awaitAndAssertNextValueCount (framesCnt ).values ())
749+ assertThat (assertSubscriber .awaitAndAssertNextValueCount (framesCnt ).values ())
755750 .hasSize (framesCnt )
756751 .allMatch (p -> !p .hasMetadata ())
757752 .allMatch (ReferenceCounted ::release );
@@ -796,7 +791,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
796791
797792 rule .sendRequest (1 , frameType );
798793
799- Assertions . assertThat (rule .connection .getSent ())
794+ assertThat (rule .connection .getSent ())
800795 .hasSize (1 )
801796 .first ()
802797 .matches (
@@ -837,13 +832,13 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
837832 rule .connection .addToReceivedBuffer (
838833 ErrorFrameCodec .encode (rule .alloc (), 1 , new RuntimeException ("test" )));
839834
840- Assertions . assertThat (rule .connection .getSent ())
835+ assertThat (rule .connection .getSent ())
841836 .hasSize (1 )
842837 .first ()
843838 .matches (bb -> FrameHeaderCodec .frameType (bb ) == REQUEST_N )
844839 .matches (ReferenceCounted ::release );
845840
846- Assertions . assertThat (rule .socket .isDisposed ()).isFalse ();
841+ assertThat (rule .socket .isDisposed ()).isFalse ();
847842 testPublisher .assertWasCancelled ();
848843
849844 rule .assertHasNoLeaks ();
0 commit comments