Skip to content

Commit 54e5173

Browse files
jjnesbitt authored and yarikoptic committed
Use multi-part upload on files larger than 5GB
1 parent 4a14a1f commit 54e5173

3 files changed

Lines changed: 184 additions & 8 deletions

File tree

dandi/consts.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ def urls(self) -> Iterator[str]:
199199
#: Maximum number of Zarr directory entries to delete at once
200200
ZARR_DELETE_BATCH_SIZE = 100
201201

202+
#: Zarr chunks above this size (bytes) are uploaded via multipart upload
203+
ZARR_LARGE_CHUNK_THRESHOLD = 5 * 1024 * 1024 * 1024 # 5 GB
204+
202205
BIDS_DATASET_DESCRIPTION = "dataset_description.json"
203206

204207
BIDS_IGNORE_FILE = ".bidsignore"

dandi/files/zarr.py

Lines changed: 143 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@
1010
import json
1111
import os
1212
import os.path
13+
import re
1314
from pathlib import Path
15+
from threading import Lock
1416
from time import sleep
1517
from typing import Any, Optional
1618
import urllib.parse
19+
from xml.etree.ElementTree import fromstring
1720

1821
from dandischema.models import BareAsset, DigestType
1922
from pydantic import BaseModel, ConfigDict, ValidationError
@@ -25,6 +28,7 @@
2528
from dandi.consts import (
2629
MAX_ZARR_DEPTH,
2730
ZARR_DELETE_BATCH_SIZE,
31+
ZARR_LARGE_CHUNK_THRESHOLD,
2832
ZARR_MIME_TYPE,
2933
ZARR_UPLOAD_BATCH_SIZE,
3034
)
@@ -46,7 +50,7 @@
4650
pre_upload_size_check,
4751
)
4852

49-
from .bases import LocalDirectoryAsset
53+
from .bases import LocalDirectoryAsset, _upload_blob_part
5054
from ..validate._types import (
5155
ORIGIN_VALIDATION_DANDI_ZARR,
5256
Origin,
@@ -742,13 +746,40 @@ def mkzarr() -> str:
742746
):
743747
# Items to upload in this batch (may be retried e.g. due to
744748
# 403 errors because of timed-out upload URLs)
745-
items_to_upload = list(items)
749+
all_items = list(items)
750+
large_items = [
751+
it for it in all_items if it.size > ZARR_LARGE_CHUNK_THRESHOLD
752+
]
753+
items_to_upload = [
754+
it for it in all_items if it.size <= ZARR_LARGE_CHUNK_THRESHOLD
755+
]
746756
max_retries = 5
747757
retry_count = 0
748758
# Add all items to checksum tree (only done once)
749-
for it in items_to_upload:
759+
for it in all_items:
750760
zcc.add_leaf(Path(it.entry_path), it.size, it.digest)
751761

