Skip to content

Commit b427dc7

Browse files
committed
WIP
1 parent 31f99b5 commit b427dc7

3 files changed

Lines changed: 66 additions & 24 deletions

File tree

dandi/files/bases.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,27 @@
11
from __future__ import annotations
22

3+
import os
4+
import re
35
from abc import ABC, abstractmethod
46
from collections import deque
57
from collections.abc import Iterator
68
from concurrent.futures import ThreadPoolExecutor, as_completed
79
from dataclasses import dataclass
810
from datetime import datetime
9-
import os
1011
from pathlib import Path
11-
import re
1212
from threading import Lock
1313
from typing import IO, Any, Generic
1414
from xml.etree.ElementTree import fromstring
1515

1616
import dandischema
17+
import requests
1718
from dandischema.consts import DANDI_SCHEMA_VERSION
1819
from dandischema.digests.dandietag import DandiETag
19-
from dandischema.models import BareAsset, CommonModel
20+
from dandischema.models import BareAsset, CommonModel, get_schema_version
2021
from dandischema.models import Dandiset as DandisetMeta
21-
from dandischema.models import get_schema_version
2222
from packaging.version import Version
2323
from pydantic import ValidationError
2424
from pydantic_core import ErrorDetails
25-
import requests
2625

2726
import dandi
2827
from dandi.dandiapi import RemoteAsset, RemoteDandiset, RESTFullAPIClient
@@ -320,6 +319,7 @@ def iter_upload(
320319
metadata: dict[str, Any],
321320
jobs: int | None = None,
322321
replacing: RemoteAsset | None = None,
322+
zarr_id: str = "",
323323
) -> Iterator[dict]:
324324
"""
325325
Upload the file as an asset with the given metadata to the given
@@ -335,6 +335,9 @@ def iter_upload(
335335
:param RemoteAsset replacing:
336336
If set, replace the given asset, which must have the same path as
337337
the new asset
338+
:param str zarr_id:
339+
ID of the Zarr this file is uploaded into as a chunk, if applicable.
340+
If set, the initialize request sends a ``zarr`` entry instead of ``dandiset``.
338341
:returns:
339342
A generator of `dict`\\s containing at least a ``"status"`` key.
340343
Upon successful upload, the last `dict` will have a status of
@@ -361,17 +364,23 @@ def iter_upload(
361364
lgr.debug("%s: Beginning upload", asset_path)
362365
total_size = pre_upload_size_check(self.filepath)
363366
try:
364-
resp = client.post(
365-
"/uploads/initialize/",
366-
json={
367-
"contentSize": total_size,
368-
"digest": {
369-
"algorithm": "dandi:dandi-etag",
370-
"value": filetag,
371-
},
372-
"dandiset": dandiset.identifier,
367+
body = {
368+
"contentSize": total_size,
369+
"digest": {
370+
"algorithm": "dandi:dandi-etag",
371+
"value": filetag,
373372
},
374-
)
373+
}
374+
375+
if zarr_id:
376+
body["zarr"] = {
377+
"chunk_key": asset_path,
378+
"zarr_id": zarr_id,
379+
}
380+
else:
381+
body["dandiset"] = dandiset.identifier
382+
383+
resp = client.post("/uploads/initialize/", json=body)
375384
except requests.HTTPError as e:
376385
if e.response is not None and e.response.status_code == 409:
377386
lgr.debug("%s: Blob already exists on server", asset_path)
@@ -583,8 +592,8 @@ def get_validation_errors(
583592
)
584593

585594
# Avoid circular imports by importing within function:
586-
from .bids import NWBBIDSAsset
587595
from ..organize import validate_organized_path
596+
from .bids import NWBBIDSAsset
588597

589598
if not isinstance(self, NWBBIDSAsset) and self.dandiset_path is not None:
590599
errors.extend(

dandi/files/zarr.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
from __future__ import annotations
22

3+
import json
4+
import os
5+
import os.path
6+
import urllib.parse
37
from base64 import b64encode
48
from collections.abc import Generator, Iterator
59
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
610
from contextlib import closing
711
from dataclasses import dataclass, field, replace
812
from datetime import datetime
913
from enum import Enum
10-
import json
11-
import os
12-
import os.path
1314
from pathlib import Path
1415
from time import sleep
1516
from typing import Any, Optional
16-
import urllib.parse
1717

18+
import requests
1819
from dandischema.models import BareAsset, DigestType
1920
from pydantic import BaseModel, ConfigDict, ValidationError
20-
import requests
2121
from zarr_checksum.tree import ZarrChecksumTree
2222

2323
from dandi import __version__ as dandi_version
@@ -47,7 +47,6 @@
4747
pre_upload_size_check,
4848
)
4949

50-
from .bases import LocalDirectoryAsset, LocalFileAsset
5150
from ..validate._types import (
5251
ORIGIN_VALIDATION_DANDI_ZARR,
5352
Origin,
@@ -58,6 +57,7 @@
5857
ValidationResult,
5958
Validator,
6059
)
60+
from .bases import LocalDirectoryAsset, LocalFileAsset
6161

6262
lgr = get_logger()
6363

@@ -764,7 +764,10 @@ def mkzarr() -> str:
764764
path=it.entry_path,
765765
)
766766
for status in large_asset.iter_upload(
767-
dandiset, {"path": it.entry_path}, jobs=jobs
767+
dandiset,
768+
{"path": it.entry_path},
769+
jobs=jobs,
770+
zarr_id=zarr_id,
768771
):
769772
if status.get("status") == "done":
770773
changed = True

dandi/tests/test_files.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import os
55
from pathlib import Path
66
import subprocess
7-
from unittest.mock import ANY
7+
from unittest.mock import ANY, patch
88

99
from dandischema.models import get_schema_version
1010
import numpy as np
@@ -536,6 +536,36 @@ def test_upload_zarr_entry_content_type(new_dandiset, tmp_path):
536536
assert r.headers["Content-Type"] == "application/json"
537537

538538

539+
@pytest.mark.ai_generated
540+
def test_upload_zarr_large_chunks(new_dandiset, tmp_path):
541+
"""Chunks above ZARR_LARGE_CHUNK_THRESHOLD are uploaded via LocalFileAsset.iter_upload."""
542+
filepath = tmp_path / "example.zarr"
543+
zarr.save(filepath, np.arange(1000), np.arange(1000, 0, -1))
544+
zf = dandi_file(filepath)
545+
assert isinstance(zf, ZarrAsset)
546+
547+
from ..files.bases import LocalFileAsset
548+
549+
real_iter_upload = LocalFileAsset.iter_upload
550+
called_paths: list[str] = []
551+
552+
def spy_iter_upload(self, *args, **kwargs):
553+
called_paths.append(self.path)
554+
yield from real_iter_upload(self, *args, **kwargs)
555+
556+
# Set threshold to 0 so every chunk is treated as "large"
557+
with (
558+
patch("dandi.files.zarr.ZARR_LARGE_CHUNK_THRESHOLD", 0),
559+
patch.object(LocalFileAsset, "iter_upload", spy_iter_upload),
560+
):
561+
asset = zf.upload(new_dandiset.dandiset, {})
562+
563+
assert isinstance(asset, RemoteZarrAsset)
564+
# Every chunk file in the zarr should have been routed through LocalFileAsset.iter_upload
565+
remote_entries = {str(e) for e in asset.iterfiles()}
566+
assert remote_entries == set(called_paths)
567+
568+
539569
def test_validate_deep_zarr(tmp_path: Path) -> None:
540570
zarr_path = tmp_path / "foo.zarr"
541571
zarr.save(zarr_path, np.arange(1000), np.arange(1000, 0, -1))

0 commit comments

Comments
 (0)