4949import java .nio .channels .ScatteringByteChannel ;
5050import java .util .List ;
5151import java .util .Locale ;
52+ import java .util .OptionalLong ;
5253import java .util .concurrent .ArrayBlockingQueue ;
5354import java .util .concurrent .CancellationException ;
5455import java .util .concurrent .ExecutionException ;
@@ -91,7 +92,15 @@ final class GapicUnbufferedReadableByteChannel
9192 this .result = result ;
9293 this .read = read ;
9394 this .req = req ;
94- this .hasher = hasher ;
95+ this .hasher =
96+ (req .getReadOffset () == 0 && !(hasher instanceof Hasher .NoOpHasher ))
97+ ? new CumulativeHasher (
98+ hasher ,
99+ 0 ,
100+ req .getReadLimit () <= 0
101+ ? OptionalLong .empty ()
102+ : OptionalLong .of (req .getReadLimit ()))
103+ : hasher ;
95104 this .fetchOffset = new AtomicLong (req .getReadOffset ());
96105 this .blobOffset = req .getReadOffset ();
97106 this .retrier = retrier ;
@@ -154,7 +163,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
154163 if (take instanceof IOException ) {
155164 IOException ioe = (IOException ) take ;
156165 if (alg .shouldRetry (ioe , null )) {
157- readObjectObserver = null ;
166+ cancelAndDrainCurrentObserver () ;
158167 continue ;
159168 } else {
160169 ioe .addSuppressed (new AsyncStorageTaskException ());
@@ -165,7 +174,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
165174 Throwable throwable = (Throwable ) take ;
166175 BaseServiceException coalesce = StorageException .coalesce (throwable );
167176 if (alg .shouldRetry (coalesce , null )) {
168- readObjectObserver = null ;
177+ cancelAndDrainCurrentObserver () ;
169178 continue ;
170179 } else {
171180 close ();
@@ -174,6 +183,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
174183 }
175184 if (take == EOF_MARKER ) {
176185 complete = true ;
186+ validateCumulativeChecksum ();
177187 break ;
178188 }
179189
@@ -240,7 +250,9 @@ private void drainQueue() throws IOException {
240250 while (queue .nonEmpty ()) {
241251 try {
242252 java .lang .Object queueValue = queue .poll ();
243- if (queueValue instanceof ReadObjectResponse ) {
253+ if (queueValue instanceof java .io .Closeable ) {
254+ ((java .io .Closeable ) queueValue ).close ();
255+ } else if (queueValue instanceof ReadObjectResponse ) {
244256 ReadObjectResponse resp = (ReadObjectResponse ) queueValue ;
245257 ResponseContentLifecycleHandle <ReadObjectResponse > handle =
246258 read .getResponseContentLifecycleManager ().get (resp );
@@ -273,6 +285,19 @@ private void drainQueue() throws IOException {
273285 }
274286 }
275287
288+ private void cancelAndDrainCurrentObserver () {
289+ if (readObjectObserver != null ) {
290+ readObjectObserver .cancel ();
291+ try {
292+ drainQueue ();
293+ } catch (IOException e ) {
294+ // drainQueue() in this context can be ignored because we are resetting the
295+ // stream.
296+ }
297+ readObjectObserver = null ;
298+ }
299+ }
300+
276301 ApiFuture <Object > getResult () {
277302 return result ;
278303 }
@@ -311,14 +336,27 @@ private IOException createError(String message) throws IOException {
311336 return new IOException (message , cause );
312337 }
313338
339+ private void validateCumulativeChecksum () throws IOException {
340+ if (hasher instanceof CumulativeHasher ) {
341+ CumulativeHasher cumulativeHasher = (CumulativeHasher ) hasher ;
342+ try {
343+ cumulativeHasher .validateCumulativeChecksum (metadata );
344+ } catch (UncheckedCumulativeChecksumMismatchException exception ) {
345+ throw new IOException (StorageException .coalesce (exception ));
346+ }
347+ }
348+ }
349+
314350 private final class ReadObjectObserver extends StateCheckingResponseObserver <ReadObjectResponse > {
315351
316352 private final SettableApiFuture <Void > open = SettableApiFuture .create ();
317353 private final SettableApiFuture <Throwable > cancellation = SettableApiFuture .create ();
318354
319355 private volatile StreamController controller ;
356+ private volatile boolean cancelled = false ;
320357
321358 void cancel () {
359+ cancelled = true ;
322360 controller .cancel ();
323361 }
324362
@@ -331,10 +369,13 @@ protected void onStartImpl(StreamController controller) {
331369
332370 @ Override
333371 protected void onResponseImpl (ReadObjectResponse response ) {
334- controller .request (1 );
335- open .set (null );
336372 try (ResponseContentLifecycleHandle <ReadObjectResponse > handle =
337373 read .getResponseContentLifecycleManager ().get (response )) {
374+ if (cancelled ) {
375+ return ;
376+ }
377+ controller .request (1 );
378+ open .set (null );
338379 ChecksummedData checksummedData = response .getChecksummedData ();
339380 ByteString content = checksummedData .getContent ();
340381 int contentSize = content .size ();
@@ -348,6 +389,8 @@ protected void onResponseImpl(ReadObjectResponse response) {
348389 queue .offer (e );
349390 return ;
350391 }
392+ } else if (hasher instanceof CumulativeHasher ) {
393+ hasher .validateUnchecked (null , content );
351394 }
352395 if (response .hasMetadata ()) {
353396 Object respMetadata = response .getMetadata ();
@@ -380,6 +423,12 @@ protected void onResponseImpl(ReadObjectResponse response) {
380423
381424 @ Override
382425 protected void onErrorImpl (Throwable t ) {
426+ if (t instanceof CancellationException ) {
427+ cancellation .set (t );
428+ }
429+ if (cancelled ) {
430+ return ;
431+ }
383432 if (t instanceof OutOfRangeException ) {
384433 try {
385434 queue .offer (EOF_MARKER );
@@ -389,17 +438,15 @@ protected void onErrorImpl(Throwable t) {
389438 throw Code .ABORTED .toStatus ().withCause (e ).asRuntimeException ();
390439 }
391440 }
392- if (t instanceof CancellationException ) {
393- cancellation .set (t );
394- }
395441 if (!open .isDone ()) {
396442 open .setException (t );
397- }
398- try {
399- queue .offer (t );
400- } catch (InterruptedException e ) {
401- Thread .currentThread ().interrupt ();
402- throw Code .ABORTED .toStatus ().withCause (e ).asRuntimeException ();
443+ } else {
444+ try {
445+ queue .offer (t );
446+ } catch (InterruptedException e ) {
447+ Thread .currentThread ().interrupt ();
448+ throw Code .ABORTED .toStatus ().withCause (e ).asRuntimeException ();
449+ }
403450 }
404451 }
405452
0 commit comments