762+
# Upload chunks above 5GB individually via multipart upload
763+
for it in large_items:
764+
for status in upload_zarr_file_multipart(
765+
item=it,
766+
zarr_id=zarr_id,
767+
dandiset=dandiset,
768+
jobs=jobs,
769+
):
770+
if status.get("status") == "done":
771+
changed = True
772+
bytes_uploaded += it.size
773+
yield {
774+
"status": "uploading",
775+
"progress": 100
776+
* bytes_uploaded
777+
/ to_upload.total_size,
778+
"current": bytes_uploaded,
779+
}
780+
else:
781+
yield status
782+
752783
while items_to_upload and retry_count <= max_retries:
753784
# Prepare upload requests for current items
754785
uploading = [it.upload_request() for it in items_to_upload]
@@ -903,6 +934,115 @@ def _handle_failed_items_and_raise(
903934
raise failed_items[0][1]
904935

905936

937+
def upload_zarr_file_multipart(
    item: UploadItem,
    zarr_id: str,
    dandiset: RemoteDandiset,
    jobs: int | None = None,
):
    """
    Upload a single Zarr entry to the archive using a multipart upload.

    This is a generator.  While the upload proceeds it yields status
    dictionaries in the following order:

    - ``{"status": "calculating etag"}``
    - ``{"status": "initiating upload"}``
    - ``{"status": "uploading", "progress": <percent>, "current": <bytes>}``
      once per completed part, where both numbers refer to this entry only
      (not to the Zarr as a whole)
    - ``{"status": "done"}`` once the upload has been completed and the
      validation endpoint has been called

    :param item:
        the entry to upload; ``item.filepath`` is read from disk and
        ``item.entry_path`` is sent to the server as the chunk key
    :param zarr_id: ID of the Zarr the entry belongs to
    :param dandiset:
        the Dandiset being uploaded to; only its ``client`` is used, for
        the API requests
    :param jobs:
        maximum number of threads used to upload parts concurrently;
        5 threads are used when this is `None` (or 0)
    :raises RuntimeError:
        if the server returns a different number of parts than the locally
        computed etag expects, or if the ETag reported on completion differs
        from the locally computed one
    """
    # Avoid heavy import by importing within function:
    from dandi.support.digests import get_dandietag

    client = dandiset.client

    yield {"status": "calculating etag"}
    etagger = get_dandietag(item.filepath)
    filetag = etagger.as_str()

    yield {"status": "initiating upload"}
    lgr.debug("%s: Beginning upload", item.filepath)
    total_size = pre_upload_size_check(item.filepath)

    resp = client.post(
        "/uploads/zarr/initialize/",
        json={
            "contentSize": total_size,
            "digest": {
                "algorithm": "dandi:dandi-etag",
                "value": filetag,
            },
            "zarr": {
                "chunk_key": item.entry_path,
                "zarr_id": zarr_id,
            },
        },
    )

    try:
        upload_id = resp["upload_id"]
        parts = resp["parts"]
        if len(parts) != etagger.part_qty:
            raise RuntimeError(
                "Server and client disagree on number of parts for upload;"
                f" server says {len(parts)}, client says {etagger.part_qty}"
            )
        parts_out = []
        bytes_uploaded = 0
        lgr.debug("Uploading %s in %d parts", item.filepath, len(parts))
        with RESTFullAPIClient("http://nil.nil") as storage:
            # The file handle and the thread pool are only needed while the
            # parts are in flight; the storage session stays open afterwards
            # because it is also used to announce completion below.
            with item.filepath.open("rb") as fp, ThreadPoolExecutor(
                max_workers=jobs or 5
            ) as executor:
                # A single lock is handed to every part upload along with
                # the one shared file object.
                lock = Lock()
                futures = [
                    executor.submit(
                        _upload_blob_part,
                        storage_session=storage,
                        fp=fp,
                        lock=lock,
                        etagger=etagger,
                        asset_path=item.entry_path,
                        part=part,
                    )
                    for part in parts
                ]
                for fut in as_completed(futures):
                    # .result() re-raises any exception from the worker
                    out_part = fut.result()
                    bytes_uploaded += out_part["size"]
                    yield {
                        "status": "uploading",
                        "progress": 100 * bytes_uploaded / total_size,
                        "current": bytes_uploaded,
                    }
                    parts_out.append(out_part)

            lgr.debug("%s: Completing upload", item.entry_path)
            resp = client.post(
                f"/uploads/zarr/{upload_id}/complete/",
                json={"parts": parts_out},
            )
            lgr.debug(
                "%s: Announcing completion to %s",
                item.entry_path,
                resp["complete_url"],
            )
            r = storage.post(resp["complete_url"], data=resp["body"], json_resp=False)
            lgr.debug(
                "%s: Upload completed. Response content: %s",
                item.entry_path,
                r.content,
            )
            rxml = fromstring(r.text)
            # The root tag may carry an XML namespace ("{uri}Tag"); reuse the
            # same prefix when looking up the ETag child element.
            m = re.match(r"\{.+?\}", rxml.tag)
            ns = m.group(0) if m else ""
            final_etag = rxml.findtext(f"{ns}ETag")
            if final_etag is None:
                # Do not fail the upload, but make it visible that the
                # uploaded content could not be checked against the local
                # digest.
                lgr.warning(
                    "%s: Completion response contains no ETag; cannot verify"
                    " uploaded content against local etag %s",
                    item.entry_path,
                    filetag,
                )
            else:
                final_etag = final_etag.strip('"')
                if final_etag != filetag:
                    raise RuntimeError(
                        "Server and client disagree on final ETag of"
                        f" uploaded file; server says {final_etag},"
                        f" client says {filetag}"
                    )
            client.post(f"/uploads/zarr/{upload_id}/validate/")
            yield {"status": "done"}
    except Exception:
        post_upload_size_check(item.filepath, total_size, True)
        raise
    else:
        post_upload_size_check(item.filepath, total_size, False)
1044+
1045+
9061046
def _upload_zarr_file(
9071047
storage_session: RESTFullAPIClient,
9081048
dandiset: RemoteDandiset,

dandi/tests/test_files.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
from __future__ import annotations
22

3-
from operator import attrgetter
43
import os
5-
from pathlib import Path
64
import subprocess
7-
from unittest.mock import ANY
5+
from operator import attrgetter
6+
from pathlib import Path
7+
from unittest.mock import ANY, patch
88

9-
from dandischema.models import get_schema_version
109
import numpy as np
1110
import pytest
1211
import zarr
12+
from dandischema.models import get_schema_version
1313

14-
from .fixtures import SampleDandiset
1514
from .. import get_logger
1615
from ..consts import ZARR_MIME_TYPE, dandiset_metadata_file
1716
from ..dandiapi import AssetType, RemoteZarrAsset
@@ -29,6 +28,7 @@
2928
dandi_file,
3029
find_dandi_files,
3130
)
31+
from .fixtures import SampleDandiset
3232

3333
lgr = get_logger()
3434

@@ -536,6 +536,39 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path):
536536
assert r.headers["Content-Type"] == "application/json"
537537

