Skip to content

Commit c1843cd

Browse files
committed
Unify multipart upload logic between zarrs and blobs
1 parent 0e505bd commit c1843cd

2 files changed

Lines changed: 161 additions & 206 deletions

File tree

dandi/files/bases.py

Lines changed: 152 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from abc import ABC, abstractmethod
44
from collections import deque
5-
from collections.abc import Iterator
5+
from collections.abc import Generator, Iterator
66
from concurrent.futures import ThreadPoolExecutor, as_completed
77
from dataclasses import dataclass
88
from datetime import datetime
@@ -341,118 +341,26 @@ def iter_upload(
341341
``"done"`` and an ``"asset"`` key containing the resulting
342342
`RemoteAsset`.
343343
"""
344-
# Avoid heavy import by importing within function:
345-
from dandi.support.digests import get_dandietag
346-
347344
asset_path = metadata.setdefault("path", self.path)
348345
client = dandiset.client
349-
yield {"status": "calculating etag"}
350-
etagger = get_dandietag(self.filepath)
351-
filetag = etagger.as_str()
352-
lgr.debug("Calculated dandi-etag of %s for %s", filetag, self.filepath)
353-
digest = metadata.get("digest", {})
354-
if "dandi:dandi-etag" in digest:
355-
if digest["dandi:dandi-etag"] != filetag:
356-
raise RuntimeError(
357-
f"{self.filepath}: File etag changed; was originally"
358-
f" {digest['dandi:dandi-etag']} but is now {filetag}"
359-
)
360-
yield {"status": "initiating upload"}
361-
lgr.debug("%s: Beginning upload", asset_path)
362-
total_size = pre_upload_size_check(self.filepath)
346+
expected_etag = metadata.get("digest", {}).get("dandi:dandi-etag")
363347
try:
364-
resp = client.post(
365-
"/uploads/initialize/",
366-
json={
367-
"contentSize": total_size,
368-
"digest": {
369-
"algorithm": "dandi:dandi-etag",
370-
"value": filetag,
371-
},
372-
"dandiset": dandiset.identifier,
373-
},
348+
validate_resp = yield from _multipart_upload(
349+
client=client,
350+
filepath=self.filepath,
351+
asset_path=asset_path,
352+
upload_prefix="/uploads",
353+
extra_init_fields={"dandiset": dandiset.identifier},
354+
expected_etag=expected_etag,
355+
jobs=jobs,
374356
)
357+
blob_id = validate_resp["blob_id"]
375358
except requests.HTTPError as e:
376359
if e.response is not None and e.response.status_code == 409:
377360
lgr.debug("%s: Blob already exists on server", asset_path)
378361
blob_id = e.response.headers["Location"]
379362
else:
380363
raise
381-
else:
382-
try:
383-
upload_id = resp["upload_id"]
384-
parts = resp["parts"]
385-
if len(parts) != etagger.part_qty:
386-
raise RuntimeError(
387-
f"Server and client disagree on number of parts for upload;"
388-
f" server says {len(parts)}, client says {etagger.part_qty}"
389-
)
390-
parts_out = []
391-
bytes_uploaded = 0
392-
lgr.debug("Uploading %s in %d parts", self.filepath, len(parts))
393-
with RESTFullAPIClient("http://nil.nil") as storage:
394-
with self.filepath.open("rb") as fp:
395-
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
396-
lock = Lock()
397-
futures = [
398-
executor.submit(
399-
_upload_blob_part,
400-
storage_session=storage,
401-
fp=fp,
402-
lock=lock,
403-
etagger=etagger,
404-
asset_path=asset_path,
405-
part=part,
406-
)
407-
for part in parts
408-
]
409-
for fut in as_completed(futures):
410-
out_part = fut.result()
411-
bytes_uploaded += out_part["size"]
412-
yield {
413-
"status": "uploading",
414-
"progress": 100 * bytes_uploaded / total_size,
415-
"current": bytes_uploaded,
416-
}
417-
parts_out.append(out_part)
418-
lgr.debug("%s: Completing upload", asset_path)
419-
resp = client.post(
420-
f"/uploads/{upload_id}/complete/",
421-
json={"parts": parts_out},
422-
)
423-
lgr.debug(
424-
"%s: Announcing completion to %s",
425-
asset_path,
426-
resp["complete_url"],
427-
)
428-
r = storage.post(
429-
resp["complete_url"], data=resp["body"], json_resp=False
430-
)
431-
lgr.debug(
432-
"%s: Upload completed. Response content: %s",
433-
asset_path,
434-
r.content,
435-
)
436-
rxml = fromstring(r.text)
437-
m = re.match(r"\{.+?\}", rxml.tag)
438-
ns = m.group(0) if m else ""
439-
final_etag = rxml.findtext(f"{ns}ETag")
440-
if final_etag is not None:
441-
final_etag = final_etag.strip('"')
442-
if final_etag != filetag:
443-
raise RuntimeError(
444-
"Server and client disagree on final ETag of"
445-
f" uploaded file; server says {final_etag},"
446-
f" client says {filetag}"
447-
)
448-
# else: Error? Warning?
449-
resp = client.post(f"/uploads/{upload_id}/validate/")
450-
blob_id = resp["blob_id"]
451-
except Exception:
452-
post_upload_size_check(self.filepath, total_size, True)
453-
raise
454-
else:
455-
post_upload_size_check(self.filepath, total_size, False)
456364
lgr.debug("%s: Assigning asset blob to dandiset & version", asset_path)
457365
yield {"status": "producing asset"}
458366
if replacing is not None:
@@ -696,6 +604,147 @@ def _upload_blob_part(
696604
}
697605

698606

607+
def _execute_multipart_upload(
608+
client: RESTFullAPIClient,
609+
upload_id: str,
610+
parts: list[dict],
611+
etagger: DandiETag,
612+
filepath: Path,
613+
asset_path: str,
614+
total_size: int,
615+
upload_prefix: str,
616+
jobs: int | None = None,
617+
) -> Generator[dict, None, dict]:
618+
"""Execute the parallel part-upload phase of a multipart upload.
619+
620+
Yields progress dicts and returns the validate response, allowing the
621+
caller to extract ``blob_id`` or any other field as needed.
622+
"""
623+
if len(parts) != etagger.part_qty:
624+
raise RuntimeError(
625+
f"Server and client disagree on number of parts for upload;"
626+
f" server says {len(parts)}, client says {etagger.part_qty}"
627+
)
628+
filetag = etagger.as_str()
629+
parts_out = []
630+
bytes_uploaded = 0
631+
lgr.debug("Uploading %s in %d parts", filepath, len(parts))
632+
with RESTFullAPIClient("http://nil.nil") as storage:
633+
with filepath.open("rb") as fp:
634+
with ThreadPoolExecutor(max_workers=jobs or 5) as executor:
635+
lock = Lock()
636+
futures = [
637+
executor.submit(
638+
_upload_blob_part,
639+
storage_session=storage,
640+
fp=fp,
641+
lock=lock,
642+
etagger=etagger,
643+
asset_path=asset_path,
644+
part=part,
645+
)
646+
for part in parts
647+
]
648+
for fut in as_completed(futures):
649+
out_part = fut.result()
650+
bytes_uploaded += out_part["size"]
651+
yield {
652+
"status": "uploading",
653+
"progress": 100 * bytes_uploaded / total_size,
654+
"current": bytes_uploaded,
655+
}
656+
parts_out.append(out_part)
657+
lgr.debug("%s: Completing upload", asset_path)
658+
resp = client.post(
659+
f"{upload_prefix}/{upload_id}/complete/",
660+
json={"parts": parts_out},
661+
)
662+
lgr.debug(
663+
"%s: Announcing completion to %s",
664+
asset_path,
665+
resp["complete_url"],
666+
)
667+
r = storage.post(resp["complete_url"], data=resp["body"], json_resp=False)
668+
lgr.debug(
669+
"%s: Upload completed. Response content: %s",
670+
asset_path,
671+
r.content,
672+
)
673+
rxml = fromstring(r.text)
674+
m = re.match(r"\{.+?\}", rxml.tag)
675+
ns = m.group(0) if m else ""
676+
final_etag = rxml.findtext(f"{ns}ETag")
677+
if final_etag is not None:
678+
final_etag = final_etag.strip('"')
679+
if final_etag != filetag:
680+
raise RuntimeError(
681+
"Server and client disagree on final ETag of"
682+
f" uploaded file; server says {final_etag},"
683+
f" client says {filetag}"
684+
)
685+
# else: Error? Warning?
686+
return client.post(f"{upload_prefix}/{upload_id}/validate/")
687+
688+
689+
def _multipart_upload(
690+
client: RESTFullAPIClient,
691+
filepath: Path,
692+
asset_path: str,
693+
upload_prefix: str,
694+
extra_init_fields: dict,
695+
jobs: int | None = None,
696+
expected_etag: str | None = None,
697+
) -> Generator[dict, None, dict]:
698+
"""Perform a full multipart upload: etag calculation, initialization, part upload, and validation.
699+
700+
Yields progress dicts and returns the validate response dict. If
701+
``expected_etag`` is provided and does not match the computed etag, raises
702+
``RuntimeError``. A 409 HTTPError from the initialize call propagates
703+
unchanged so the caller can handle the "blob already exists" case.
704+
"""
705+
# Avoid heavy import by importing within function:
706+
from dandi.support.digests import get_dandietag
707+
708+
yield {"status": "calculating etag"}
709+
etagger = get_dandietag(filepath)
710+
filetag = etagger.as_str()
711+
lgr.debug("Calculated dandi-etag of %s for %s", filetag, filepath)
712+
if expected_etag is not None and filetag != expected_etag:
713+
raise RuntimeError(
714+
f"{filepath}: File etag changed; was originally"
715+
f" {expected_etag} but is now {filetag}"
716+
)
717+
yield {"status": "initiating upload"}
718+
lgr.debug("%s: Beginning upload", asset_path)
719+
total_size = pre_upload_size_check(filepath)
720+
resp = client.post(
721+
f"{upload_prefix}/initialize/",
722+
json={
723+
"contentSize": total_size,
724+
"digest": {"algorithm": "dandi:dandi-etag", "value": filetag},
725+
**extra_init_fields,
726+
},
727+
)
728+
try:
729+
validate_resp = yield from _execute_multipart_upload(
730+
client=client,
731+
upload_id=resp["upload_id"],
732+
parts=resp["parts"],
733+
etagger=etagger,
734+
filepath=filepath,
735+
asset_path=asset_path,
736+
total_size=total_size,
737+
upload_prefix=upload_prefix,
738+
jobs=jobs,
739+
)
740+
except Exception:
741+
post_upload_size_check(filepath, total_size, True)
742+
raise
743+
else:
744+
post_upload_size_check(filepath, total_size, False)
745+
return validate_resp
746+
747+
699748
def _check_required_fields(
700749
d: dict, required: list[str], file_path: str
701750
) -> list[ValidationResult]:

0 commit comments

Comments
 (0)