This document proposes the architecture and design for integrating the Resumable Upload Protocol (RUP) into gax-java and the GAPIC code generator.
- Veneer and GAPIC Aligned: The solution must integrate cleanly into the existing `Callable` framework of `gax-java`.
- Stream-Safe Retries: Java `InputStream` is forward-only. The design must provide a clean abstraction (`InputStreamProvider`) to recreate or seek the stream during recovery.
- Double-Loop Retry & Recovery: The design implements the precise Category 1 (transient) and Category 2 (state consistency) error classification, with backoff, as described in the RUP specifications.
- Progress Reporting: Supports asynchronous progress updates via a simple callback mechanism.
- No unnecessary chunking: By default, uploads send the remaining bytes in one request (avoiding unnecessary memory buffering or chunk management).
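To illustrate the progress-reporting goal, the sketch below counts bytes as they are written to the upload request body and reports the absolute position (resume offset plus bytes written) through a callback. `ProgressReportingOutputStream` and its `LongConsumer` callback are hypothetical stand-ins, not part of the proposed API; an implementation following this design would build a `ResumableUploadStatus` and call `onProgress` instead.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.LongConsumer;

/** Hypothetical wrapper that reports progress as payload bytes are written. */
class ProgressReportingOutputStream extends FilterOutputStream {
  private final LongConsumer onProgress;
  private long position; // absolute byte position within the upload

  ProgressReportingOutputStream(OutputStream out, long startOffset, LongConsumer onProgress) {
    super(out);
    this.position = startOffset;
    this.onProgress = onProgress;
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);
    position++;
    onProgress.accept(position);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    // Forward the whole slice at once instead of FilterOutputStream's byte-by-byte default.
    out.write(b, off, len);
    position += len;
    onProgress.accept(position);
  }
}
```

Note that this counts bytes handed to the transport, not bytes acknowledged by the server; after a failure, the authoritative position comes from the Query State.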
We introduce a new callable type, a request wrapper, and progress-reporting types in `com.google.api.gax.rpc`.
To support seeking/rewinding, the stream source is wrapped in a functional interface that can supply fresh streams on retry:
```java
package com.google.api.gax.rpc;

import java.io.IOException;
import java.io.InputStream;

/** Provides a fresh {@link InputStream} for retriable upload operations. */
@FunctionalInterface
public interface InputStreamProvider {
  /** Returns a new {@link InputStream} positioned at the beginning of the upload data. */
  InputStream get() throws IOException;
}
```

```java
package com.google.api.gax.rpc;

/** Listener for tracking progress of a resumable upload. */
public interface ResumableUploadProgressListener {
  enum State {
    NOT_STARTED,
    IN_PROGRESS,
    RECOVERING,
    COMPLETED,
    FAILED,
    CANCELLED
  }

  void onProgress(ResumableUploadStatus status);
}
```

```java
package com.google.api.gax.rpc;

/** Status details for progress updates. */
public final class ResumableUploadStatus {
  private final long bytesUploaded;
  private final long totalBytes; // -1 if unknown
  private final ResumableUploadProgressListener.State state;

  public ResumableUploadStatus(
      long bytesUploaded, long totalBytes, ResumableUploadProgressListener.State state) {
    this.bytesUploaded = bytesUploaded;
    this.totalBytes = totalBytes;
    this.state = state;
  }

  public long getBytesUploaded() { return bytesUploaded; }

  public long getTotalBytes() { return totalBytes; }

  public ResumableUploadProgressListener.State getState() { return state; }
}
```

```java
package com.google.api.gax.rpc;

import com.google.common.base.Preconditions;

public final class ResumableUploadRequest<RequestT> {
  private final RequestT request;
  private final InputStreamProvider streamProvider;
  private final long totalBytes; // -1 if unknown
  private final ResumableUploadProgressListener progressListener; // optional

  private ResumableUploadRequest(Builder<RequestT> builder) {
    this.request = Preconditions.checkNotNull(builder.request, "request must be set");
    this.streamProvider =
        Preconditions.checkNotNull(builder.streamProvider, "streamProvider must be set");
    this.totalBytes = builder.totalBytes;
    this.progressListener = builder.progressListener;
  }

  public RequestT getRequest() { return request; }

  public InputStreamProvider getStreamProvider() { return streamProvider; }

  public long getTotalBytes() { return totalBytes; }

  public ResumableUploadProgressListener getProgressListener() { return progressListener; }

  public static <RequestT> Builder<RequestT> newBuilder() {
    return new Builder<>();
  }

  public static class Builder<RequestT> {
    private RequestT request;
    private InputStreamProvider streamProvider;
    private long totalBytes = -1;
    private ResumableUploadProgressListener progressListener;

    public Builder<RequestT> setRequest(RequestT request) {
      this.request = request;
      return this;
    }

    public Builder<RequestT> setStreamProvider(InputStreamProvider streamProvider) {
      this.streamProvider = streamProvider;
      return this;
    }

    public Builder<RequestT> setTotalBytes(long totalBytes) {
      this.totalBytes = totalBytes;
      return this;
    }

    public Builder<RequestT> setProgressListener(ResumableUploadProgressListener progressListener) {
      this.progressListener = progressListener;
      return this;
    }

    public ResumableUploadRequest<RequestT> build() {
      return new ResumableUploadRequest<>(this);
    }
  }
}
```

```java
package com.google.api.gax.rpc;

import com.google.api.core.ApiFuture;

public abstract class ResumableUploadCallable<RequestT, ResponseT> {
  protected ResumableUploadCallable() {}

  public abstract ApiFuture<ResponseT> futureCall(
      ResumableUploadRequest<RequestT> request, ApiCallContext context);

  /** Blocks on {@link #futureCall} and unwraps failures into {@link ApiException}s. */
  public ResponseT call(ResumableUploadRequest<RequestT> request, ApiCallContext context) {
    return ApiExceptions.callAndTranslateApiException(futureCall(request, context));
  }

  public ResponseT call(ResumableUploadRequest<RequestT> request) {
    return call(request, null);
  }
}
```

The transport layer executes the actual HTTP protocol calls using the Google HTTP Client.
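We introduce `HttpJsonResumableUploadCall` to coordinate the resumable upload state machine. It first classifies every failure into one of three categories:

```java
import com.google.api.client.http.HttpResponseException;
import java.io.IOException;

// Excerpt of HttpJsonResumableUploadCall: error classification only.
class HttpJsonResumableUploadCall {

  private enum ErrorCategory {
    CATEGORY_1_TRANSIENT, // 429, 500, 502, 503, 504, TCP/socket timeouts
    CATEGORY_2_MISMATCH, // 400, 412, 416
    CATEGORY_3_FATAL // 401, 403, 404, etc.
  }

  private ErrorCategory getErrorCategory(Throwable t) {
    // HttpResponseException extends IOException, so HTTP errors must be fully
    // classified here and must not fall through to the IOException branch below.
    if (t instanceof HttpResponseException) {
      int statusCode = ((HttpResponseException) t).getStatusCode();
      switch (statusCode) {
        case 429:
        case 500:
        case 502:
        case 503:
        case 504:
          return ErrorCategory.CATEGORY_1_TRANSIENT;
        case 400:
        case 412:
        case 416:
          return ErrorCategory.CATEGORY_2_MISMATCH;
        default:
          return ErrorCategory.CATEGORY_3_FATAL;
      }
    }
    if (t instanceof IOException) {
      // Socket timeouts, connection drops
      return ErrorCategory.CATEGORY_1_TRANSIENT;
    }
    return ErrorCategory.CATEGORY_3_FATAL;
  }
}
```

`HttpJsonResumableUploadCall` runs on the user's thread (or on the client's executor pool when invoked through `futureCall`) and implements the following flow: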
```mermaid
stateDiagram-v2
    [*] --> StartSession
    StartSession --> UploadLoop : Success (200 OK + active)
    StartSession --> StartSession : Cat 1 Transient (Backoff)
    StartSession --> [*] : Cat 3 Fatal / Deadline Exceeded

    state UploadLoop {
        [*] --> OpenStream
        OpenStream --> SkipToOffset
        SkipToOffset --> TransmitChunk
        TransmitChunk --> [*] : Success (final)
        TransmitChunk --> QueryState : Cat 2 Mismatch / Socket Drop
        TransmitChunk --> TransmitChunk : Cat 1 Transient (Backoff)
    }
    UploadLoop --> [*] : Success
    UploadLoop --> [*] : Cat 3 Fatal / Deadline Exceeded

    state QueryState {
        [*] --> SendQuery
        SendQuery --> ResumeUpload : Success (active + new offset)
        SendQuery --> [*] : Success (final)
        SendQuery --> SendQuery : Cat 1 Transient (Backoff)
        SendQuery --> [*] : Cat 3 Fatal
    }
    QueryState --> UploadLoop : Resume
```
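Every state in the diagram retries Category 1 failures with backoff until the absolute deadline passes. A minimal sketch of that inner loop follows, assuming capped exponential backoff with full jitter (the design does not specify a backoff formula). `TransientRetrier`, `callWithBackoff`, and its parameters are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical helper: retries transient (Category 1) failures until an absolute deadline. */
final class TransientRetrier {
  private TransientRetrier() {}

  static <T> T callWithBackoff(
      Callable<T> attempt,
      Predicate<Exception> isTransient,
      long deadlineNanos, // absolute, in System.nanoTime() units
      Duration initialDelay,
      Duration maxDelay)
      throws Exception {
    long delayCapMillis = initialDelay.toMillis();
    while (true) {
      try {
        return attempt.call();
      } catch (Exception e) {
        // Category 2 and 3 errors are not retried here; the caller decides what to do.
        if (!isTransient.test(e)) {
          throw e;
        }
        // Full jitter: sleep a random duration in [0, delayCapMillis].
        long sleepMillis = ThreadLocalRandom.current().nextLong(delayCapMillis + 1);
        long remainingNanos = deadlineNanos - System.nanoTime();
        if (TimeUnit.MILLISECONDS.toNanos(sleepMillis) >= remainingNanos) {
          throw e; // Sleeping would pass the deadline: surface the last transient error.
        }
        Thread.sleep(sleepMillis);
        delayCapMillis = Math.min(delayCapMillis * 2, maxDelay.toMillis());
      }
    }
  }
}
```

Because the deadline is absolute rather than per attempt, time spent in the Upload Loop and the Query State draws from the same budget, which matches the single "Deadline Exceeded" exit in the diagram.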
Start Session:

- Build the standard headers and merge in user-provided metadata.
- Prefix headers that describe the uploaded payload (`Content-Length`, `Content-Type`, etc.) with `X-Goog-Upload-Header-`, so they apply to the uploaded object rather than to the session-start request itself.
- Set `X-Goog-Upload-Protocol: resumable` and `X-Goog-Upload-Command: start`.
- Execute a POST with the request's JSON body.
- Read the `X-Goog-Upload-URL` response header to obtain the `uploadUrl`.
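The header preparation for the session-start request can be sketched with plain `Map`s so that it runs without the HTTP client. The exact set of payload headers and the rule that user metadata overrides standard headers on conflict are assumptions; `StartSessionHeaders` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper that builds the headers for the session-start POST. */
final class StartSessionHeaders {
  // Assumed set of headers that describe the uploaded payload (matched case-insensitively).
  private static final Set<String> PAYLOAD_HEADERS = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);

  static {
    PAYLOAD_HEADERS.addAll(Arrays.asList("Content-Length", "Content-Type"));
  }

  private StartSessionHeaders() {}

  static Map<String, String> build(Map<String, String> standard, Map<String, String> userMetadata) {
    // Merge: user-provided metadata wins on conflicts (assumption).
    Map<String, String> merged = new LinkedHashMap<>(standard);
    merged.putAll(userMetadata);

    Map<String, String> headers = new LinkedHashMap<>();
    for (Map.Entry<String, String> e : merged.entrySet()) {
      String name = e.getKey();
      // Re-target payload headers so they describe the upload, not this POST.
      if (PAYLOAD_HEADERS.contains(name)) {
        name = "X-Goog-Upload-Header-" + name;
      }
      headers.put(name, e.getValue());
    }
    headers.put("X-Goog-Upload-Protocol", "resumable");
    headers.put("X-Goog-Upload-Command", "start");
    return headers;
  }
}
```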
Upload Loop:

- Check the absolute global deadline.
- Call `streamProvider.get()`.
- Skip/seek to the current `offset`.
- Set `X-Goog-Upload-Command: upload, finalize` and `X-Goog-Upload-Offset: <offset>`.
- Stream the payload using a chunked output stream, updating the progress listener during writes.
- If the response is `final` with a `2xx` status, parse the response and return it.
- If an exception occurs, categorize it: Category 2 (mismatch) or a connection drop transitions to Query State, Category 1 (transient) retries the transmission with backoff, and Category 3 (fatal) or an exceeded deadline fails the upload.
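Skipping to the offset needs care because `InputStream.skip` may skip fewer bytes than requested, and may return 0 without being at end of stream. A sketch of a robust "open and skip" step, with a local `Provider` interface standing in for `InputStreamProvider`; `StreamPositioning` and `openAt` are hypothetical names.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper for the "open stream, skip to offset" steps of the Upload Loop. */
final class StreamPositioning {
  /** Stand-in for the design's InputStreamProvider. */
  @FunctionalInterface
  interface Provider {
    InputStream get() throws IOException;
  }

  private StreamPositioning() {}

  /** Opens a fresh stream and discards exactly {@code offset} bytes, or throws EOFException. */
  static InputStream openAt(Provider provider, long offset) throws IOException {
    InputStream in = provider.get();
    try {
      long remaining = offset;
      while (remaining > 0) {
        long skipped = in.skip(remaining);
        if (skipped > 0) {
          remaining -= skipped;
        } else if (in.read() == -1) {
          // skip() returned 0; read() distinguishes "no progress" from end of stream.
          throw new EOFException("Stream ended " + remaining + " bytes before offset " + offset);
        } else {
          remaining--; // read() consumed one byte.
        }
      }
      return in;
    } catch (IOException | RuntimeException e) {
      try {
        in.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }
}
```

A provider backed by a seekable source (for example, a file channel) could position directly instead of skipping; the loop above is the fallback that works for any `InputStream`.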
Query State:

- Execute a POST to `uploadUrl` with `X-Goog-Upload-Command: query`.
- If the response is `active`:
  - Extract `X-Goog-Upload-Size-Received` as `newOffset`.
  - If `newOffset == offset` (no progress since the last attempt), apply backoff to avoid spamming the server.
  - Update `offset = newOffset` and transition back to Upload Loop.
- If the response is `final`, return the response.
- On a Category 1 (transient) error, retry the query with backoff.
- On a Category 3 (fatal) error, fail immediately.
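The decision logic for a successful query response can be sketched as a pure function of the response headers and the client's current offset. The `X-Goog-Upload-Status` header name carrying `active`/`final` is an assumption (the steps above only say "if response is active/final"); `QueryStep` and its `Action` values are hypothetical names.

```java
import java.util.Map;

/** Hypothetical result of one Query State round trip. */
final class QueryStep {
  enum Action { RESUME, RESUME_AFTER_BACKOFF, COMPLETE }

  final Action action;
  final long newOffset;

  private QueryStep(Action action, long newOffset) {
    this.action = action;
    this.newOffset = newOffset;
  }

  /** Decides the next step from the query response headers and the current offset. */
  static QueryStep decide(Map<String, String> responseHeaders, long offset) {
    String status = header(responseHeaders, "X-Goog-Upload-Status"); // assumed header name
    if ("final".equals(status)) {
      // The server already has everything; the caller parses the body and returns it.
      return new QueryStep(Action.COMPLETE, offset);
    }
    if (!"active".equals(status)) {
      throw new IllegalStateException("Unexpected upload status: " + status);
    }
    String received = header(responseHeaders, "X-Goog-Upload-Size-Received");
    if (received == null) {
      throw new IllegalStateException("Missing X-Goog-Upload-Size-Received");
    }
    long newOffset = Long.parseLong(received.trim());
    // No progress since the last attempt: back off before resuming.
    Action action = newOffset == offset ? Action.RESUME_AFTER_BACKOFF : Action.RESUME;
    return new QueryStep(action, newOffset);
  }

  // HTTP header names are case-insensitive.
  private static String header(Map<String, String> headers, String name) {
    for (Map.Entry<String, String> e : headers.entrySet()) {
      if (e.getKey().equalsIgnoreCase(name)) {
        return e.getValue();
      }
    }
    return null;
  }
}
```

Keeping this step free of I/O makes the offset and backoff rules unit-testable independently of the HTTP transport.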