Skip to content

Commit 0435830

Browse files
author
Test User
committed
feat: phase 2 implementation, sans-io state machine
1 parent 6308fa1 commit 0435830

File tree

3 files changed

+478
-0
lines changed

3 files changed

+478
-0
lines changed

packages/google-api-core/google/api_core/resumable_media/_common.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@
4545
# Received from the server, typically in query responses.
4646
UPLOAD_SIZE_RECEIVED_HEADER = "X-Goog-Upload-Size-Received"
4747

48+
# The header specifying the content type of the uploaded content.
49+
# Sent by the client during upload initiation.
50+
UPLOAD_CONTENT_TYPE_HEADER = "X-Goog-Upload-Header-Content-Type"
51+
52+
# The header specifying the total content length of the uploaded content.
53+
# Sent by the client during upload initiation when length is known.
54+
UPLOAD_CONTENT_LENGTH_HEADER = "X-Goog-Upload-Header-Content-Length"
55+
4856
# The header specifying the required block alignment size for chunks.
4957
# Received from the server in the start command response.
5058
UPLOAD_CHUNK_GRANULARITY_HEADER = "X-Goog-Upload-Chunk-Granularity"
@@ -53,16 +61,20 @@
5361
# Sent by the client in the `UPLOAD_PROTOCOL_HEADER`.
5462
UPLOAD_PROTOCOL_VALUE = "resumable"
5563

64+
5665
class UploadCommand:
5766
"""HTTP Header values for the Resumable Upload command header."""
67+
5868
START = "start"
5969
UPLOAD = "upload"
6070
FINALIZE = "finalize"
6171
QUERY = "query"
6272
CANCEL = "cancel"
6373

