Skip to content

Commit c39d0cd

Browse files
authored
chore: improve read object range error handling and retry robustness (googleapis#3021)
When a read causes an error it is delivered as a stream error. Depending on the kind of error (DATA_LOSS OUT_OF_RANGE) this could lead to a terminal state for the stream. If the error is for a specific range(s) mark those individual reads as terminally failed, but do not fail any others. Instead, raise a client side retryable ABORTED to restart the stream and any remaining reads for it.
1 parent c857619 commit c39d0cd

4 files changed

Lines changed: 281 additions & 34 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
3737
import com.google.api.gax.retrying.ResultRetryAlgorithm;
3838
import com.google.api.gax.retrying.RetrySettings;
39+
import com.google.api.gax.rpc.AbortedException;
3940
import com.google.api.gax.rpc.BidiStreamingCallable;
4041
import com.google.api.gax.rpc.ClientContext;
4142
import com.google.api.gax.rpc.HeaderProvider;
@@ -1309,11 +1310,12 @@ private ReadObjectRangeResultRetryAlgorithmDecorator(ResultRetryAlgorithm<?> del
13091310
}
13101311

13111312
@Override
1312-
public boolean shouldRetry(Throwable previousThrowable, Object previousResponse) {
1313+
public boolean shouldRetry(Throwable t, Object previousResponse) {
13131314
// this is only retryable with read object range, not other requests
1314-
return previousThrowable instanceof UncheckedChecksumMismatchException
1315-
|| previousThrowable instanceof OutOfRangeException
1316-
|| delegate.shouldRetry(StorageException.coalesce(previousThrowable), null);
1315+
return t instanceof UncheckedChecksumMismatchException
1316+
|| (t instanceof OutOfRangeException && ((OutOfRangeException) t).isRetryable())
1317+
|| (t instanceof AbortedException && ((AbortedException) t).isRetryable())
1318+
|| delegate.shouldRetry(StorageException.coalesce(t), null);
13171319
}
13181320
}
13191321
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -377,37 +377,52 @@ public void onResponse(BidiReadObjectResponse response) {
377377
@Override
378378
public void onError(Throwable t) {
379379
requestStream = null;
380-
try {
381-
BidiReadObjectError error = GrpcUtils.getBidiReadObjectError(t);
382-
if (error == null) {
383-
return;
384-
}
380+
BidiReadObjectError error = GrpcUtils.getBidiReadObjectError(t);
381+
if (error == null) {
382+
// if there isn't a BidiReadObjectError that may contain more narrow failures, propagate
383+
// the failure as is to the stream.
384+
streamRetryContext.recordError(
385+
t, ObjectReadSessionStream.this::restart, ObjectReadSessionStream.this::failAll);
386+
return;
387+
}
385388

386-
List<ReadRangeError> rangeErrors = error.getReadRangeErrorsList();
387-
if (rangeErrors.isEmpty()) {
388-
return;
389-
}
390-
for (ReadRangeError rangeError : rangeErrors) {
391-
Status status = rangeError.getStatus();
392-
long id = rangeError.getReadId();
393-
ObjectReadSessionStreamRead<?> read = state.getOutstandingRead(id);
394-
if (read == null) {
395-
continue;
396-
}
397-
// mark read as failed, but don't resolve its future now. Schedule the delivery of the
398-
// failure in executor to ensure any downstream future doesn't block this IO thread.
399-
read.preFail();
400-
executor.execute(
401-
StorageException.liftToRunnable(
402-
() ->
403-
state
404-
.removeOutstandingReadOnFailure(id, read::fail)
405-
.onFailure(GrpcUtils.statusToApiException(status))));
406-
}
407-
} finally {
389+
List<ReadRangeError> rangeErrors = error.getReadRangeErrorsList();
390+
if (rangeErrors.isEmpty()) {
391+
// if there aren't any specific read id's that contain errors, propagate the error as is to
392+
// the stream.
408393
streamRetryContext.recordError(
409394
t, ObjectReadSessionStream.this::restart, ObjectReadSessionStream.this::failAll);
395+
return;
396+
}
397+
for (ReadRangeError rangeError : rangeErrors) {
398+
Status status = rangeError.getStatus();
399+
long id = rangeError.getReadId();
400+
ObjectReadSessionStreamRead<?> read = state.getOutstandingRead(id);
401+
if (read == null) {
402+
continue;
403+
}
404+
// mark read as failed, but don't resolve its future now. Schedule the delivery of the
405+
// failure in executor to ensure any downstream future doesn't block this IO thread.
406+
read.preFail();
407+
executor.execute(
408+
StorageException.liftToRunnable(
409+
() ->
410+
state
411+
.removeOutstandingReadOnFailure(id, read::fail)
412+
.onFailure(GrpcUtils.statusToApiException(status))));
410413
}
414+
// now that we've failed specific reads, raise a retryable ABORTED error to the stream to
415+
// cause it to retry and pending remaining reads.
416+
ApiException apiException =
417+
ApiExceptionFactory.createException(
418+
"Stream error, reclassifying as ABORTED for reads not specified in BidiReadObjectError",
419+
t,
420+
GrpcStatusCode.of(Code.ABORTED),
421+
true);
422+
streamRetryContext.recordError(
423+
apiException,
424+
ObjectReadSessionStream.this::restart,
425+
ObjectReadSessionStream.this::failAll);
411426
}
412427

413428
private OnSuccess restartReadFromCurrentOffset(long id) {

google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java

Lines changed: 200 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,11 @@
5353
import com.google.common.io.BaseEncoding;
5454
import com.google.common.io.ByteStreams;
5555
import com.google.common.truth.Correspondence;
56+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
5657
import com.google.protobuf.Any;
5758
import com.google.protobuf.ByteString;
5859
import com.google.protobuf.TextFormat;
60+
import com.google.rpc.DebugInfo;
5961
import com.google.storage.v2.BidiReadHandle;
6062
import com.google.storage.v2.BidiReadObjectError;
6163
import com.google.storage.v2.BidiReadObjectRedirectedError;
@@ -77,6 +79,7 @@
7779
import io.grpc.protobuf.ProtoUtils;
7880
import io.grpc.stub.StreamObserver;
7981
import java.io.ByteArrayOutputStream;
82+
import java.io.IOException;
8083
import java.nio.ByteBuffer;
8184
import java.nio.channels.Channels;
8285
import java.nio.channels.ScatteringByteChannel;
@@ -86,13 +89,20 @@
8689
import java.util.Collections;
8790
import java.util.HashSet;
8891
import java.util.List;
92+
import java.util.Locale;
8993
import java.util.Map;
94+
import java.util.Optional;
9095
import java.util.Set;
9196
import java.util.UUID;
9297
import java.util.concurrent.CountDownLatch;
9398
import java.util.concurrent.ExecutionException;
99+
import java.util.concurrent.ExecutorService;
100+
import java.util.concurrent.Executors;
101+
import java.util.concurrent.Future;
102+
import java.util.concurrent.Semaphore;
94103
import java.util.concurrent.TimeUnit;
95104
import java.util.concurrent.atomic.AtomicInteger;
105+
import java.util.concurrent.atomic.AtomicReference;
96106
import java.util.function.Consumer;
97107
import java.util.stream.Collectors;
98108
import java.util.stream.Stream;
@@ -1376,9 +1386,6 @@ public void gettingSessionFromFastOpenKeepsTheSessionOpenUntilClosed() throws Ex
13761386
.build())
13771387
.build();
13781388

1379-
System.out.println("req1 = " + TextFormat.printer().shortDebugString(req1));
1380-
System.out.println("req2 = " + TextFormat.printer().shortDebugString(req2));
1381-
System.out.println("req3 = " + TextFormat.printer().shortDebugString(req3));
13821389
ImmutableMap<BidiReadObjectRequest, BidiReadObjectResponse> db =
13831390
ImmutableMap.<BidiReadObjectRequest, BidiReadObjectResponse>builder()
13841391
.put(req1, res1)
@@ -1538,6 +1545,196 @@ public void expectRetryForRangeWithFailedChecksumValidation_channel() throws Exc
15381545
}
15391546
}
15401547

1548+
/**
1549+
* Define a test where multiple reads for the same session will be performed, and some of those
1550+
* reads cause OUT_OF_RANGE errors.
1551+
*
1552+
* <p>An OUT_OF_RANGE error is delivered as a stream level status, which means any reads which
1553+
* share a stream must be restarted while the read that caused the OUT_OF_RANGE should be failed.
1554+
*
1555+
* <p>Verify this behavior for both channel based and future byte[] based.
1556+
*/
1557+
@Test
1558+
public void serverOutOfRangeIsNotRetried() throws Exception {
1559+
ChecksummedTestContent expected = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 20);
1560+
Semaphore sem = new Semaphore(1);
1561+
1562+
BidiReadObjectResponse dataResp =
1563+
BidiReadObjectResponse.newBuilder()
1564+
.addObjectDataRanges(
1565+
ObjectRangeData.newBuilder()
1566+
.setChecksummedData(expected.asChecksummedData())
1567+
.setReadRange(getReadRange(0, 10, 20))
1568+
.setRangeEnd(true)
1569+
.build())
1570+
.build();
1571+
1572+
AtomicInteger bidiReadObjectCount = new AtomicInteger();
1573+
ExecutorService exec =
1574+
Executors.newCachedThreadPool(
1575+
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("test-server-%d").build());
1576+
1577+
StorageImplBase fakeStorage =
1578+
new StorageImplBase() {
1579+
@Override
1580+
public StreamObserver<BidiReadObjectRequest> bidiReadObject(
1581+
StreamObserver<BidiReadObjectResponse> respond) {
1582+
bidiReadObjectCount.getAndIncrement();
1583+
return new StreamObserver<BidiReadObjectRequest>() {
1584+
@Override
1585+
public void onNext(BidiReadObjectRequest request) {
1586+
if (request.equals(REQ_OPEN)) {
1587+
respond.onNext(RES_OPEN);
1588+
} else if (request.getReadRangesList().get(0).getReadOffset() == 10) {
1589+
exec.submit(
1590+
() -> {
1591+
try {
1592+
sem.acquire();
1593+
BidiReadObjectResponse.Builder b = dataResp.toBuilder();
1594+
ReadRange readRange = request.getReadRangesList().get(0);
1595+
ObjectRangeData.Builder bb = dataResp.getObjectDataRanges(0).toBuilder();
1596+
bb.getReadRangeBuilder().setReadId(readRange.getReadId());
1597+
b.setObjectDataRanges(0, bb.build());
1598+
respond.onNext(b.build());
1599+
} catch (InterruptedException e) {
1600+
respond.onError(
1601+
TestUtils.apiException(Code.UNIMPLEMENTED, e.getMessage()));
1602+
}
1603+
});
1604+
} else if (bidiReadObjectCount.getAndIncrement() >= 1) {
1605+
Optional<ReadRange> readRange = request.getReadRangesList().stream().findFirst();
1606+
String message =
1607+
String.format(
1608+
Locale.US,
1609+
"OUT_OF_RANGE read_offset = %d",
1610+
readRange.map(ReadRange::getReadOffset).orElse(0L));
1611+
long readId = readRange.map(ReadRange::getReadId).orElse(0L);
1612+
1613+
BidiReadObjectError err2 =
1614+
BidiReadObjectError.newBuilder()
1615+
.addReadRangeErrors(
1616+
ReadRangeError.newBuilder()
1617+
.setReadId(readId)
1618+
.setStatus(
1619+
com.google.rpc.Status.newBuilder()
1620+
.setCode(com.google.rpc.Code.OUT_OF_RANGE_VALUE)
1621+
.build())
1622+
.build())
1623+
.build();
1624+
1625+
com.google.rpc.Status grpcStatusDetails =
1626+
com.google.rpc.Status.newBuilder()
1627+
.setCode(com.google.rpc.Code.UNAVAILABLE_VALUE)
1628+
.setMessage("fail read_id: " + readId)
1629+
.addDetails(Any.pack(err2))
1630+
.addDetails(
1631+
Any.pack(
1632+
DebugInfo.newBuilder()
1633+
.setDetail(message)
1634+
.addStackEntries(
1635+
TextFormat.printer().shortDebugString(request))
1636+
.build()))
1637+
.build();
1638+
1639+
Metadata trailers = new Metadata();
1640+
trailers.put(GRPC_STATUS_DETAILS_KEY, grpcStatusDetails);
1641+
StatusRuntimeException statusRuntimeException =
1642+
Status.OUT_OF_RANGE.withDescription(message).asRuntimeException(trailers);
1643+
respond.onError(statusRuntimeException);
1644+
} else {
1645+
respond.onError(
1646+
apiException(
1647+
Code.UNIMPLEMENTED,
1648+
"Unexpected request { "
1649+
+ TextFormat.printer().shortDebugString(request)
1650+
+ " }"));
1651+
}
1652+
}
1653+
1654+
@Override
1655+
public void onError(Throwable t) {
1656+
respond.onError(t);
1657+
}
1658+
1659+
@Override
1660+
public void onCompleted() {
1661+
respond.onCompleted();
1662+
}
1663+
};
1664+
}
1665+
};
1666+
try (FakeServer fakeServer = FakeServer.of(fakeStorage);
1667+
Storage storage = fakeServer.getGrpcStorageOptions().getService()) {
1668+
1669+
BlobId id = BlobId.of("b", "o");
1670+
1671+
try (BlobReadSession session = storage.blobReadSession(id).get(5, TimeUnit.SECONDS)) {
1672+
1673+
ApiFuture<byte[]> shouldSucceedFuture =
1674+
session.readAs(
1675+
ReadProjectionConfigs.asFutureBytes().withRangeSpec(RangeSpec.of(10, 20)));
1676+
1677+
ApiFuture<byte[]> shouldFailFuture =
1678+
session.readAs(
1679+
ReadProjectionConfigs.asFutureBytes().withRangeSpec(RangeSpec.beginAt(37)));
1680+
ExecutionException exceptionFromFuture =
1681+
assertThrows(
1682+
ExecutionException.class, () -> shouldFailFuture.get(30, TimeUnit.SECONDS));
1683+
sem.release();
1684+
1685+
Exception exceptionFromChannel;
1686+
byte[] bytesFromFuture = shouldSucceedFuture.get(30, TimeUnit.SECONDS);
1687+
1688+
AtomicReference<byte[]> bytesFromChannel = new AtomicReference<>();
1689+
Future<Long> asyncShouldSucceedChannel =
1690+
exec.submit(
1691+
() -> {
1692+
try (ScatteringByteChannel succeed =
1693+
session.readAs(
1694+
ReadProjectionConfigs.asChannel().withRangeSpec(RangeSpec.of(10, 20)))) {
1695+
sem.acquire();
1696+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
1697+
long copy = ByteStreams.copy(succeed, Channels.newChannel(baos));
1698+
bytesFromChannel.set(baos.toByteArray());
1699+
return copy;
1700+
} catch (IOException e) {
1701+
throw new RuntimeException(e);
1702+
}
1703+
});
1704+
1705+
try (ScatteringByteChannel fail =
1706+
session.readAs(
1707+
ReadProjectionConfigs.asChannel().withRangeSpec(RangeSpec.beginAt(39)))) {
1708+
exceptionFromChannel =
1709+
assertThrows(
1710+
IOException.class,
1711+
() -> {
1712+
int read = 0;
1713+
do {
1714+
read = fail.read(ByteBuffer.allocate(1));
1715+
} while (read == 0);
1716+
});
1717+
sem.release();
1718+
}
1719+
asyncShouldSucceedChannel.get(30, TimeUnit.SECONDS);
1720+
Exception finalExceptionFromChannel = exceptionFromChannel;
1721+
assertAll(
1722+
() ->
1723+
assertThat(exceptionFromFuture)
1724+
.hasCauseThat()
1725+
.hasCauseThat()
1726+
.isInstanceOf(OutOfRangeException.class),
1727+
() ->
1728+
assertThat(finalExceptionFromChannel)
1729+
.hasCauseThat()
1730+
.hasCauseThat()
1731+
.isInstanceOf(OutOfRangeException.class),
1732+
() -> assertThat(xxd(bytesFromFuture)).isEqualTo(xxd(expected.getBytes())),
1733+
() -> assertThat(xxd(bytesFromChannel.get())).isEqualTo(xxd(expected.getBytes())));
1734+
}
1735+
}
1736+
}
1737+
15411738
private static void runTestAgainstFakeServer(
15421739
FakeStorage fakeStorage, RangeSpec range, ChecksummedTestContent expected) throws Exception {
15431740

0 commit comments

Comments
 (0)