Skip to content

Commit fab6711

Browse files
Retry chunk uploads and allow partial artifacts (#64)
Add robust retry and progress reporting for chunk uploads: import NetworkError, define retryable API codes and retry delays, retry on NetworkError/TimeoutError/APIError (certain codes), and print progress and final success/failure indicators. Update wait_for_completion to use logger.success for completed jobs. Improve log formatting (icons and colored success) via a custom _log_format and TYPE_CHECKING annotations. Change result logging to a clearer info block mapping annotation outputs to adata locations. Refactor artifact build/upload to return (uploaded, errors), handle vars.h5 and obs.duckdb as independent uploads with per-artifact try/except and warnings, and surface errors when require_artifacts=True. Adjust integration test to expect partial uploads behavior.
1 parent bee38ca commit fab6711

6 files changed

Lines changed: 159 additions & 73 deletions

File tree

cytetype/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = "0.16.0"
1+
__version__ = "0.16.1"
22

33
import requests
44

cytetype/api/client.py

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from .transport import HTTPTransport
99
from .progress import ProgressDisplay
10-
from .exceptions import JobFailedError, TimeoutError, APIError
10+
from .exceptions import JobFailedError, TimeoutError, APIError, NetworkError
1111
from .schemas import UploadResponse, UploadFileKind
1212
from ..config import logger
1313

@@ -17,6 +17,9 @@
1717
"vars_h5": 10 * 1024 * 1024 * 1024, # 10GB
1818
}
1919

20+
_CHUNK_RETRY_DELAYS = (1, 5, 20)
21+
_RETRYABLE_API_ERROR_CODES = frozenset({"INTERNAL_ERROR", "HTTP_ERROR"})
22+
2023

