Skip to content

Commit 367bd24

Browse files
committed
Unify multipart upload logic between zarrs and blobs
1 parent 0e505bd commit 367bd24

3 files changed

Lines changed: 158 additions & 246 deletions

File tree

dandi/files/bases.py

Lines changed: 125 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from abc import ABC, abstractmethod
44
from collections import deque
5-
from collections.abc import Iterator
5+
from collections.abc import Generator, Iterator
66
from concurrent.futures import ThreadPoolExecutor, as_completed
77
from dataclasses import dataclass
88
from datetime import datetime
@@ -341,118 +341,26 @@ def iter_upload(
341341
``"done"`` and an ``"asset"`` key containing the resulting
342342
`RemoteAsset`.
343343
"""
344-
# Avoid heavy import by importing within function:
345-
from dandi.support.digests import get_dandietag
346-
347344
asset_path = metadata.setdefault("path", self.path)
348345
client = dandiset.client
349-
yield {"status": "calculating etag"}
350-
etagger = get_dandietag(self.filepath)
351-
filetag = etagger.as_str()
352-
lgr.debug("Calculated dandi-etag of %s for %s", filetag, self.filepath)
353-
digest = metadata.get("digest", {})
354-
if "dandi:dandi-etag" in digest:
355-
if digest["dandi:dandi-etag"] != filetag:
356-
raise RuntimeError(
357-
f"{self.filepath}: File etag changed; was originally"
358-
f" {digest['dandi:dandi-etag']} but is now {filetag}"
359-
)
360-
yield {"status": "initiating upload"}
361-
lgr.debug("%s: Beginning upload", asset_path)
362-
total_size = pre_upload_size_check(self.filepath)
346+
expected_etag = metadata.get("digest", {}).get("dandi:dandi-etag")
363347
try:
364-
resp = client.post(
365-
"/uploads/initialize/",
366-
json={
367-
"contentSize": total_size,
368-
"digest": {
369-
"algorithm": "dandi:dandi-etag",
370-
"value": filetag,
371-
},
372-
"dandiset": dandiset.identifier,
373-
},
348+
validate_resp = yield from _multipart_upload(
349+
client=client,
350+
filepath=self.filepath,
351+
asset_path=asset_path,
352+
upload_prefix="/uploads",
353+
extra_init_fields={"dandiset": dandiset.identifier},
354+
expected_etag=expected_etag,
355+
jobs=jobs,
374356
)
357+
blob_id = validate_resp["blob_id"]
375358
except requests.HTTPError as e:
376359
if e.response is not None and e.response.status_code == 409:
377360
lgr.debug("%s: Blob already exists on server", asset_path)
378361
blob_id = e.response.headers["Location"]
379362
else:
380363
raise
381-
else:
382-
try:
383-
upload_id = resp["upload_id"]
384-
parts = resp["parts"]
385-
if len(parts) != etagger.part_qty:
386-
raise RuntimeError(
387-
f"Server and client disagree on number of parts for upload;"
388-
f" server says {len(parts)}, client says {etagger.part_qty}"
389-
)
390-
parts_out = []
391-
bytes_uploaded = 0
392-
lgr.debug("Uploading %s in %d parts", self.filepath, len(parts))
393-
with RESTFullAPIClient("http://nil.nil") as storage:
394-
with self.filepath.open("rb") as fp:
395-
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
396-
lock = Lock()
397-
futures = [
398-
executor.submit(
399-
_upload_blob_part,
400-
storage_session=storage,
401-
fp=fp,
402-
lock=lock,
403-
etagger=etagger,
404-
asset_path=asset_path,
405-
part=part,
406-
)
407-
for part in parts
408-
]
409-
for fut in as_completed(futures):
410-
out_part = fut.result()
411-
bytes_uploaded += out_part["size"]
412-
yield {
413-
"status": "uploading",
414-
"progress": 100 * bytes_uploaded / total_size,
415-
"current": bytes_uploaded,
416-
}
417-
parts_out.append(out_part)
418-
lgr.debug("%s: Completing upload", asset_path)
419-
resp = client.post(
420-
f"/uploads/{upload_id}/complete/",
421-
json={"parts": parts_out},
422-
)
423-
lgr.debug(
424-
"%s: Announcing completion to %s",
425-
asset_path,
426-
resp["complete_url"],
427-
)
428-
r = storage.post(
429-
resp["complete_url"], data=resp["body"], json_resp=False
430-
)
431-
lgr.debug(
432-
"%s: Upload completed. Response content: %s",
433-
asset_path,
434-
r.content,
435-
)
436-
rxml = fromstring(r.text)
437-
m = re.match(r"\{.+?\}", rxml.tag)
438-
ns = m.group(0) if m else ""
439-
final_etag = rxml.findtext(f"{ns}ETag")
440-
if final_etag is not None:
441-
final_etag = final_etag.strip('"')
442-
if final_etag != filetag:
443-
raise RuntimeError(
444-
"Server and client disagree on final ETag of"
445-
f" uploaded file; server says {final_etag},"
446-
f" client says {filetag}"
447-
)
448-
# else: Error? Warning?
449-
resp = client.post(f"/uploads/{upload_id}/validate/")
450-
blob_id = resp["blob_id"]
451-
except Exception:
452-
post_upload_size_check(self.filepath, total_size, True)
453-
raise
454-
else:
455-
post_upload_size_check(self.filepath, total_size, False)
456364
lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path)
457365
yield {"status": "producing asset"}
458366
if replacing is not None:
@@ -696,6 +604,120 @@ def _upload_blob_part(
696604
}
697605

698606

607+
def _multipart_upload(
    client: RESTFullAPIClient,
    filepath: Path,
    asset_path: str,
    upload_prefix: str,
    extra_init_fields: dict,
    expected_etag: str | None = None,
    jobs: int | None = None,
) -> Generator[dict, None, dict]:
    """
    Perform a full multipart upload of a local file.

    The steps are: compute the file's dandi-etag, initialize the upload via
    ``{upload_prefix}/initialize/``, upload every part concurrently, announce
    completion, compare the storage backend's final ETag against the locally
    computed one, and finally call ``{upload_prefix}/{upload_id}/validate/``.

    :param client: API client used for the initialize/complete/validate calls
    :param filepath: local file to upload
    :param asset_path: path used to identify the asset in log messages
    :param upload_prefix:
        URL prefix of the upload endpoints (e.g. ``"/uploads"``)
    :param extra_init_fields:
        additional key/value pairs merged into the JSON body of the initialize
        request; they are merged last, so they take precedence over the
        ``contentSize`` and ``digest`` entries on key collision
    :param expected_etag:
        if not `None`, the dandi-etag the file is expected to have
    :param jobs: number of upload threads; 5 are used if unset or zero
    :returns:
        a generator that yields status `dict`\\s (``"calculating etag"``,
        ``"initiating upload"``, then one ``"uploading"`` entry with
        ``"progress"`` and ``"current"`` keys per completed part) and whose
        return value is the response of the validate call
    :raises RuntimeError:
        if the computed etag differs from ``expected_etag``, if the server and
        client disagree on the number of parts, or if the final ETag reported
        by the storage backend differs from the computed etag

    Exceptions raised by the initialize request are propagated unchanged (the
    request is made outside of the error-handling block), so the caller can
    handle, e.g., a 409 `requests.HTTPError` for a blob that already exists.
    """
    # Avoid heavy import by importing within function:
    from dandi.support.digests import get_dandietag

    yield {"status": "calculating etag"}
    etagger = get_dandietag(filepath)
    filetag = etagger.as_str()
    lgr.debug("Calculated dandi-etag of %s for %s", filetag, filepath)
    if expected_etag is not None and filetag != expected_etag:
        raise RuntimeError(
            f"{filepath}: File etag changed; was originally"
            f" {expected_etag} but is now {filetag}"
        )
    yield {"status": "initiating upload"}
    lgr.debug("%s: Beginning upload", asset_path)
    total_size = pre_upload_size_check(filepath)
    # Deliberately outside the ``try`` below: a failure here means no upload
    # was started, so there is nothing to re-check afterwards.
    resp = client.post(
        f"{upload_prefix}/initialize/",
        json={
            "contentSize": total_size,
            "digest": {"algorithm": "dandi:dandi-etag", "value": filetag},
            **extra_init_fields,
        },
    )
    try:
        upload_id = resp["upload_id"]
        parts = resp["parts"]
        if len(parts) != etagger.part_qty:
            raise RuntimeError(
                f"Server and client disagree on number of parts for upload;"
                f" server says {len(parts)}, client says {etagger.part_qty}"
            )
        parts_out = []
        bytes_uploaded = 0
        lgr.debug("Uploading %s in %d parts", filepath, len(parts))
        with RESTFullAPIClient("http://nil.nil") as storage:
            with filepath.open("rb") as fp:
                with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
                    # A single file handle is shared by all workers; the lock
                    # is handed to each worker together with the handle.
                    lock = Lock()
                    futures = [
                        executor.submit(
                            _upload_blob_part,
                            storage_session=storage,
                            fp=fp,
                            lock=lock,
                            etagger=etagger,
                            asset_path=asset_path,
                            part=part,
                        )
                        for part in parts
                    ]
                    try:
                        for fut in as_completed(futures):
                            out_part = fut.result()
                            bytes_uploaded += out_part["size"]
                            yield {
                                "status": "uploading",
                                "progress": 100 * bytes_uploaded / total_size,
                                "current": bytes_uploaded,
                            }
                            parts_out.append(out_part)
                    finally:
                        # If a part failed (or the consumer closed this
                        # generator), do not let the executor's shutdown run
                        # the parts that have not started yet: the upload is
                        # already lost.  Cancelling a finished or running
                        # future is a no-op, so this is harmless on success.
                        for pending in futures:
                            pending.cancel()
            lgr.debug("%s: Completing upload", asset_path)
            resp = client.post(
                f"{upload_prefix}/{upload_id}/complete/",
                json={"parts": parts_out},
            )
            lgr.debug(
                "%s: Announcing completion to %s",
                asset_path,
                resp["complete_url"],
            )
            r = storage.post(resp["complete_url"], data=resp["body"], json_resp=False)
            lgr.debug(
                "%s: Upload completed. Response content: %s",
                asset_path,
                r.content,
            )
            # The completion response is XML; pick up the (optional) namespace
            # from the root tag so that the ETag element can be looked up.
            rxml = fromstring(r.text)
            m = re.match(r"\{.+?\}", rxml.tag)
            ns = m.group(0) if m else ""
            final_etag = rxml.findtext(f"{ns}ETag")
            if final_etag is not None:
                final_etag = final_etag.strip('"')
                if final_etag != filetag:
                    raise RuntimeError(
                        "Server and client disagree on final ETag of"
                        f" uploaded file; server says {final_etag},"
                        f" client says {filetag}"
                    )
            # else: Error? Warning?
            validate_resp = client.post(f"{upload_prefix}/{upload_id}/validate/")
    except Exception:
        # NOTE(review): the third argument presumably tells the check that an
        # error is already being propagated -- confirm against
        # post_upload_size_check().
        post_upload_size_check(filepath, total_size, True)
        raise
    else:
        post_upload_size_check(filepath, total_size, False)

    return validate_resp
719+
720+
699721
def _check_required_fields(
700722
d: dict, required: list[str], file_path: str
701723
) -> list[ValidationResult]:

0 commit comments

Comments
 (0)