Skip to content

Commit 56126dd

Browse files
authored
Merge pull request #1846 from dandi/zarr-multipart-upload-enhs
Zarr multipart upload tune ups: condition on server capability, fix progress reporting, harmonize variable names
2 parents: 871294b + 684da5c; commit: 56126dd

3 files changed

Lines changed: 82 additions & 25 deletions

File tree

dandi/dandiapi.py

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
from datetime import datetime
2020
from enum import Enum
2121
from fnmatch import fnmatchcase
22+
from functools import cached_property
2223
import json
2324
import os.path
2425
from pathlib import Path, PurePosixPath
@@ -565,6 +566,27 @@ def _get_keyring_ids(self) -> tuple[str, str]:
565566
def _instance_id(self) -> str:
566567
return self.dandi_instance.name.upper()
567568

569+
@cached_property
570+
def supports_zarr_multipart_upload(self) -> bool:
571+
"""
572+
Whether the server exposes the zarr multipart upload endpoints
573+
introduced in dandi-archive#2784 (``/uploads/zarr/initialize/`` and
574+
friends).
575+
576+
Probed once per client by POSTing an empty body to
577+
``/uploads/zarr/initialize/``: if the route does not exist the server
578+
returns 404; any other status (400 for the bad payload, 401/403 for
579+
auth, etc.) means the route is present and the server supports
580+
multipart zarr uploads.
581+
"""
582+
try:
583+
self.post("/uploads/zarr/initialize/", json={})
584+
except HTTP404Error:
585+
return False
586+
except requests.HTTPError:
587+
return True
588+
return True
589+
568590
def get_dandiset(
569591
self, dandiset_id: str, version_id: str | None = None, lazy: bool = True
570592
) -> RemoteDandiset:

dandi/files/zarr.py

Lines changed: 48 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -743,22 +743,44 @@ def mkzarr() -> str:
743743
# Items to upload in this batch (may be retried e.g. due to
744744
# 403 errors because of timed-out upload URLs)
745745
all_items = list(items)
746-
large_items = [
746+
multipart_items = [
747747
it for it in all_items if it.size > ZARR_LARGE_CHUNK_THRESHOLD
748748
]
749-
items_to_upload = [
749+
singlepart_items = [
750750
it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD
751751
]
752+
# TODO: remove once all servers are > 0.23.0 (i.e. ship
753+
# dandi-archive#2784) and the multipart zarr upload
754+
# endpoints are universally available; the capability
755+
# check would then be unnecessary.
756+
if multipart_items and not client.supports_zarr_multipart_upload:
757+
largest = max(multipart_items, key=lambda it: it.size)
758+
total_large_size = sum(it.size for it in multipart_items)
759+
raise UploadError(
760+
f"{asset_path}:"
761+
f" {pluralize(len(multipart_items), 'Zarr chunk')}"
762+
f" totaling {total_large_size / 1024**3:.2f} GiB"
763+
f" exceed the S3 single-part upload limit of"
764+
f" {ZARR_LARGE_CHUNK_THRESHOLD / 1024**3:.0f} GiB"
765+
f" (largest: {largest.entry_path},"
766+
f" {largest.size / 1024**3:.2f} GiB);"
767+
f" the server does not support multipart zarr"
768+
f" uploads (dandi-archive#2784)."
769+
)
752770
max_retries = 5
753771
retry_count = 0
754772
# Add all items to checksum tree (only done once)
755773
for it in all_items:
756774
zcc.add_leaf(Path(it.entry_path), it.size, it.digest)
757775

758-
# Upload chunks above 5GB individually via multipart upload
759-
for it in large_items:
760-
# Yield uploading status
761-
yield from _multipart_upload(
776+
# Upload chunks above 5GB individually via multipart upload.
777+
# ``_multipart_upload`` reports ``current`` as bytes within
778+
# the single chunk being uploaded; translate it to bytes
779+
# uploaded across the whole zarr so progress reporting
780+
# stays monotonic for downstream consumers.
781+
for it in multipart_items:
782+
cumulative_before = bytes_uploaded
783+
for status in _multipart_upload(
762784
client=client,
763785
filepath=it.filepath,
764786
asset_path=it.entry_path,
@@ -768,21 +790,26 @@ def mkzarr() -> str:
768790
"chunk_key": it.entry_path,
769791
},
770792
jobs=jobs,
771-
)
772-
773-
# Part is finished uploading, yield final progress
793+
):
794+
if (
795+
status.get("status") == "uploading"
796+
and "current" in status
797+
):
798+
cumulative = cumulative_before + status["current"]
799+
yield {
800+
"status": "uploading",
801+
"progress": 100 * cumulative / to_upload.total_size,
802+
"current": cumulative,
803+
}
804+
else:
805+
yield status
774806
changed = True
775807
bytes_uploaded += it.size
776-
yield {
777-
"status": "uploading",
778-
"progress": 100 * bytes_uploaded / to_upload.total_size,
779-
"current": bytes_uploaded,
780-
}
781808

782809
# Upload the remaining files using single part upload
783-
while items_to_upload and retry_count <= max_retries:
810+
while singlepart_items and retry_count <= max_retries:
784811
# Prepare upload requests for current items
785-
uploading = [it.upload_request() for it in items_to_upload]
812+
uploading = [it.upload_request() for it in singlepart_items]
786813

787814
if retry_count == 0:
788815
lgr.debug(
@@ -814,7 +841,7 @@ def mkzarr() -> str:
814841
upload_url=signed_url,
815842
item=it,
816843
)
817-
for (signed_url, it) in zip(r, items_to_upload)
844+
for (signed_url, it) in zip(r, singlepart_items)
818845
]
819846

