Skip to content

Commit bee38ca

Browse files
Support parallel chunked uploads and bump version (#63)
Bump package version to 0.16.0 and add support for chunked, parallel file uploads. Introduces a chunked upload flow (initiate -> parallel PUT of chunks -> complete) with server-side size checks and an adjustable max_workers parameter (default 4). HTTPTransport gained post_empty() for no-body POSTs and put_binary() to upload raw binary via PUT. Client upload helpers (upload_obs_duckdb, upload_vars_h5) and CyteType entry points now accept/upload_max_workers and pass it through. Parallel uploads use ThreadPoolExecutor and per-thread HTTPTransport (threading.local) to keep memory bounded (~max_workers × chunk_size) and maintain connection safety. The final complete response is parsed into the existing UploadResponse shape.
1 parent fdf94c2 commit bee38ca

4 files changed

Lines changed: 101 additions & 13 deletions

File tree

cytetype/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "0.15.0"
1+
__version__ = "0.16.0"
22

33
import requests
44

cytetype/api/client.py

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import math
12
import time
3+
import threading
24
from pathlib import Path
35
from typing import Any
6+
from concurrent.futures import ThreadPoolExecutor
47

58
from .transport import HTTPTransport
69
from .progress import ProgressDisplay
@@ -21,6 +24,7 @@ def _upload_file(
2124
file_kind: UploadFileKind,
2225
file_path: str,
2326
timeout: float | tuple[float, float] = (30.0, 3600.0),
27+
max_workers: int = 4,
2428
) -> UploadResponse:
2529
path_obj = Path(file_path)
2630
if not path_obj.is_file():
@@ -34,33 +38,86 @@ def _upload_file(
3438
)
3539

3640
transport = HTTPTransport(base_url, auth_token)
37-
with path_obj.open("rb") as f:
38-
_, response = transport.post_binary(
39-
f"upload/{file_kind}",
40-
data=f,
41+
42+
# Step 1 – Initiate chunked upload
43+
_, init_data = transport.post_empty(f"upload/{file_kind}/initiate", timeout=timeout)
44+
upload_id: str = init_data["upload_id"]
45+
chunk_size: int = init_data["chunk_size_bytes"]
46+
47+
server_max = init_data.get("max_size_bytes")
48+
if server_max is not None and size_bytes > server_max:
49+
raise ValueError(
50+
f"{file_kind} exceeds server upload limit: "
51+
f"{size_bytes} bytes > {server_max} bytes"
52+
)
53+
54+
n_chunks = math.ceil(size_bytes / chunk_size) if size_bytes > 0 else 0
55+
56+
# Step 2 – Upload chunks in parallel.
57+
# Each worker thread gets its own HTTPTransport (and thus its own
58+
# requests.Session / connection pool) for thread safety.
59+
# Memory is bounded to ~max_workers × chunk_size because each thread
60+
# reads its chunk on demand via seek+read.
61+
_tls = threading.local()
62+
63+
def _upload_chunk(chunk_idx: int) -> None:
64+
if not hasattr(_tls, "transport"):
65+
_tls.transport = HTTPTransport(base_url, auth_token)
66+
offset = chunk_idx * chunk_size
67+
read_size = min(chunk_size, size_bytes - offset)
68+
with path_obj.open("rb") as f:
69+
f.seek(offset)
70+
chunk_data = f.read(read_size)
71+
_tls.transport.put_binary(
72+
f"upload/{upload_id}/chunk/{chunk_idx}",
73+
data=chunk_data,
4174
timeout=timeout,
4275
)
43-
return UploadResponse(**response)
76+
77+
if n_chunks > 0:
78+
effective_workers = min(max_workers, n_chunks)
79+
with ThreadPoolExecutor(max_workers=effective_workers) as pool:
80+
list(pool.map(_upload_chunk, range(n_chunks)))
81+
82+
# Step 3 – Complete upload (returns same UploadResponse shape as before)
83+
_, complete_data = transport.post_empty(
84+
f"upload/{upload_id}/complete", timeout=timeout
85+
)
86+
return UploadResponse(**complete_data)
4487

4588

4689
def upload_obs_duckdb(
4790
base_url: str,
4891
auth_token: str | None,
4992
file_path: str,
5093
timeout: float | tuple[float, float] = (30.0, 3600.0),
94+
max_workers: int = 4,
5195
) -> UploadResponse:
52-
"""Upload obs duckdb file and return upload metadata."""
53-
return _upload_file(base_url, auth_token, "obs_duckdb", file_path, timeout=timeout)
96+
return _upload_file(
97+
base_url,
98+
auth_token,
99+
"obs_duckdb",
100+
file_path,
101+
timeout=timeout,
102+
max_workers=max_workers,
103+
)
54104

55105

56106
def upload_vars_h5(
57107
base_url: str,
58108
auth_token: str | None,
59109
file_path: str,
60110
timeout: float | tuple[float, float] = (30.0, 3600.0),
111+
max_workers: int = 4,
61112
) -> UploadResponse:
62-
"""Upload vars h5 file and return upload metadata."""
63-
return _upload_file(base_url, auth_token, "vars_h5", file_path, timeout=timeout)
113+
return _upload_file(
114+
base_url,
115+
auth_token,
116+
"vars_h5",
117+
file_path,
118+
timeout=timeout,
119+
max_workers=max_workers,
120+
)
64121

65122

66123
def submit_annotation_job(

cytetype/api/transport.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,41 @@ def post(
7474
self._handle_request_error(e)
7575
raise # For type checker
7676

77-
def post_binary(
77+
def post_empty(
78+
self,
79+
endpoint: str,
80+
timeout: float | tuple[float, float] = 30.0,
81+
) -> tuple[int, dict[str, Any]]:
82+
"""Make POST request with no body."""
83+
url = f"{self.base_url}/{endpoint.lstrip('/')}"
84+
85+
try:
86+
response = self.session.post(
87+
url,
88+
headers=self._build_headers(),
89+
timeout=timeout,
90+
)
91+
92+
if not response.ok:
93+
self._parse_error(response)
94+
95+
return response.status_code, response.json()
96+
97+
except requests.RequestException as e:
98+
self._handle_request_error(e)
99+
raise # For type checker
100+
101+
def put_binary(
78102
self,
79103
endpoint: str,
80104
data: bytes | BinaryIO,
81105
timeout: float | tuple[float, float] = (30.0, 3600.0),
82106
) -> tuple[int, dict[str, Any]]:
83-
"""Make POST request with raw binary body (application/octet-stream)."""
107+
"""Make PUT request with raw binary body (application/octet-stream)."""
84108
url = f"{self.base_url}/{endpoint.lstrip('/')}"
85109

86110
try:
87-
response = self.session.post(
111+
response = self.session.put(
88112
url,
89113
data=data,
90114
headers=self._build_headers(content_type="application/octet-stream"),

cytetype/main.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ def _build_and_upload_artifacts(
198198
vars_h5_path: str,
199199
obs_duckdb_path: str,
200200
upload_timeout_seconds: int,
201+
upload_max_workers: int = 4,
201202
) -> dict[str, str]:
202203
"""Build local artifacts and upload them before annotate."""
203204
logger.info("Saving vars.h5 artifact from normalized counts...")
@@ -220,6 +221,7 @@ def _build_and_upload_artifacts(
220221
self.auth_token,
221222
obs_duckdb_path,
222223
timeout=(30.0, float(upload_timeout_seconds)),
224+
max_workers=upload_max_workers,
223225
)
224226
if obs_upload.file_kind != "obs_duckdb":
225227
raise ValueError(
@@ -232,6 +234,7 @@ def _build_and_upload_artifacts(
232234
self.auth_token,
233235
vars_h5_path,
234236
timeout=(30.0, float(upload_timeout_seconds)),
237+
max_workers=upload_max_workers,
235238
)
236239
if vars_upload.file_kind != "vars_h5":
237240
raise ValueError(
@@ -267,6 +270,7 @@ def run(
267270
vars_h5_path: str = "vars.h5",
268271
obs_duckdb_path: str = "obs.duckdb",
269272
upload_timeout_seconds: int = 3600,
273+
upload_max_workers: int = 4,
270274
cleanup_artifacts: bool = False,
271275
require_artifacts: bool = True,
272276
show_progress: bool = True,
@@ -309,6 +313,8 @@ def run(
309313
Defaults to "obs.duckdb".
310314
upload_timeout_seconds (int, optional): Socket read timeout used for each artifact upload.
311315
Defaults to 3600.
316+
upload_max_workers (int, optional): Number of parallel threads used to upload file
317+
chunks. Each worker holds one chunk in memory (~100 MB). Defaults to 4.
312318
cleanup_artifacts (bool, optional): Whether to delete generated artifact files after run
313319
completes or fails. Defaults to False.
314320
require_artifacts (bool, optional): Whether to raise an error if artifact building or
@@ -371,6 +377,7 @@ def run(
371377
vars_h5_path=vars_h5_path,
372378
obs_duckdb_path=obs_duckdb_path,
373379
upload_timeout_seconds=upload_timeout_seconds,
380+
upload_max_workers=upload_max_workers,
374381
)
375382
payload["uploaded_files"] = uploaded_file_refs
376383
except Exception as exc:

0 commit comments

Comments
 (0)