538538

539+
@pytest.mark.ai_generated
def test_upload_zarr_large_chunks(new_dandiset, tmp_path):
    """
    With ZARR_LARGE_CHUNK_THRESHOLD patched down to 0, every entry of a small
    Zarr is expected to be routed through upload_zarr_file_multipart.
    """
    # Grab the genuine implementation before it gets patched out below
    from ..files.zarr import upload_zarr_file_multipart as genuine_multipart

    zarr_dir = tmp_path / "example.zarr"
    zarr.save(zarr_dir, np.arange(1000), np.arange(1000, 0, -1))
    local_asset = dandi_file(zarr_dir)
    assert isinstance(local_asset, ZarrAsset)

    routed_paths: list[str] = []

    def recording_multipart(item, *args, **kwargs):
        # Note which entry came through, then defer to the real uploader
        routed_paths.append(item.entry_path)
        yield from genuine_multipart(item, *args, **kwargs)

    # A threshold of 0 makes every chunk count as "large"
    threshold_patch = patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", 0)
    multipart_patch = patch(
        "dandi.files.zarr.upload_zarr_file_multipart",
        recording_multipart,
    )
    with threshold_patch, multipart_patch:
        remote_asset = local_asset.upload(new_dandiset.dandiset, {})

    assert isinstance(remote_asset, RemoteZarrAsset)
    # The set of entries on the server must equal the set of entries that
    # went through the multipart code path
    uploaded_paths = {str(entry) for entry in remote_asset.iterfiles()}
    assert uploaded_paths == set(routed_paths)
571+
539572
def test_validate_deep_zarr(tmp_path: Path) -> None:
540573
zarr_path = tmp_path / "foo.zarr"
541574
zarr.save(zarr_path, np.arange(1000), np.arange(1000, 0, -1))

0 commit comments

Comments
 (0)