Skip to content

Commit bd0a977

Browse files
Update version to 0.19.0 and enhance file upload functionality (#69)
* Update version to 0.19.0 and enhance file upload functionality
  - Bump package version to 0.19.0.
  - Increase maximum workers for file uploads from 4 to 6.
  - Introduce support for uploading chunks to presigned URLs, including progress reporting and error handling.
  - Refactor upload logic to accommodate both presigned URL and server uploads, improving overall upload reliability.
* Implement validation for presigned URL count during file uploads
  - Added a check to ensure the number of presigned URLs matches the expected chunk count, raising a ValueError if there is a discrepancy. This enhancement improves error handling and ensures consistency in the file upload process.
* Enhance ETag handling in HTTPTransport
  - Updated the response handling in the HTTPTransport class to raise a NetworkError if the ETag header is missing after a successful presigned URL PUT request. This change improves error reporting and ensures that clients are informed of potential issues with the upload response.
* Enhance error handling for presigned URL uploads in HTTPTransport
  - Added APIError exception handling for client-side errors (HTTP 400-499) during presigned URL uploads. This improvement provides clearer feedback when uploads are rejected, enhancing the robustness of the upload process.
1 parent 1d45fe3 commit bd0a977

3 files changed

Lines changed: 140 additions & 25 deletions

File tree

cytetype/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "0.18.1"
1+
__version__ = "0.19.0"
22

33
import requests
44

cytetype/api/client.py

Lines changed: 84 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def _upload_file(
4040
file_kind: UploadFileKind,
4141
file_path: str,
4242
timeout: float | tuple[float, float] = (60.0, 3600.0),
43-
max_workers: int = 4,
43+
max_workers: int = 6,
4444
) -> UploadResponse:
4545
path_obj = Path(file_path)
4646
if not path_obj.is_file():
@@ -69,6 +69,16 @@ def _upload_file(
6969

7070
n_chunks = math.ceil(size_bytes / chunk_size) if size_bytes > 0 else 0
7171

72+
presigned_urls: list[str] | None = init_data.get("presigned_urls")
73+
r2_upload_id: str | None = init_data.get("r2_upload_id")
74+
use_r2 = presigned_urls is not None and r2_upload_id is not None
75+
76+
if use_r2 and len(presigned_urls) != n_chunks: # type: ignore[arg-type]
77+
raise ValueError(
78+
f"Server returned {len(presigned_urls)} presigned URLs " # type: ignore[arg-type]
79+
f"but expected {n_chunks} (one per chunk)."
80+
)
81+
7282
# Step 2 – Upload chunks in parallel.
7383
# Each worker thread gets its own HTTPTransport (and thus its own
7484
# requests.Session / connection pool) for thread safety.
@@ -83,8 +93,61 @@ def _upload_file(
8393
)
8494
_progress_lock = threading.Lock()
8595
_chunks_done = [0]
96+
_etags: dict[int, str] = {}
97+
_etags_lock = threading.Lock()
98+
99+
def _update_progress() -> None:
    """Record one finished chunk.

    Advances the progress bar when one is available; otherwise bumps the
    shared chunk counter and rewrites a single-line text progress report.
    """
    if pbar is not None:
        pbar.update(1)
        return

    # Snapshot the counter while holding the lock so each caller reports
    # its own, distinct value.
    with _progress_lock:
        _chunks_done[0] += 1
        finished = _chunks_done[0]

    # NOTE(review): the diff view this was recovered from lost indentation;
    # the report is assumed to be printed after the lock is released — confirm.
    percent = 100 * finished / n_chunks
    print(
        f"\r Uploading: {finished}/{n_chunks} chunks ({percent:.0f}%)",
        end="",
        flush=True,
    )
112+
113+
def _upload_chunk_r2(chunk_idx: int) -> None:
    """Upload one chunk of the file to its presigned URL.

    Reads the chunk's byte range from ``path_obj``, PUTs it to
    ``presigned_urls[chunk_idx]`` and stores the returned ETag in ``_etags``
    under ``chunk_idx``. ``NetworkError``/``TimeoutError`` and ``APIError``
    instances whose ``error_code`` is in ``_RETRYABLE_API_ERROR_CODES`` are
    retried, sleeping for the matching entry of ``_CHUNK_RETRY_DELAYS``
    between attempts.

    Args:
        chunk_idx: Zero-based index of the chunk to upload.

    Raises:
        APIError: Immediately, when the error code is not retryable.
        Exception: The last retryable error once all attempts are used up.
    """
    # Each worker thread lazily creates and then reuses its own transport.
    try:
        transport = _tls.transport
    except AttributeError:
        transport = _tls.transport = HTTPTransport(base_url, auth_token)

    start = chunk_idx * chunk_size
    length = min(chunk_size, size_bytes - start)  # final chunk may be short
    with path_obj.open("rb") as fh:
        fh.seek(start)
        payload = fh.read(length)

    target_url = presigned_urls[chunk_idx]  # type: ignore[index]
    max_attempts = 1 + len(_CHUNK_RETRY_DELAYS)
    failure: Exception | None = None

    for attempt in range(max_attempts):
        try:
            etag = transport.put_to_presigned_url(
                target_url, payload, timeout=timeout
            )
            with _etags_lock:
                _etags[chunk_idx] = etag
            _update_progress()
            return
        except (NetworkError, TimeoutError) as exc:
            failure = exc
        except APIError as exc:
            # Only error codes explicitly marked retryable get another attempt.
            if exc.error_code not in _RETRYABLE_API_ERROR_CODES:
                raise
            failure = exc

        # No delay is configured after the final attempt, so skip the sleep.
        if attempt < len(_CHUNK_RETRY_DELAYS):
            delay = _CHUNK_RETRY_DELAYS[attempt]
            logger.warning(
                f"Chunk {chunk_idx + 1}/{n_chunks} upload failed "
                f"(attempt {attempt + 1}/{max_attempts}), "
                f"retrying in {delay}s: {failure}"
            )
            time.sleep(delay)

    raise failure  # type: ignore[misc]
86149

87-
def _upload_chunk(chunk_idx: int) -> None:
150+
def _upload_chunk_server(chunk_idx: int) -> None:
88151
if not hasattr(_tls, "transport"):
89152
_tls.transport = HTTPTransport(base_url, auth_token)
90153
offset = chunk_idx * chunk_size
@@ -101,18 +164,7 @@ def _upload_chunk(chunk_idx: int) -> None:
101164
data=chunk_data,
102165
timeout=timeout,
103166
)
104-
if pbar is not None:
105-
pbar.update(1)
106-
else:
107-
with _progress_lock:
108-
_chunks_done[0] += 1
109-
done = _chunks_done[0]
110-
pct = 100 * done / n_chunks
111-
print(
112-
f"\r Uploading: {done}/{n_chunks} chunks ({pct:.0f}%)",
113-
end="",
114-
flush=True,
115-
)
167+
_update_progress()
116168
return
117169
except (NetworkError, TimeoutError) as exc:
118170
last_exc = exc
@@ -121,7 +173,6 @@ def _upload_chunk(chunk_idx: int) -> None:
121173
last_exc = exc
122174
else:
123175
raise
124-
125176
if attempt < len(_CHUNK_RETRY_DELAYS):
126177
delay = _CHUNK_RETRY_DELAYS[attempt]
127178
logger.warning(
@@ -130,14 +181,15 @@ def _upload_chunk(chunk_idx: int) -> None:
130181
f"retrying in {delay}s: {last_exc}"
131182
)
132183
time.sleep(delay)
133-
134184
raise last_exc # type: ignore[misc]
135185

186+
upload_fn = _upload_chunk_r2 if use_r2 else _upload_chunk_server
187+
136188
if n_chunks > 0:
137189
effective_workers = min(max_workers, n_chunks)
138190
try:
139191
with ThreadPoolExecutor(max_workers=effective_workers) as pool:
140-
list(pool.map(_upload_chunk, range(n_chunks)))
192+
list(pool.map(upload_fn, range(n_chunks)))
141193
if pbar is not None:
142194
pbar.close()
143195
else:
@@ -151,10 +203,19 @@ def _upload_chunk(chunk_idx: int) -> None:
151203
print()
152204
raise
153205

154-
# Step 3 – Complete upload (returns same UploadResponse shape as before)
155-
_, complete_data = transport.post_empty(
156-
f"upload/{upload_id}/complete", timeout=timeout
157-
)
206+
# Step 3 – Complete upload
207+
if use_r2:
208+
parts = [{"ETag": _etags[i], "PartNumber": i + 1} for i in range(n_chunks)]
209+
_, complete_data = transport.post_json(
210+
f"upload/{upload_id}/complete",
211+
data={"parts": parts},
212+
timeout=timeout,
213+
)
214+
else:
215+
_, complete_data = transport.post_empty(
216+
f"upload/{upload_id}/complete", timeout=timeout
217+
)
218+
158219
return UploadResponse(**complete_data)
159220

160221

@@ -163,7 +224,7 @@ def upload_obs_duckdb(
163224
auth_token: str | None,
164225
file_path: str,
165226
timeout: float | tuple[float, float] = (60.0, 3600.0),
166-
max_workers: int = 4,
227+
max_workers: int = 6,
167228
) -> UploadResponse:
168229
return _upload_file(
169230
base_url,
@@ -180,7 +241,7 @@ def upload_vars_h5(
180241
auth_token: str | None,
181242
file_path: str,
182243
timeout: float | tuple[float, float] = (60.0, 3600.0),
183-
max_workers: int = 4,
244+
max_workers: int = 6,
184245
) -> UploadResponse:
185246
return _upload_file(
186247
base_url,

cytetype/api/transport.py

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import requests
22
from typing import Any, BinaryIO
33

4-
from .exceptions import create_api_exception, NetworkError, TimeoutError
4+
from .exceptions import create_api_exception, APIError, NetworkError, TimeoutError
55
from .schemas import ErrorResponse
66

77

@@ -124,6 +124,60 @@ def put_binary(
124124
self._handle_request_error(e)
125125
raise # For type checker
126126

127+
def put_to_presigned_url(
    self,
    url: str,
    data: bytes,
    timeout: float | tuple[float, float] = (30.0, 3600.0),
) -> str:
    """PUT raw bytes to a presigned URL and return the response's ETag.

    Args:
        url: Full presigned URL; used as-is, not joined with ``base_url``.
        data: Request body.
        timeout: Timeout value passed through to ``session.put``.

    Returns:
        The value of the ``ETag`` response header.

    Raises:
        APIError: With code ``PRESIGNED_URL_REJECTED`` on an HTTP 4xx reply.
        NetworkError: With code ``MISSING_ETAG`` when the PUT succeeds but
            the response has no (or an empty) ``ETag`` header.

    Any ``requests.RequestException`` (including the one raised by
    ``raise_for_status``) is passed to ``self._handle_request_error``.
    """
    try:
        resp = self.session.put(
            url,
            data=data,
            headers={"Content-Type": "application/octet-stream"},
            timeout=timeout,
        )

        # Client-side rejections are reported as APIError before
        # raise_for_status gets a chance to turn them into a request error.
        status = resp.status_code
        if 400 <= status < 500:
            raise APIError(
                f"Presigned URL upload rejected (HTTP {status}): "
                f"{resp.text[:200]}",
                error_code="PRESIGNED_URL_REJECTED",
            )
        resp.raise_for_status()

        etag = resp.headers.get("ETag")
        if etag:
            return etag
        raise NetworkError(
            "Presigned URL PUT succeeded but response is missing the ETag header",
            error_code="MISSING_ETAG",
        )
    except requests.RequestException as e:
        self._handle_request_error(e)
        raise  # reached only if the handler returns instead of raising
158+
159+
def post_json(
    self,
    endpoint: str,
    data: dict[str, Any],
    timeout: float | tuple[float, float] = 30.0,
) -> tuple[int, dict[str, Any]]:
    """POST ``data`` as a JSON body to ``endpoint``.

    Args:
        endpoint: Path relative to ``base_url``; leading slashes are stripped.
        data: Payload handed to ``session.post`` via its ``json`` argument.
        timeout: Timeout value passed through to ``session.post``.

    Returns:
        A ``(status_code, decoded_json_body)`` tuple.

    Non-OK responses are passed to ``self._parse_error`` and any
    ``requests.RequestException`` to ``self._handle_request_error``.
    """
    target = f"{self.base_url}/{endpoint.lstrip('/')}"
    try:
        resp = self.session.post(
            target,
            json=data,
            headers=self._build_headers(content_type="application/json"),
            timeout=timeout,
        )
        if not resp.ok:
            self._parse_error(resp)
        status = resp.status_code
        body = resp.json()
        return status, body
    except requests.RequestException as e:
        self._handle_request_error(e)
        raise  # reached only if the handler returns instead of raising
180+
127181
def get(self, endpoint: str, timeout: int = 30) -> tuple[int, dict[str, Any]]:
128182
"""Make GET request and return (status_code, data)."""
129183
url = f"{self.base_url}/{endpoint.lstrip('/')}"

0 commit comments

Comments
 (0)