820847
changed = True
@@ -846,20 +873,20 @@ def mkzarr() -> str:
846873
)
847874

848875
# Prepare for next iteration with retry items
849-
if items_to_upload := retry_items:
876+
if singlepart_items := retry_items:
850877
retry_count += 1
851878
if retry_count <= max_retries:
852879
lgr.info(
853880
"%s: %s got 403 errors, requesting new URLs",
854881
asset_path,
855-
pluralize(len(items_to_upload), "file"),
882+
pluralize(len(singlepart_items), "file"),
856883
)
857884
# Small delay before retry
858885
sleep(1 * retry_count)
859886

860887
# Check if we exhausted retries
861-
if items_to_upload:
862-
nfiles_str = pluralize(len(items_to_upload), "file")
888+
if singlepart_items:
889+
nfiles_str = pluralize(len(singlepart_items), "file")
863890
raise UploadError(
864891
f"{asset_path}: failed to upload {nfiles_str} "
865892
f"after {max_retries} retries due to repeated 403 errors"

dandi/tests/test_files.py

Lines changed: 12 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616
from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file
1717
from ..dandiapi import AssetType, RemoteZarrAsset
1818
from ..exceptions import UnknownAssetError
19-
from ..files.bases import _multipart_upload as real_multipart_upload
2019
from ..files import (
2120
BIDSDatasetDescriptionAsset,
2221
DandisetMetadataFile,
@@ -30,6 +29,7 @@
3029
dandi_file,
3130
find_dandi_files,
3231
)
32+
from ..files.bases import _multipart_upload as real_multipart_upload
3333

3434
lgr = get_logger()
3535

@@ -540,6 +540,11 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path):
540540
@pytest.mark.ai_generated
541541
def test_upload_zarr_large_chunks(new_dandiset, tmp_path):
542542
"""Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via multipart upload."""
543+
if not new_dandiset.client.supports_zarr_multipart_upload:
544+
pytest.skip(
545+
"Server does not expose the zarr multipart upload endpoints"
546+
" (dandi-archive#2784)"
547+
)
543548
filepath = tmp_path / "example.zarr"
544549
zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1))
545550
zf = dandi_file(filepath)
@@ -571,6 +576,11 @@ def spy_multipart_upload(**kwargs):
571576
@pytest.mark.ai_generated
572577
def test_upload_zarr_mixed_chunks(new_dandiset, tmp_path):
573578
"""Chunks above ZARR_LARGE_CHUNK_THRESHOLD go multipart; smaller ones use single-part upload."""
579+
if not new_dandiset.client.supports_zarr_multipart_upload:
580+
pytest.skip(
581+
"Server does not expose the zarr multipart upload endpoints"
582+
" (dandi-archive#2784)"
583+
)
574584
filepath = tmp_path / "mixed.zarr"
575585
store = zarr.open_group(str(filepath), mode="w")
576586
# small array: 10 int64 elements, produces a ~96-byte chunk (compressed)
@@ -601,9 +611,7 @@ def spy_multipart_upload(**kwargs):
601611
remote_entries = {str(e) for e in asset.iterfiles()}
602612
# Only chunk files whose on-disk size exceeds the threshold should be multipart-uploaded
603613
large_chunks = {
604-
p
605-
for p in remote_entries
606-
if (filepath / p).stat().st_size > mixed_threshold
614+
p for p in remote_entries if (filepath / p).stat().st_size > mixed_threshold
607615
}
608616
assert set(multipart_paths) == large_chunks
609617
# At least one chunk must have gone each path so the test is meaningful

0 commit comments

Comments
 (0)