Commit c52c30c

feat(gax): implement Scotty Resumable Upload protocol
1 parent 847c892 commit c52c30c

11 files changed

Lines changed: 1610 additions & 0 deletions

Lines changed: 252 additions & 0 deletions
@@ -0,0 +1,252 @@
1+
# Resumable Upload Protocol (Scotty) for Java: Design Document
2+
3+
This document proposes the architecture and design for integrating the Scotty Resumable Upload Protocol (RUP) into `gax-java` and the GAPIC code generator.
4+
5+
---
6+
7+
## 1. Design Principles and Requirements
8+
9+
1. **Veneer and GAPIC Aligned**: The solution must integrate cleanly into the existing `Callable` framework of `gax-java`.
10+
2. **Stream-Safe Retries**: A Java `InputStream` is generally forward-only (`mark`/`reset` support is optional), so the design must provide a clean abstraction (`InputStreamProvider`) that can recreate the stream, or seek within it, during recovery.
11+
3. **Double-Loop Retry & Recovery**: The design must implement the Category 1 (transient) and Category 2 (state consistency) error classification, with backoff, as described in the RUP specification.
12+
4. **Progress Reporting**: Supports asynchronous progress updates via a simple callback mechanism.
13+
5. **No unnecessary chunking**: By default, uploads send the remaining bytes in one request (avoiding unnecessary memory buffering or chunk management).
14+
15+
---
16+
17+
## 2. API Design (`gax` Changes)
18+
19+
We introduce a new callable type and request/response wrappers in `com.google.api.gax.rpc`.
20+
21+
### 2.1. `InputStreamProvider`
22+
23+
To support seeking/rewinding, the stream source is wrapped in a functional interface that can supply fresh streams on retry:
24+
25+
```java
26+
package com.google.api.gax.rpc;
27+
28+
import java.io.IOException;
29+
import java.io.InputStream;
30+
31+
/** Provides a fresh {@link InputStream} for retriable upload operations. */
32+
@FunctionalInterface
33+
public interface InputStreamProvider {
34+
/** Returns a new {@link InputStream}. */
35+
InputStream get() throws IOException;
36+
}
37+
```
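As a rough illustration of how a provider supports recovery, the sketch below opens a fresh stream and discards bytes up to a given offset. `StreamRewind`, `openAt`, and `fromBytes` are hypothetical names used only for this example, and the interface is redeclared locally so the block compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Local copy of the interface above so the sketch is self-contained. */
@FunctionalInterface
interface InputStreamProvider {
  InputStream get() throws IOException;
}

/** Hypothetical helper, not part of the proposed API. */
final class StreamRewind {
  private StreamRewind() {}

  /** Opens a fresh stream and discards the first {@code offset} bytes. */
  static InputStream openAt(InputStreamProvider provider, long offset) throws IOException {
    InputStream in = provider.get();
    long remaining = offset;
    try {
      while (remaining > 0) {
        long skipped = in.skip(remaining);
        if (skipped > 0) {
          remaining -= skipped;
        } else if (in.read() >= 0) {
          // skip() may return 0 before end of stream; fall back to reading one byte.
          remaining--;
        } else {
          throw new EOFException("Stream ended " + remaining + " bytes before offset " + offset);
        }
      }
      return in;
    } catch (IOException e) {
      try {
        in.close();
      } catch (IOException closeError) {
        e.addSuppressed(closeError);
      }
      throw e;
    }
  }

  /** Example provider over an in-memory payload; each call returns an independent stream. */
  static InputStreamProvider fromBytes(byte[] data) {
    return () -> new ByteArrayInputStream(data);
  }
}
```

A file-backed provider could be written the same way, for example `() -> java.nio.file.Files.newInputStream(path)`, since each call must return a stream positioned at the start of the payload.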
38+
39+
### 2.2. Progress Listener and Status
40+
41+
```java
42+
package com.google.api.gax.rpc;
43+
44+
/** Listener for tracking progress of a resumable upload. */
45+
public interface ResumableUploadProgressListener {
46+
47+
enum State {
48+
NOT_STARTED,
49+
IN_PROGRESS,
50+
RECOVERING,
51+
COMPLETED,
52+
FAILED,
53+
CANCELLED
54+
}
55+
56+
void onProgress(ResumableUploadStatus status);
57+
}
58+
59+
/** Status details for progress updates. */
60+
public final class ResumableUploadStatus {
61+
private final long bytesUploaded;
62+
private final long totalBytes;
63+
private final ResumableUploadProgressListener.State state;
64+
65+
public ResumableUploadStatus(long bytesUploaded, long totalBytes, ResumableUploadProgressListener.State state) {
66+
this.bytesUploaded = bytesUploaded;
67+
this.totalBytes = totalBytes;
68+
this.state = state;
69+
}
70+
71+
public long getBytesUploaded() { return bytesUploaded; }
72+
public long getTotalBytes() { return totalBytes; }
73+
public ResumableUploadProgressListener.State getState() { return state; }
74+
}
75+
```
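A listener implementation might look like the sketch below, which formats a status as a percentage when the total size is known and as a raw byte count otherwise. `ProgressFormat.describe` is a hypothetical helper, the two types are compact local copies of the ones above, and treating a non-positive `totalBytes` as "unknown" is an assumption based on the `-1` convention in the request wrapper.

```java
// Compact local copies of the types above so the sketch compiles on its own.
interface ResumableUploadProgressListener {
  enum State { NOT_STARTED, IN_PROGRESS, RECOVERING, COMPLETED, FAILED, CANCELLED }

  void onProgress(ResumableUploadStatus status);
}

final class ResumableUploadStatus {
  private final long bytesUploaded;
  private final long totalBytes;
  private final ResumableUploadProgressListener.State state;

  ResumableUploadStatus(
      long bytesUploaded, long totalBytes, ResumableUploadProgressListener.State state) {
    this.bytesUploaded = bytesUploaded;
    this.totalBytes = totalBytes;
    this.state = state;
  }

  long getBytesUploaded() { return bytesUploaded; }
  long getTotalBytes() { return totalBytes; }
  ResumableUploadProgressListener.State getState() { return state; }
}

/** Hypothetical helper, not part of the proposed API. */
final class ProgressFormat {
  private ProgressFormat() {}

  /** Returns e.g. "IN_PROGRESS 25%" or, when the total is unknown, "IN_PROGRESS 512 bytes". */
  static String describe(ResumableUploadStatus status) {
    if (status.getTotalBytes() > 0) {
      long percent = status.getBytesUploaded() * 100 / status.getTotalBytes();
      return status.getState() + " " + percent + "%";
    }
    return status.getState() + " " + status.getBytesUploaded() + " bytes";
  }
}
```

Because the listener has a single abstract method, callers can pass a lambda such as `status -> System.out.println(ProgressFormat.describe(status))`.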
76+
77+
### 2.3. Request Wrapper: `ResumableUploadRequest`
78+
79+
```java
80+
package com.google.api.gax.rpc;
81+
82+
import com.google.common.base.Preconditions;
83+
84+
public final class ResumableUploadRequest<RequestT> {
85+
private final RequestT request;
86+
private final InputStreamProvider streamProvider;
87+
private final long totalBytes; // -1 if unknown
88+
private final ResumableUploadProgressListener progressListener;
89+
90+
private ResumableUploadRequest(Builder<RequestT> builder) {
91+
this.request = Preconditions.checkNotNull(builder.request);
92+
this.streamProvider = Preconditions.checkNotNull(builder.streamProvider);
93+
this.totalBytes = builder.totalBytes;
94+
this.progressListener = builder.progressListener;
95+
}
96+
97+
public RequestT getRequest() { return request; }
98+
public InputStreamProvider getStreamProvider() { return streamProvider; }
99+
public long getTotalBytes() { return totalBytes; }
100+
public ResumableUploadProgressListener getProgressListener() { return progressListener; }
101+
102+
public static <RequestT> Builder<RequestT> newBuilder() {
103+
return new Builder<>();
104+
}
105+
106+
public static class Builder<RequestT> {
107+
private RequestT request;
108+
private InputStreamProvider streamProvider;
109+
private long totalBytes = -1;
110+
private ResumableUploadProgressListener progressListener;
111+
112+
public Builder<RequestT> setRequest(RequestT request) {
113+
this.request = request;
114+
return this;
115+
}
116+
public Builder<RequestT> setStreamProvider(InputStreamProvider streamProvider) {
117+
this.streamProvider = streamProvider;
118+
return this;
119+
}
120+
public Builder<RequestT> setTotalBytes(long totalBytes) {
121+
this.totalBytes = totalBytes;
122+
return this;
123+
}
124+
public Builder<RequestT> setProgressListener(ResumableUploadProgressListener progressListener) {
125+
this.progressListener = progressListener;
126+
return this;
127+
}
128+
public ResumableUploadRequest<RequestT> build() {
129+
return new ResumableUploadRequest<>(this);
130+
}
131+
}
132+
}
133+
```
134+
135+
### 2.4. Callable Wrapper: `ResumableUploadCallable`
136+
137+
```java
138+
package com.google.api.gax.rpc;
139+
140+
import com.google.api.core.ApiFuture;
141+
142+
public abstract class ResumableUploadCallable<RequestT, ResponseT> {
143+
144+
protected ResumableUploadCallable() {}
145+
146+
public abstract ApiFuture<ResponseT> futureCall(
147+
ResumableUploadRequest<RequestT> request, ApiCallContext context);
148+
149+
public ResponseT call(ResumableUploadRequest<RequestT> request, ApiCallContext context) {
150+
return ApiExceptions.callAndTranslateApiException(futureCall(request, context));
151+
}
152+
153+
public ResponseT call(ResumableUploadRequest<RequestT> request) {
154+
return call(request, null);
155+
}
156+
}
157+
```
158+
159+
---
160+
161+
## 3. Transport Implementation (`gax-httpjson`)
162+
163+
The transport layer executes the actual HTTP protocol calls using the Google HTTP Client.
164+
165+
We introduce `HttpJsonResumableUploadCall` to coordinate the Scotty state machine.
166+
167+
### 3.1. Error Categorization in Java
168+
169+
```java
170+
private enum ErrorCategory {
171+
CATEGORY_1_TRANSIENT, // 429, 500, 502, 503, 504, TCP/Socket Timeout
172+
CATEGORY_2_MISMATCH, // 400, 412, 416
173+
CATEGORY_3_FATAL // 401, 403, 404, etc.
174+
}
175+
176+
private ErrorCategory getErrorCategory(Throwable t) {
177+
if (t instanceof HttpResponseException) {
178+
int statusCode = ((HttpResponseException) t).getStatusCode();
179+
if (statusCode == 429 || statusCode >= 500) {
180+
return ErrorCategory.CATEGORY_1_TRANSIENT;
181+
}
182+
if (statusCode == 400 || statusCode == 412 || statusCode == 416) {
183+
return ErrorCategory.CATEGORY_2_MISMATCH;
184+
}
185+
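    // HttpResponseException extends IOException, so any other status (401, 403, 404, etc.)
    // must be classified here; otherwise the IOException check below reports it as transient.
    return ErrorCategory.CATEGORY_3_FATAL;
}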
186+
if (t instanceof IOException) {
187+
// Socket timeouts, connection drops
188+
return ErrorCategory.CATEGORY_1_TRANSIENT;
189+
}
190+
return ErrorCategory.CATEGORY_3_FATAL;
191+
}
192+
```
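Category 1 errors are retried with backoff, but this document does not state the backoff parameters. The sketch below shows one possible capped exponential policy. `UploadBackoff` is a hypothetical class, and the initial delay, doubling factor, and cap are placeholders rather than values taken from the RUP specification.

```java
/** Hypothetical backoff policy; all constants are supplied by the caller. */
final class UploadBackoff {
  private final long initialDelayMillis;
  private final long maxDelayMillis;

  UploadBackoff(long initialDelayMillis, long maxDelayMillis) {
    if (initialDelayMillis <= 0 || maxDelayMillis < initialDelayMillis) {
      throw new IllegalArgumentException("Require 0 < initialDelayMillis <= maxDelayMillis");
    }
    this.initialDelayMillis = initialDelayMillis;
    this.maxDelayMillis = maxDelayMillis;
  }

  /** Delay before retry number {@code attempt} (0-based): initial * 2^attempt, capped at the maximum. */
  long delayMillis(int attempt) {
    if (attempt < 0) {
      throw new IllegalArgumentException("attempt must be >= 0");
    }
    long delay = initialDelayMillis;
    for (int i = 0; i < attempt && delay < maxDelayMillis; i++) {
      // Compare against max / 2 first so the doubling cannot overflow.
      delay = delay > maxDelayMillis / 2 ? maxDelayMillis : delay * 2;
    }
    return delay;
  }
}
```

A production policy would likely add jitter and stop once the global deadline is reached; both are omitted here to keep the sketch deterministic.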
193+
194+
### 3.2. Detailed Execution Flow (State Machine)
195+
196+
The `HttpJsonResumableUploadCall` runs inside the user's thread (or client executor pool for future execution) and implements the following flow:
197+
198+
```mermaid
199+
stateDiagram-v2
200+
[*] --> StartSession
201+
StartSession --> UploadLoop : Success (200 OK + active)
202+
StartSession --> StartSession : Cat 1 Transient (Backoff)
203+
StartSession --> [*] : Cat 3 Fatal / Deadline Exceeded
204+
205+
state UploadLoop {
206+
[*] --> OpenStream
207+
OpenStream --> SkipToOffset
208+
SkipToOffset --> TransmitChunk
209+
TransmitChunk --> [*] : Success (final)
210+
TransmitChunk --> QueryState : Cat 2 Mismatch / Socket Drop
211+
TransmitChunk --> TransmitChunk : Cat 1 Transient (Backoff)
212+
}
213+
214+
UploadLoop --> [*] : Success
215+
UploadLoop --> [*] : Cat 3 Fatal / Deadline Exceeded
216+
217+
state QueryState {
218+
[*] --> SendQuery
219+
SendQuery --> ResumeUpload : Success (active + new offset)
220+
SendQuery --> [*] : Success (final)
221+
SendQuery --> SendQuery : Cat 1 Transient (Backoff)
222+
SendQuery --> [*] : Cat 3 Fatal
223+
}
224+
225+
QueryState --> UploadLoop : Resume
226+
```
227+
228+
#### Step 1: Start Session
229+
- Build standard headers + merge user-provided metadata.
230+
- Prefix headers that describe the upload payload rather than this request's JSON body (`Content-Length`, `Content-Type`, etc.) with `X-Goog-Upload-Header-`.
231+
- Set `X-Goog-Upload-Protocol: resumable` and `X-Goog-Upload-Command: start`.
232+
- Execute POST with the request JSON body.
233+
- Extract `X-Goog-Upload-URL` header value to obtain the `uploadUrl`.
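The header handling in this step can be sketched as follows. `StartSessionHeaders.build` is a hypothetical helper, and the set of prefixed headers is an assumption limited to the two named above, since the "etc." is not enumerated in this document.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of the proposed API. */
final class StartSessionHeaders {
  private static final String PREFIX = "X-Goog-Upload-Header-";

  // Assumed list: only the two headers named in the step above.
  private static final Set<String> PAYLOAD_HEADERS =
      new HashSet<>(Arrays.asList("content-length", "content-type"));

  private StartSessionHeaders() {}

  /** Merges user headers, prefixes payload-describing ones, and adds the start command. */
  static Map<String, String> build(Map<String, String> userHeaders) {
    Map<String, String> headers = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : userHeaders.entrySet()) {
      String name = entry.getKey();
      // Header names are case-insensitive, so compare in lower case.
      boolean describesPayload = PAYLOAD_HEADERS.contains(name.toLowerCase(Locale.ROOT));
      headers.put(describesPayload ? PREFIX + name : name, entry.getValue());
    }
    headers.put("X-Goog-Upload-Protocol", "resumable");
    headers.put("X-Goog-Upload-Command", "start");
    return headers;
  }
}
```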
234+
235+
#### Step 2: Upload Loop (Transmit)
236+
- Check absolute global deadline.
237+
- Call `streamProvider.get()`.
238+
- Skip/seek to current `offset`.
239+
- Set `X-Goog-Upload-Command: upload, finalize` and `X-Goog-Upload-Offset: offset`.
240+
- Stream payload using a chunked output stream, updating the progress listener during writes.
241+
- If response is `final` with `2xx`: parse response and return.
242+
- If an exception occurs, categorize it. On Category 2 (Mismatch) or a connection drop, transition to **Query State**. On Category 1 (Transient), retry the transmit with backoff. On Category 3 (Fatal), fail immediately.
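The payload-streaming part of this step can be sketched as a copy loop that reports a running byte count. `ProgressCopy.copy` is a hypothetical helper, and the 8 KiB buffer size is an arbitrary choice for the example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.LongConsumer;

/** Hypothetical helper, not part of the proposed API. */
final class ProgressCopy {
  private ProgressCopy() {}

  /**
   * Copies {@code in} to {@code out}, reporting the absolute number of bytes written so far
   * (starting from {@code startOffset}) after each buffer. Returns the final count.
   */
  static long copy(InputStream in, OutputStream out, long startOffset, LongConsumer onBytesSent)
      throws IOException {
    byte[] buffer = new byte[8192];
    long sent = startOffset;
    int read;
    while ((read = in.read(buffer)) != -1) {
      out.write(buffer, 0, read);
      sent += read;
      onBytesSent.accept(sent);
    }
    return sent;
  }
}
```

The reported count reflects bytes written to the connection, not bytes the server has persisted, which is why recovery relies on the offset returned by the query step rather than on this value.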
243+
244+
#### Step 3: Query State
245+
- Execute POST to `uploadUrl` with `X-Goog-Upload-Command: query`.
246+
- If response is `active`:
247+
- Extract `X-Goog-Upload-Size-Received` -> `newOffset`.
248+
- If `newOffset == offset`: apply backoff (to avoid spamming server).
249+
- Update `offset = newOffset` and transition back to **Upload Loop**.
250+
- If response is `final`: return response.
251+
- If Category 1 (Transient) error: retry query with backoff.
252+
- If Category 3 (Fatal) error: fail immediately.
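The branching on a successful query response can be sketched as a pure function. `QueryOutcome` and its `Action` values are hypothetical names, and the sketch assumes the caller has already read the upload status (`active` or `final`) and the `X-Goog-Upload-Size-Received` value from the response.

```java
/** Hypothetical result type, not part of the proposed API. */
final class QueryOutcome {
  enum Action { RESUME, RESUME_AFTER_BACKOFF, RETURN_FINAL }

  final Action action;
  final long offset;

  private QueryOutcome(Action action, long offset) {
    this.action = action;
    this.offset = offset;
  }

  /** Decides the next step from a successful query response. */
  static QueryOutcome decide(String uploadStatus, long sizeReceived, long currentOffset) {
    if ("final".equals(uploadStatus)) {
      // The session is already finished; the caller returns the response as-is.
      return new QueryOutcome(Action.RETURN_FINAL, currentOffset);
    }
    if (!"active".equals(uploadStatus)) {
      throw new IllegalArgumentException("Unexpected upload status: " + uploadStatus);
    }
    if (sizeReceived < 0) {
      throw new IllegalArgumentException("sizeReceived must be >= 0");
    }
    // No server-side progress since the last attempt: back off before resuming.
    Action action = sizeReceived == currentOffset ? Action.RESUME_AFTER_BACKOFF : Action.RESUME;
    return new QueryOutcome(action, sizeReceived);
  }
}
```

In both resume cases the returned `offset` is the server's value, so the next pass through the upload loop skips the stream to what the server actually holds.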

sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallableFactory.java

Lines changed: 14 additions & 0 deletions
@@ -41,6 +41,7 @@
4141
import com.google.api.gax.rpc.PagedCallSettings;
4242
import com.google.api.gax.rpc.ServerStreamingCallSettings;
4343
import com.google.api.gax.rpc.ServerStreamingCallable;
44+
import com.google.api.gax.rpc.ResumableUploadCallable;
4445
import com.google.api.gax.rpc.UnaryCallSettings;
4546
import com.google.api.gax.rpc.UnaryCallable;
4647
import com.google.api.gax.tracing.ApiTracerContext;
@@ -220,6 +221,19 @@ ServerStreamingCallable<RequestT, ResponseT> createServerStreamingCallable(
220221
return callable.withDefaultCallContext(clientContext.getDefaultCallContext());
221222
}
222223

224+
/**
225+
* Create a resumable upload callable object. Designed for use by generated code.
226+
*
227+
* @param httpJsonCallSettings the http/json call settings
228+
* @param clientContext {@link ClientContext} to use to connect to the service.
229+
* @return {@link ResumableUploadCallable} callable object.
230+
*/
231+
public static <RequestT, ResponseT> ResumableUploadCallable<RequestT, ResponseT> createResumableUploadCallable(
232+
HttpJsonCallSettings<RequestT, ResponseT> httpJsonCallSettings,
233+
ClientContext clientContext) {
234+
return new HttpJsonResumableUploadCallable<>(httpJsonCallSettings, clientContext);
235+
}
236+
223237
static ApiTracerContext getApiTracerContext(@Nonnull ApiMethodDescriptor<?, ?> methodDescriptor) {
224238
return ApiTracerContext.newBuilder()
225239
.setFullMethodName(methodDescriptor.getFullMethodName())