74+
6475
class UploadStatus:
6576
"""HTTP Header values for the Resumable Upload status header."""
77+
6678
ACTIVE = "active"
6779
FINAL = "final"
6880
CANCELLED = "cancelled"
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Sans-I/O Resumable Upload protocol implementation.
16+
17+
This module provides the pure state machine for Resumable Uploads.
18+
"""
19+
20+
import logging
21+
from typing import Any, Dict, Optional, Sequence, Tuple, Union
22+
23+
from google.api_core.resumable_media import _common
24+
25+
_LOGGER = logging.getLogger(__name__)
26+
27+
28+
class ResumableUpload(object):
29+
"""Sans-I/O state machine for a resumable upload.
30+
31+
This class manages the initiation, chunk boundaries, offset calculation,
32+
and recovery states for a Google Resumable Upload. It emits logical
33+
requests (method, url, headers, data) and consumes logical responses
34+
(status_code, headers).
35+
36+
Args:
37+
upload_url (str): The initial URL to start the resumable upload.
38+
chunk_size (Optional[int]): The base size of chunks in bytes. If not
39+
specified, defaults to 10MB (`_common.DEFAULT_CHUNK_SIZE`).
40+
"""
41+
42+
def __init__(self, upload_url: str, chunk_size: int = _common.DEFAULT_CHUNK_SIZE):
43+
self._initial_url = upload_url
44+
self._chunk_size = chunk_size
45+
self._resumable_url: Optional[str] = None
46+
self._chunk_granularity: Optional[int] = None
47+
self._bytes_uploaded = 0
48+
self._total_bytes: Optional[int] = None
49+
self._finished = False
50+
self._invalid = False
51+
52+
@property
53+
def resumable_url(self) -> Optional[str]:
54+
"""Optional[str]: The session URL returned by the initiate request."""
55+
return self._resumable_url
56+
57+
@property
58+
def bytes_uploaded(self) -> int:
59+
"""int: The number of bytes successfully uploaded so far."""
60+
return self._bytes_uploaded
61+
62+
@property
63+
def finished(self) -> bool:
64+
"""bool: Indicates if the upload has completed successfully."""
65+
return self._finished
66+
67+
@property
68+
def invalid(self) -> bool:
69+
"""bool: Indicates if the state machine encountered an unrecoverable state."""
70+
return self._invalid
71+
72+
@property
73+
def chunk_size(self) -> int:
74+
"""int: The block-aligned chunk size informed by server granularity."""
75+
actual_chunk_size = self._chunk_size
76+
if self._chunk_granularity:
77+
if actual_chunk_size % self._chunk_granularity != 0:
78+
actual_chunk_size = (
79+
(actual_chunk_size // self._chunk_granularity) + 1
80+
) * self._chunk_granularity
81+
return actual_chunk_size
82+
83+
def initiate_request(
84+
self,
85+
stream_metadata: Optional[Dict[str, str]] = None,
86+
content_type: Optional[str] = None,
87+
size: Optional[int] = None,
88+
) -> Tuple[str, str, Dict[str, str], bytes]:
89+
"""Constructs an upload initiation request.
90+
91+
Args:
92+
stream_metadata (Optional[Dict[str, str]]): Additional headers for
93+
the upload initiation request. These headers are ONLY applied to
94+
the initial request and will NOT be included in subsequent chunk
95+
upload requests. If not specified, no additional headers will be appended.
96+
content_type (Optional[str]): MIME type of the uploaded content.
97+
If not specified, the `X-Goog-Upload-Header-Content-Type` header
98+
will be omitted.
99+
size (Optional[int]): Total size of the payload in bytes if known.
100+
If not specified, the `X-Goog-Upload-Header-Content-Length` header
101+
will be omitted, indicating an unknown overall payload size.
102+
103+
Returns:
104+
Tuple[str, str, Dict[str, str], bytes]: The method, url, headers, and body.
105+
"""
106+
self._total_bytes = size
107+
headers = {}
108+
109+
# Merge user metadata first
110+
if stream_metadata:
111+
for k, v in stream_metadata.items():
112+
headers[k] = str(v)
113+
114+
# Critical protocol headers overwrite user metadata
115+
headers[_common.UPLOAD_PROTOCOL_HEADER] = _common.UPLOAD_PROTOCOL_VALUE
116+
headers[_common.UPLOAD_COMMAND_HEADER] = _common.UploadCommand.START
117+
118+
if content_type is not None:
119+
headers[_common.UPLOAD_CONTENT_TYPE_HEADER] = content_type
120+
if size is not None:
121+
headers[_common.UPLOAD_CONTENT_LENGTH_HEADER] = str(size)
122+
123+
return "POST", self._initial_url, headers, b""
124+
125+
def process_initiate_response(
126+
self, status_code: int, headers: Dict[str, str]
127+
) -> None:
128+
"""Processes the initiation response from the server.
129+
130+
Args:
131+
status_code (int): HTTP status code of the response.
132+
headers (Dict[str, str]): HTTP headers of the response.
133+
"""
134+
if status_code not in (200, 201):
135+
self._invalid = True
136+
return
137+
138+
headers_lower = {k.lower(): v for k, v in headers.items()}
139+
resumable_url_target = _common.UPLOAD_URL_HEADER.lower()
140+
141+
self._resumable_url = headers_lower.get(resumable_url_target)
142+
if not self._resumable_url:
143+
self._invalid = True
144+
raise ValueError(f"Server did not return {_common.UPLOAD_URL_HEADER}")
145+
146+
granularity = headers_lower.get(_common.UPLOAD_CHUNK_GRANULARITY_HEADER.lower())
147+
if granularity:
148+
self._chunk_granularity = int(granularity)
149+
150+
def build_chunk_request(
151+
self, data: bytes, final: bool = False
152+
) -> Tuple[str, str, Dict[str, str], bytes]:
153+
"""Constructs a request to upload a chunk of data.
154+
155+
Args:
156+
data (bytes): The chunk of data to upload.
157+
final (bool): Whether this is the final chunk. The I/O layer should
158+
set this to True when the stream is exhausted.
159+
160+
Returns:
161+
Tuple[str, str, Dict[str, str], bytes]: The method, url, headers, and body.
162+
"""
163+
if not self._resumable_url:
164+
raise ValueError("Upload not initiated.")
165+
166+
data_len = len(data)
167+
is_last_chunk = final
168+
169+
if self._total_bytes is not None:
170+
if self._bytes_uploaded + data_len >= self._total_bytes:
171+
is_last_chunk = True
172+
elif data_len < self.chunk_size:
173+
is_last_chunk = True
174+
175+
command = _common.UploadCommand.UPLOAD
176+
if is_last_chunk:
177+
command = (
178+
f"{_common.UploadCommand.UPLOAD}, {_common.UploadCommand.FINALIZE}"
179+
)
180+
181+
headers = {
182+
_common.UPLOAD_COMMAND_HEADER: command,
183+
_common.UPLOAD_OFFSET_HEADER: str(self._bytes_uploaded),
184+
}
185+
186+
return "POST", self._resumable_url, headers, data
187+
188+
def process_chunk_response(
189+
self, status_code: int, headers: Dict[str, str], expected_chunk_bytes: int
190+
) -> None:
191+
"""Processes the upload chunk response from the server.
192+
193+
Args:
194+
status_code (int): HTTP status code of the response.
195+
headers (Dict[str, str]): HTTP headers of the response.
196+
expected_chunk_bytes (int): The number of bytes that were uploaded.
197+
"""
198+
if status_code not in (200, 201):
199+
# We do not invalidate the upload here. A non-2xx response (like 503)
200+
# could be transient. The I/O layer can either retry the chunk
201+
# directly or execute a recovery query to sync the state.
202+
return
203+
204+
headers_lower = {k.lower(): v for k, v in headers.items()}
205+
status = headers_lower.get(_common.UPLOAD_STATUS_HEADER.lower())
206+
207+
if status == _common.UploadStatus.ACTIVE:
208+
self._bytes_uploaded += expected_chunk_bytes
209+
elif status == _common.UploadStatus.FINAL:
210+
self._finished = True
211+
self._bytes_uploaded += expected_chunk_bytes
212+
elif status == _common.UploadStatus.CANCELLED:
213+
self._invalid = True
214+
215+
def build_recovery_request(self) -> Tuple[str, str, Dict[str, str], bytes]:
216+
"""Constructs a request to query the server's current upload state.
217+
218+
Returns:
219+
Tuple[str, str, Dict[str, str], bytes]: The method, url, headers, and body.
220+
"""
221+
if not self._resumable_url:
222+
raise ValueError("Upload not initiated.")
223+
224+
headers = {_common.UPLOAD_COMMAND_HEADER: _common.UploadCommand.QUERY}
225+
return "POST", self._resumable_url, headers, b""
226+
227+
def process_recovery_response(
228+
self, status_code: int, headers: Dict[str, str]
229+
) -> int:
230+
"""Processes the recovery query response from the server.
231+
232+
Args:
233+
status_code (int): HTTP status code of the response.
234+
headers (Dict[str, str]): HTTP headers of the response.
235+
236+
Returns:
237+
int: The confirmed number of bytes uploaded so far.
238+
"""
239+
if status_code not in (200, 201):
240+
self._invalid = True
241+
raise ValueError(f"Recovery failed with status {status_code}")
242+
243+
headers_lower = {k.lower(): v for k, v in headers.items()}
244+
status = headers_lower.get(_common.UPLOAD_STATUS_HEADER.lower())
245+
246+
if status == _common.UploadStatus.ACTIVE:
247+
received = int(
248+
headers_lower.get(_common.UPLOAD_SIZE_RECEIVED_HEADER.lower(), "0")
249+
)
250+
self._bytes_uploaded = received
251+
elif status == _common.UploadStatus.FINAL:
252+
self._finished = True
253+
elif status == _common.UploadStatus.CANCELLED:
254+
self._invalid = True
255+
raise RuntimeError("Upload was cancelled by server")
256+
257+
return self._bytes_uploaded

0 commit comments

Comments
 (0)