2124
def _upload_file(
2225
base_url: str,
@@ -59,6 +62,8 @@ def _upload_file(
5962
# Memory is bounded to ~max_workers × chunk_size because each thread
6063
# reads its chunk on demand via seek+read.
6164
_tls = threading.local()
65+
_progress_lock = threading.Lock()
66+
_chunks_done = [0]
6267

6368
def _upload_chunk(chunk_idx: int) -> None:
6469
if not hasattr(_tls, "transport"):
@@ -68,16 +73,57 @@ def _upload_chunk(chunk_idx: int) -> None:
6873
with path_obj.open("rb") as f:
6974
f.seek(offset)
7075
chunk_data = f.read(read_size)
71-
_tls.transport.put_binary(
72-
f"upload/{upload_id}/chunk/{chunk_idx}",
73-
data=chunk_data,
74-
timeout=timeout,
75-
)
76+
77+
last_exc: Exception | None = None
78+
for attempt in range(1 + len(_CHUNK_RETRY_DELAYS)):
79+
try:
80+
_tls.transport.put_binary(
81+
f"upload/{upload_id}/chunk/{chunk_idx}",
82+
data=chunk_data,
83+
timeout=timeout,
84+
)
85+
with _progress_lock:
86+
_chunks_done[0] += 1
87+
done = _chunks_done[0]
88+
pct = 100 * done / n_chunks
89+
print(
90+
f"\r Uploading: {done}/{n_chunks} chunks ({pct:.0f}%)",
91+
end="",
92+
flush=True,
93+
)
94+
return
95+
except (NetworkError, TimeoutError) as exc:
96+
last_exc = exc
97+
except APIError as exc:
98+
if exc.error_code in _RETRYABLE_API_ERROR_CODES:
99+
last_exc = exc
100+
else:
101+
raise
102+
103+
if attempt < len(_CHUNK_RETRY_DELAYS):
104+
delay = _CHUNK_RETRY_DELAYS[attempt]
105+
logger.warning(
106+
"Chunk %d/%d upload failed (attempt %d/%d), retrying in %ds: %s",
107+
chunk_idx + 1,
108+
n_chunks,
109+
attempt + 1,
110+
1 + len(_CHUNK_RETRY_DELAYS),
111+
delay,
112+
last_exc,
113+
)
114+
time.sleep(delay)
115+
116+
raise last_exc # type: ignore[misc]
76117

77118
if n_chunks > 0:
78119
effective_workers = min(max_workers, n_chunks)
79-
with ThreadPoolExecutor(max_workers=effective_workers) as pool:
80-
list(pool.map(_upload_chunk, range(n_chunks)))
120+
try:
121+
with ThreadPoolExecutor(max_workers=effective_workers) as pool:
122+
list(pool.map(_upload_chunk, range(n_chunks)))
123+
print(f"\r \033[92m✓\033[0m Uploaded {n_chunks}/{n_chunks} chunks (100%)")
124+
except BaseException:
125+
print() # ensure newline on failure
126+
raise
81127

82128
# Step 3 – Complete upload (returns same UploadResponse shape as before)
83129
_, complete_data = transport.post_empty(
@@ -293,7 +339,7 @@ def wait_for_completion(
293339
if job_status == "completed":
294340
if progress:
295341
progress.finalize(cluster_status)
296-
logger.info(f"Job {job_id} completed successfully.")
342+
logger.success(f"Job {job_id} completed successfully.")
297343
return fetch_job_results(base_url, auth_token, job_id)
298344

299345
elif job_status == "failed":

cytetype/config.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
1-
from loguru import logger
1+
from __future__ import annotations
2+
23
import sys
4+
from typing import TYPE_CHECKING
5+
6+
from loguru import logger
7+
8+
if TYPE_CHECKING:
9+
from loguru import Record
310

411
logger.remove()
512

13+
14+
def _log_format(record: Record) -> str:
15+
if record["level"].name == "WARNING":
16+
return "⚠️ {message}\n"
17+
if record["level"].name == "SUCCESS":
18+
return "\033[92m✓\033[0m {message}\n"
19+
return "{message}\n"
20+
21+
622
logger.add(
723
sys.stdout,
824
level="INFO",
9-
format="{message}",
25+
format=_log_format,
1026
)

cytetype/core/results.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,12 @@ def store_annotations(
9292
_check_unannotated_clusters(result_data, clusters)
9393

9494
# Log success
95-
logger.success(
96-
f"Annotations successfully added to `adata.obs['{results_prefix}_annotation_{group_key}']`\n"
97-
f"Ontology terms added to `adata.obs['{results_prefix}_cellOntologyTerm_{group_key}']`\n"
98-
f"Ontology term IDs added to `adata.obs['{results_prefix}_ontologyTermID_{group_key}']`\n"
99-
f"Cell states added to `adata.obs['{results_prefix}_cellState_{group_key}']`\n"
100-
f"Full results added to `adata.uns['{results_prefix}_results']`."
95+
logger.info(
96+
f" Annotation labels → adata.obs['{results_prefix}_annotation_{group_key}']\n"
97+
f" Cell Ontology terms adata.obs['{results_prefix}_cellOntologyTerm_{group_key}']\n"
98+
f"Cell Ontology term IDs adata.obs['{results_prefix}_ontologyTermID_{group_key}']\n"
99+
f" Cell states adata.obs['{results_prefix}_cellState_{group_key}']\n"
100+
f" Full results adata.uns['{results_prefix}_results']"
101101
)
102102

103103

cytetype/main.py

Lines changed: 73 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -199,52 +199,69 @@ def _build_and_upload_artifacts(
199199
obs_duckdb_path: str,
200200
upload_timeout_seconds: int,
201201
upload_max_workers: int = 4,
202-
) -> dict[str, str]:
203-
"""Build local artifacts and upload them before annotate."""
204-
logger.info("Saving vars.h5 artifact from normalized counts...")
205-
save_features_matrix(
206-
out_file=vars_h5_path,
207-
mat=self.adata.X,
208-
var_df=self.adata.var,
209-
var_names=self.adata.var_names,
210-
)
202+
) -> tuple[dict[str, str], list[tuple[str, Exception]]]:
203+
"""Build and upload each artifact as an independent unit.
211204
212-
logger.info("Saving obs.duckdb artifact from observation metadata...")
213-
save_obs_duckdb_file(
214-
out_file=obs_duckdb_path,
215-
obs_df=self.adata.obs,
216-
)
205+
Returns (uploaded_ids, errors) so the caller can decide whether
206+
partial success is acceptable.
207+
"""
208+
uploaded: dict[str, str] = {}
209+
errors: list[tuple[str, Exception]] = []
210+
timeout = (30.0, float(upload_timeout_seconds))
217211

218-
logger.info("Uploading obs.duckdb artifact...")
219-
obs_upload = upload_obs_duckdb_file(
220-
self.api_url,
221-
self.auth_token,
222-
obs_duckdb_path,
223-
timeout=(30.0, float(upload_timeout_seconds)),
224-
max_workers=upload_max_workers,
225-
)
226-
if obs_upload.file_kind != "obs_duckdb":
227-
raise ValueError(
228-
f"Unexpected upload file_kind for obs artifact: {obs_upload.file_kind}"
212+
# --- vars.h5 (save then upload) ---
213+
try:
214+
logger.info("Saving vars.h5 artifact from normalized counts...")
215+
save_features_matrix(
216+
out_file=vars_h5_path,
217+
mat=self.adata.X,
218+
var_df=self.adata.var,
219+
var_names=self.adata.var_names,
220+
)
221+
logger.info("Uploading vars.h5 artifact...")
222+
vars_upload = upload_vars_h5_file(
223+
self.api_url,
224+
self.auth_token,
225+
vars_h5_path,
226+
timeout=timeout,
227+
max_workers=upload_max_workers,
229228
)
229+
if vars_upload.file_kind != "vars_h5":
230+
raise ValueError(
231+
f"Unexpected upload file_kind for vars artifact: {vars_upload.file_kind}"
232+
)
233+
uploaded["vars_h5"] = vars_upload.upload_id
234+
except Exception as exc:
235+
logger.warning(f"vars.h5 artifact failed: {exc}")
236+
errors.append(("vars_h5", exc))
230237

231-
logger.info("Uploading vars.h5 artifact...")
232-
vars_upload = upload_vars_h5_file(
233-
self.api_url,
234-
self.auth_token,
235-
vars_h5_path,
236-
timeout=(30.0, float(upload_timeout_seconds)),
237-
max_workers=upload_max_workers,
238-
)
239-
if vars_upload.file_kind != "vars_h5":
240-
raise ValueError(
241-
f"Unexpected upload file_kind for vars artifact: {vars_upload.file_kind}"
238+
print()
239+
240+
# --- obs.duckdb (save then upload) ---
241+
try:
242+
logger.info("Saving obs.duckdb artifact from observation metadata...")
243+
save_obs_duckdb_file(
244+
out_file=obs_duckdb_path,
245+
obs_df=self.adata.obs,
246+
)
247+
logger.info("Uploading obs.duckdb artifact...")
248+
obs_upload = upload_obs_duckdb_file(
249+
self.api_url,
250+
self.auth_token,
251+
obs_duckdb_path,
252+
timeout=timeout,
253+
max_workers=upload_max_workers,
242254
)
255+
if obs_upload.file_kind != "obs_duckdb":
256+
raise ValueError(
257+
f"Unexpected upload file_kind for obs artifact: {obs_upload.file_kind}"
258+
)
259+
uploaded["obs_duckdb"] = obs_upload.upload_id
260+
except Exception as exc:
261+
logger.warning(f"obs.duckdb artifact failed: {exc}")
262+
errors.append(("obs_duckdb", exc))
243263

244-
return {
245-
"obs_duckdb": obs_upload.upload_id,
246-
"vars_h5": vars_upload.upload_id,
247-
}
264+
return uploaded, errors
248265

249266
@staticmethod
250267
def _cleanup_artifact_files(paths: list[str]) -> None:
@@ -372,26 +389,29 @@ def run(
372389

373390
artifact_paths = [vars_h5_path, obs_duckdb_path]
374391
try:
375-
try:
376-
uploaded_file_refs = self._build_and_upload_artifacts(
377-
vars_h5_path=vars_h5_path,
378-
obs_duckdb_path=obs_duckdb_path,
379-
upload_timeout_seconds=upload_timeout_seconds,
380-
upload_max_workers=upload_max_workers,
381-
)
392+
uploaded_file_refs, artifact_errors = self._build_and_upload_artifacts(
393+
vars_h5_path=vars_h5_path,
394+
obs_duckdb_path=obs_duckdb_path,
395+
upload_timeout_seconds=upload_timeout_seconds,
396+
upload_max_workers=upload_max_workers,
397+
)
398+
if uploaded_file_refs:
382399
payload["uploaded_files"] = uploaded_file_refs
383-
except Exception as exc:
400+
401+
if artifact_errors:
402+
failed_names = ", ".join(name for name, _ in artifact_errors)
384403
if require_artifacts:
385404
logger.error(
386-
"Artifact build/upload failed. "
405+
f"Artifact build/upload failed for: {failed_names}. "
387406
"Rerun with `require_artifacts=False` to skip this error.\n"
388407
"Please report the error below in a new issue at "
389408
"https://github.com/NygenAnalytics/CyteType\n"
390-
f"({type(exc).__name__}: {exc})"
409+
f"({type(artifact_errors[0][1]).__name__}: {str(artifact_errors[0][1]).strip()})"
391410
)
392-
raise
411+
raise artifact_errors[0][1]
393412
logger.warning(
394-
"Artifact build/upload failed. Continuing without artifacts. "
413+
f"Artifact build/upload failed for: {failed_names}. "
414+
"Continuing without those artifacts. "
395415
"Set `require_artifacts=True` to see the full traceback."
396416
)
397417

@@ -400,6 +420,7 @@ def run(
400420
save_query_to_file(payload["input_data"], query_filename)
401421

402422
# Submit job and store details
423+
print()
403424
job_id = submit_annotation_job(self.api_url, self.auth_token, payload)
404425
store_job_details(self.adata, job_id, self.api_url, results_prefix)
405426

tests/test_cytetype_integration.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,10 @@ def test_cytetype_run_artifact_failure_continues_when_not_required(
181181
mock_api_response: dict[str, Any],
182182
monkeypatch: pytest.MonkeyPatch,
183183
) -> None:
184-
"""Test run() proceeds without uploaded_files when require_artifacts=False."""
184+
"""Test run() proceeds with partial uploads when require_artifacts=False.
185+
186+
vars.h5 save fails but obs.duckdb still succeeds independently.
187+
"""
185188
mock_submit.return_value = "job_no_artifacts"
186189
mock_wait.return_value = mock_api_response
187190

@@ -197,9 +200,9 @@ def test_cytetype_run_artifact_failure_continues_when_not_required(
197200
assert result is not None
198201
assert mock_submit.called
199202

200-
# Payload must not contain uploaded_files
203+
# obs.duckdb should have succeeded independently
201204
payload = mock_submit.call_args.args[2]
202-
assert "uploaded_files" not in payload
205+
assert payload["uploaded_files"] == {"obs_duckdb": "obs_upload_123"}
203206

204207

205208
@patch("cytetype.main.wait_for_completion")

0 commit comments

Comments
 (0)