7979import io .grpc .protobuf .ProtoUtils ;
8080import io .grpc .stub .StreamObserver ;
8181import java .io .ByteArrayOutputStream ;
82- import java .io .IOException ;
8382import java .nio .ByteBuffer ;
8483import java .nio .channels .Channels ;
8584import java .nio .channels .ScatteringByteChannel ;
9998import java .util .concurrent .ExecutorService ;
10099import java .util .concurrent .Executors ;
101100import java .util .concurrent .Future ;
102- import java .util .concurrent .Semaphore ;
103101import java .util .concurrent .TimeUnit ;
104102import java .util .concurrent .atomic .AtomicInteger ;
105- import java .util .concurrent .atomic .AtomicReference ;
106103import java .util .function .Consumer ;
107104import java .util .stream .Collectors ;
108105import java .util .stream .Stream ;
111108import org .junit .function .ThrowingRunnable ;
112109
113110public final class ITObjectReadSessionFakeTest {
114-
115111 private static final Metadata .Key <com .google .rpc .Status > GRPC_STATUS_DETAILS_KEY =
116112 Metadata .Key .of (
117113 "grpc-status-details-bin" ,
@@ -1557,7 +1553,6 @@ public void expectRetryForRangeWithFailedChecksumValidation_channel() throws Exc
15571553 @ Test
15581554 public void serverOutOfRangeIsNotRetried () throws Exception {
15591555 ChecksummedTestContent expected = ChecksummedTestContent .of (ALL_OBJECT_BYTES , 10 , 20 );
1560- Semaphore sem = new Semaphore (1 );
15611556
15621557 BidiReadObjectResponse dataResp =
15631558 BidiReadObjectResponse .newBuilder ()
@@ -1572,7 +1567,14 @@ public void serverOutOfRangeIsNotRetried() throws Exception {
15721567 AtomicInteger bidiReadObjectCount = new AtomicInteger ();
15731568 ExecutorService exec =
15741569 Executors .newCachedThreadPool (
1575- new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("test-server-%d" ).build ());
1570+ new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("exec-%d" ).build ());
1571+
1572+ // The test will submit 4 different reads to the server, we want to wait until all 4 are
1573+ // received by the server before sending any responses.
1574+ CountDownLatch serverWaitCdl = new CountDownLatch (4 );
1575+ // Then, we want the test to wait for all read responses to be returned from the server before
1576+ // beginning assertions.
1577+ CountDownLatch testWaitCdl = new CountDownLatch (4 );
15761578
15771579 StorageImplBase fakeStorage =
15781580 new StorageImplBase () {
@@ -1589,13 +1591,16 @@ public void onNext(BidiReadObjectRequest request) {
15891591 exec .submit (
15901592 () -> {
15911593 try {
1592- sem .acquire ();
1594+ // when receiving a request on the stream for the valid range
1595+ // send it to a background thread that will wait for all reads to be setup
1596+ serverWaitCdl .await ();
15931597 BidiReadObjectResponse .Builder b = dataResp .toBuilder ();
15941598 ReadRange readRange = request .getReadRangesList ().get (0 );
15951599 ObjectRangeData .Builder bb = dataResp .getObjectDataRanges (0 ).toBuilder ();
15961600 bb .getReadRangeBuilder ().setReadId (readRange .getReadId ());
15971601 b .setObjectDataRanges (0 , bb .build ());
15981602 respond .onNext (b .build ());
1603+ testWaitCdl .countDown ();
15991604 } catch (InterruptedException e ) {
16001605 respond .onError (
16011606 TestUtils .apiException (Code .UNIMPLEMENTED , e .getMessage ()));
@@ -1641,6 +1646,7 @@ public void onNext(BidiReadObjectRequest request) {
16411646 StatusRuntimeException statusRuntimeException =
16421647 Status .OUT_OF_RANGE .withDescription (message ).asRuntimeException (trailers );
16431648 respond .onError (statusRuntimeException );
1649+ testWaitCdl .countDown ();
16441650 } else {
16451651 respond .onError (
16461652 apiException (
@@ -1668,69 +1674,77 @@ public void onCompleted() {
16681674
16691675 BlobId id = BlobId .of ("b" , "o" );
16701676
1671- try (BlobReadSession session = storage .blobReadSession (id ).get (5 , TimeUnit .SECONDS )) {
1677+ // define the number of seconds our futures are willing to wait before timeout.
1678+ // In general everything should resolve in a small number of millis, this is more of a
1679+ // safeguard to prevent the whole suite hanging if there is an issue.
1680+ int timeoutSeconds = 5 ;
1681+ try (BlobReadSession session =
1682+ storage .blobReadSession (id ).get (timeoutSeconds , TimeUnit .SECONDS )) {
16721683
1673- ApiFuture <byte []> shouldSucceedFuture =
1684+ ApiFuture <byte []> expectSuccessFuture =
16741685 session .readAs (
16751686 ReadProjectionConfigs .asFutureBytes ().withRangeSpec (RangeSpec .of (10 , 20 )));
1687+ serverWaitCdl .countDown ();
16761688
1677- ApiFuture <byte []> shouldFailFuture =
1689+ ApiFuture <byte []> expectFailureFuture =
16781690 session .readAs (
16791691 ReadProjectionConfigs .asFutureBytes ().withRangeSpec (RangeSpec .beginAt (37 )));
1680- ExecutionException exceptionFromFuture =
1681- assertThrows (
1682- ExecutionException .class , () -> shouldFailFuture .get (30 , TimeUnit .SECONDS ));
1683- sem .release ();
1684-
1685- Exception exceptionFromChannel ;
1686- byte [] bytesFromFuture = shouldSucceedFuture .get (30 , TimeUnit .SECONDS );
1692+ serverWaitCdl .countDown ();
16871693
1688- AtomicReference < byte []> bytesFromChannel = new AtomicReference <> ();
1689- Future <Long > asyncShouldSucceedChannel =
1694+ ReadAsChannel readAsChannel = ReadProjectionConfigs . asChannel ();
1695+ Future <byte []> expectSuccessChannel =
16901696 exec .submit (
16911697 () -> {
16921698 try (ScatteringByteChannel succeed =
1693- session .readAs (
1694- ReadProjectionConfigs .asChannel ().withRangeSpec (RangeSpec .of (10 , 20 )))) {
1695- sem .acquire ();
1699+ session .readAs (readAsChannel .withRangeSpec (RangeSpec .of (10 , 20 )))) {
1700+ serverWaitCdl .countDown ();
16961701 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
1697- long copy = ByteStreams .copy (succeed , Channels .newChannel (baos ));
1698- bytesFromChannel .set (baos .toByteArray ());
1699- return copy ;
1700- } catch (IOException e ) {
1701- throw new RuntimeException (e );
1702+ ByteStreams .copy (succeed , Channels .newChannel (baos ));
1703+ return baos .toByteArray ();
17021704 }
17031705 });
17041706
1705- try (ScatteringByteChannel fail =
1706- session .readAs (
1707- ReadProjectionConfigs .asChannel ().withRangeSpec (RangeSpec .beginAt (39 )))) {
1708- exceptionFromChannel =
1709- assertThrows (
1710- IOException .class ,
1711- () -> {
1712- int read = 0 ;
1707+ Future <Integer > expectFailureChannel =
1708+ exec .submit (
1709+ () -> {
1710+ try (ScatteringByteChannel fail =
1711+ session .readAs (readAsChannel .withRangeSpec (RangeSpec .beginAt (39 )))) {
1712+ serverWaitCdl .countDown ();
1713+ int read ;
17131714 do {
17141715 read = fail .read (ByteBuffer .allocate (1 ));
17151716 } while (read == 0 );
1716- });
1717- sem .release ();
1718- }
1719- asyncShouldSucceedChannel .get (30 , TimeUnit .SECONDS );
1720- Exception finalExceptionFromChannel = exceptionFromChannel ;
1717+ return read ;
1718+ }
1719+ });
1720+
1721+ boolean await = testWaitCdl .await (timeoutSeconds , TimeUnit .SECONDS );
1722+ assertThat (await ).isTrue ();
1723+ ExecutionException exceptionFromFuture =
1724+ assertThrows (
1725+ ExecutionException .class ,
1726+ () -> expectFailureFuture .get (timeoutSeconds , TimeUnit .SECONDS ));
1727+ byte [] bytesFromFuture = expectSuccessFuture .get (timeoutSeconds , TimeUnit .SECONDS );
1728+ ExecutionException finalExceptionFromChannel =
1729+ assertThrows (
1730+ ExecutionException .class ,
1731+ () -> expectFailureChannel .get (timeoutSeconds , TimeUnit .SECONDS ));
1732+ byte [] bytesFromChannel = expectSuccessChannel .get (timeoutSeconds , TimeUnit .SECONDS );
1733+
17211734 assertAll (
17221735 () ->
1723- assertThat (exceptionFromFuture )
1724- .hasCauseThat ()
1736+ assertThat (exceptionFromFuture ) // ExecutionException
1737+ .hasCauseThat () // StorageException
17251738 .hasCauseThat ()
17261739 .isInstanceOf (OutOfRangeException .class ),
17271740 () ->
1728- assertThat (finalExceptionFromChannel )
1729- .hasCauseThat ()
1741+ assertThat (finalExceptionFromChannel ) // ExecutionException
1742+ .hasCauseThat () // IOException
1743+ .hasCauseThat () // StorageException
17301744 .hasCauseThat ()
17311745 .isInstanceOf (OutOfRangeException .class ),
17321746 () -> assertThat (xxd (bytesFromFuture )).isEqualTo (xxd (expected .getBytes ())),
1733- () -> assertThat (xxd (bytesFromChannel . get () )).isEqualTo (xxd (expected .getBytes ())));
1747+ () -> assertThat (xxd (bytesFromChannel )).isEqualTo (xxd (expected .getBytes ())));
17341748 }
17351749 }
17361750 }
0 commit comments