Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 152 additions & 17 deletions webknossos/webknossos/client/_upload_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,23 @@
from upath import UPath

from .. import LayerToLink
from ..dataset import Dataset
from ..dataset import Attachment, Dataset, MagView
from ..datastore import Datastore
from ..utils import get_rich_progress
from ._resumable import Resumable
from .api_client.models import (
ApiDatasetUploadInformation,
ApiReserveDatasetUploadInformation,
ApiAttachmentProperties,
ApiAttachmentUploadInfo,
ApiDatasetUploadInfo,
ApiMagProperties,
ApiMagUploadInfo,
ApiResumableUploadInfo,
)
from .context import _get_context, _WebknossosContext, webknossos_context

# Number of parallel uploads used when the caller does not pass `jobs`.
DEFAULT_SIMULTANEOUS_UPLOADS = 5
# Passed as `retry_count` to the datastore reserve/finish upload requests.
MAXIMUM_RETRY_COUNT = 4
# Passed as `permanent_errors` to the Resumable sessions created below;
# presumably HTTP status codes that are not retried — confirm against Resumable.
RESUMABLE_PERMANENT_ERROR_CODES = [400, 403, 404, 409, 415, 500, 501]


@cache
Expand All @@ -43,6 +48,137 @@ def _walk(
yield (path.resolve(), path.relative_to(base_path), path.stat().st_size)


def upload_mag(
    dataset_id: str,
    layer_name: str,
    mag: MagView,
    datastore_url: str | None = None,
    jobs: int | None = None,
) -> None:
    """Upload the files of a single mag to a layer of a dataset.

    The files found under ``mag.path`` are first announced to the datastore
    with ``mag_reserve_upload``, then transferred through a ``Resumable``
    session, and finally the upload is completed with ``mag_finish_upload``.

    Args:
        dataset_id: Dataset id sent along with the reservation.
        layer_name: Layer name sent along with the reservation.
        mag: Mag view whose files (``mag.path``) are uploaded.
        datastore_url: Datastore to upload to. When falsy, the value of
            ``_cached_get_upload_datastore`` for the current context is used.
        jobs: Number of simultaneous uploads. ``None`` selects
            ``DEFAULT_SIMULTANEOUS_UPLOADS``. Under pytest a single upload
            at a time is always used.
    """
    context = _get_context()

    files = list(_walk(mag.path))
    file_count = len(files)
    total_bytes = sum(size for _, _, size in files)

    upload_id = _generate_upload_id()
    datastore_url = datastore_url or _cached_get_upload_datastore(context)
    datastore_api_client = context.get_datastore_api_client(datastore_url)

    # Tests always run with one upload at a time, regardless of `jobs`.
    if "PYTEST_CURRENT_TEST" in os.environ:
        parallel_uploads = 1
    elif jobs is not None:
        parallel_uploads = jobs
    else:
        parallel_uploads = DEFAULT_SIMULTANEOUS_UPLOADS

    resumable_info = ApiResumableUploadInfo(
        upload_id=upload_id,
        total_file_count=file_count,
        total_file_size_in_bytes=total_bytes,
    )
    mag_properties = ApiMagProperties(
        mag=mag.mag.to_tuple(),
        channel_index=None,
    )
    # NOTE(review): `overwritePending` is camelCase while the sibling keyword
    # arguments are snake_case — confirm it matches the model's field name.
    reservation = ApiMagUploadInfo(
        resumable_upload_info=resumable_info,
        dataset_id=dataset_id,
        layer_name=layer_name,
        mag=mag_properties,
        overwritePending=True,  # TODO
    )
    datastore_api_client.mag_reserve_upload(
        mag_upload_info=reservation,
        retry_count=MAXIMUM_RETRY_COUNT,
    )

    def _unique_identifier(_, relative_path):
        # Every file is identified by the upload id plus its relative path.
        return f"{upload_id}/{relative_path.as_posix()}"

    # NOTE(review): the httpx client created here is not closed explicitly in
    # this function — confirm whether Resumable takes care of that.
    with get_rich_progress() as progress, Resumable(
        f"{datastore_api_client.url_prefix}/datasets/upload/mag",
        simultaneous_uploads=parallel_uploads,
        query={"totalFileCount": file_count},
        headers={"X-Auth-Token": context.token},
        chunk_size=100 * 1024 * 1024,  # 100 MiB
        generate_unique_identifier=_unique_identifier,
        test_chunks=False,
        permanent_errors=RESUMABLE_PERMANENT_ERROR_CODES,
        client=httpx.Client(timeout=None),
    ) as session:
        task_id = progress.add_task("Mag Upload", total=total_bytes)

        def _advance_progress(chunk):
            # Move the progress bar forward by the size of the finished chunk.
            progress.advance(task_id, chunk.size)

        for file_path, relative_path, _ in files:
            upload_file = session.add_file(file_path, relative_path)
            upload_file.chunk_completed.register(_advance_progress)

    # NOTE(review): the finish request is placed after the upload session has
    # been left, mirroring upload_dataset — confirm the intended nesting.
    datastore_api_client.mag_finish_upload(
        upload_id=upload_id,
        retry_count=MAXIMUM_RETRY_COUNT,
    )


def upload_attachment(
    dataset_id: str,
    layer_name: str,
    attachment: Attachment,
    datastore_url: str | None = None,
    jobs: int | None = None,
) -> None:
    """Upload the files of a single attachment to a layer of a dataset.

    The files found under ``attachment.path`` are first announced to the
    datastore with ``attachment_reserve_upload``, then transferred through a
    ``Resumable`` session, and finally the upload is completed with
    ``attachment_finish_upload``.

    Args:
        dataset_id: Dataset id sent along with the reservation.
        layer_name: Layer name sent along with the reservation.
        attachment: Attachment whose files (``attachment.path``) are
            uploaded; its ``type_name``, ``name`` and ``data_format`` are
            sent along with the reservation.
        datastore_url: Datastore to upload to. When falsy, the value of
            ``_cached_get_upload_datastore`` for the current context is used.
        jobs: Number of simultaneous uploads. ``None`` selects
            ``DEFAULT_SIMULTANEOUS_UPLOADS``. Under pytest a single upload
            at a time is always used.
    """
    context = _get_context()

    files = list(_walk(attachment.path))
    file_count = len(files)
    total_bytes = sum(size for _, _, size in files)

    upload_id = _generate_upload_id()
    datastore_url = datastore_url or _cached_get_upload_datastore(context)
    datastore_api_client = context.get_datastore_api_client(datastore_url)

    # Tests always run with one upload at a time, regardless of `jobs`.
    if "PYTEST_CURRENT_TEST" in os.environ:
        parallel_uploads = 1
    elif jobs is not None:
        parallel_uploads = jobs
    else:
        parallel_uploads = DEFAULT_SIMULTANEOUS_UPLOADS

    resumable_info = ApiResumableUploadInfo(
        upload_id=upload_id,
        total_file_count=file_count,
        total_file_size_in_bytes=total_bytes,
    )
    attachment_type = attachment.type_name
    # NOTE(review): `path` is a hard-coded placeholder value, and `dataFormat`
    # is camelCase unlike the sibling keyword arguments — confirm both
    # against the model definition.
    attachment_properties = ApiAttachmentProperties(
        name=attachment.name,
        path="dummy_path",
        dataFormat=str(attachment.data_format),
    )
    # NOTE(review): `overwritePending` is camelCase as well — confirm it
    # matches the model's field name.
    reservation = ApiAttachmentUploadInfo(
        resumable_upload_info=resumable_info,
        dataset_id=dataset_id,
        layer_name=layer_name,
        attachment_type=attachment_type,
        attachment=attachment_properties,
        overwritePending=True,  # TODO
    )
    datastore_api_client.attachment_reserve_upload(
        attachment_upload_info=reservation,
        retry_count=MAXIMUM_RETRY_COUNT,
    )

    def _unique_identifier(_, relative_path):
        # Every file is identified by the upload id plus its relative path.
        return f"{upload_id}/{relative_path.as_posix()}"

    # NOTE(review): the httpx client created here is not closed explicitly in
    # this function — confirm whether Resumable takes care of that.
    with get_rich_progress() as progress, Resumable(
        f"{datastore_api_client.url_prefix}/datasets/upload/attachment",
        simultaneous_uploads=parallel_uploads,
        query={"totalFileCount": file_count},
        headers={"X-Auth-Token": context.token},
        chunk_size=100 * 1024 * 1024,  # 100 MiB
        generate_unique_identifier=_unique_identifier,
        test_chunks=False,
        permanent_errors=RESUMABLE_PERMANENT_ERROR_CODES,
        client=httpx.Client(timeout=None),
    ) as session:
        task_id = progress.add_task("Attachment Upload", total=total_bytes)

        def _advance_progress(chunk):
            # Move the progress bar forward by the size of the finished chunk.
            progress.advance(task_id, chunk.size)

        for file_path, relative_path, _ in files:
            upload_file = session.add_file(file_path, relative_path)
            upload_file.chunk_completed.register(_advance_progress)

    # NOTE(review): the finish request is placed after the upload session has
    # been left, mirroring upload_dataset — confirm the intended nesting.
    datastore_api_client.attachment_finish_upload(
        upload_id=upload_id,
        retry_count=MAXIMUM_RETRY_COUNT,
    )


def _generate_upload_id() -> str:
    """Return a fresh upload id of the form ``<UTC timestamp>__<uuid4>``.

    The timestamp is formatted as ``YYYY-MM-DDTHH-MM-SS`` from the current
    UTC time; the random UUID keeps ids created within the same second
    distinct.
    """
    now_utc = gmtime()
    time_str = strftime("%Y-%m-%dT%H-%M-%S", now_utc)
    return f"{time_str}__{uuid4()}"


def upload_dataset(
dataset: Dataset,
new_dataset_name: str | None = None,
Expand Down Expand Up @@ -76,8 +212,7 @@ def upload_dataset(
file_infos = list(_walk(dataset.path))
total_file_size = sum(size for _, _, size in file_infos)
# replicates https://github.com/scalableminds/webknossos/blob/master/frontend/javascripts/admin/dataset/dataset_upload_view.js
time_str = strftime("%Y-%m-%dT%H-%M-%S", gmtime())
upload_id = f"{time_str}__{uuid4()}"
upload_id = _generate_upload_id()
datastore_url = datastore_url or _cached_get_upload_datastore(context)
datastore_api_client = context.get_datastore_api_client(datastore_url)
simultaneous_uploads = jobs if jobs is not None else DEFAULT_SIMULTANEOUS_UPLOADS
Expand All @@ -95,27 +230,27 @@ def upload_dataset(
)

datastore_api_client.dataset_reserve_upload(
reserve_upload_information=ApiReserveDatasetUploadInformation(
upload_id,
new_dataset_name,
context.organization_id,
total_file_count=len(file_infos),
total_file_size_in_bytes=total_file_size,
dataset_upload_info=ApiDatasetUploadInfo(
resumable_upload_info=ApiResumableUploadInfo(
upload_id=upload_id,
total_file_count=len(file_infos),
total_file_size_in_bytes=total_file_size,
),
dataset_name=new_dataset_name,
organization_id=context.organization_id,
layers_to_link=[
layer._as_api_linked_layer_identifier() for layer in layers_to_link
],
folder_id=folder_id,
initial_teams=[],
initial_team_ids=[],
),
retry_count=MAXIMUM_RETRY_COUNT,
)
with get_rich_progress() as progress:
with Resumable(
f"{datastore_url}/data/datasets",
f"{datastore_api_client.url_prefix}/datasets/upload/dataset",
simultaneous_uploads=simultaneous_uploads,
query={
"owningOrganization": context.organization_id,
"name": new_dataset_name,
"totalFileCount": len(file_infos),
},
headers={"X-Auth-Token": context.token},
Expand All @@ -124,7 +259,7 @@ def upload_dataset(
f"{upload_id}/{relative_path.as_posix()}"
),
test_chunks=False,
permanent_errors=[400, 403, 404, 409, 415, 500, 501],
permanent_errors=RESUMABLE_PERMANENT_ERROR_CODES,
client=httpx.Client(timeout=None),
) as session:
progress_task = progress.add_task("Dataset Upload", total=total_file_size)
Expand All @@ -135,7 +270,7 @@ def upload_dataset(
)

dataset_id = datastore_api_client.dataset_finish_upload(
upload_information=ApiDatasetUploadInformation(upload_id),
upload_id=upload_id,
retry_count=MAXIMUM_RETRY_COUNT,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(
self,
timeout_seconds: float,
headers: dict[str, str] | None = None,
webknossos_api_version: int = 13,
webknossos_api_version: int = 14,
):
self.headers = headers
self.timeout_seconds = timeout_seconds
Expand Down
65 changes: 55 additions & 10 deletions webknossos/webknossos/client/api_client/datastore_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

from webknossos.client.api_client.models import (
ApiAdHocMeshInfo,
ApiDatasetUploadInformation,
ApiAttachmentUploadInfo,
ApiDatasetUploadInfo,
ApiDatasetUploadSuccess,
ApiMagUploadInfo,
ApiPrecomputedMeshInfo,
ApiReserveDatasetUploadInformation,
)

from ._abstract_api_client import LONG_TIMEOUT_SECONDS, AbstractApiClient, Query
Expand All @@ -31,32 +32,76 @@ def __init__(
def url_prefix(self) -> str:
return f"{self.datastore_base_url}/data/v{self.webknossos_api_version}"

def mag_reserve_upload(
    self, *, mag_upload_info: ApiMagUploadInfo, retry_count: int
) -> None:
    """POST the mag upload reservation to the datastore.

    Args:
        mag_upload_info: Reservation payload, sent as the JSON body.
        retry_count: Forwarded as ``retry_count`` to ``_post_json``.
    """
    route = "/datasets/upload/mag/reserveUpload"
    self._post_json(route, mag_upload_info, retry_count=retry_count)

def mag_finish_upload(
    self,
    *,
    upload_id: str,
    retry_count: int,
) -> None:
    """POST the finish request for a mag upload to the datastore.

    Args:
        upload_id: Id of the upload, sent as the ``uploadId`` query parameter.
        retry_count: Forwarded as ``retry_count`` to ``_post``.
    """
    route = "/datasets/upload/mag/finishUpload"
    upload_query = {"uploadId": upload_id}
    # The finish request uses the long timeout instead of the default one.
    self._post(
        route,
        query=upload_query,
        retry_count=retry_count,
        timeout_seconds=LONG_TIMEOUT_SECONDS,
    )

def attachment_reserve_upload(
    self, *, attachment_upload_info: ApiAttachmentUploadInfo, retry_count: int
) -> None:
    """POST the attachment upload reservation to the datastore.

    Args:
        attachment_upload_info: Reservation payload, sent as the JSON body.
        retry_count: Forwarded as ``retry_count`` to ``_post_json``.
    """
    route = "/datasets/upload/attachment/reserveUpload"
    self._post_json(route, attachment_upload_info, retry_count=retry_count)

def attachment_finish_upload(
    self,
    *,
    upload_id: str,
    retry_count: int,
) -> None:
    """POST the finish request for an attachment upload to the datastore.

    Args:
        upload_id: Id of the upload, sent as the ``uploadId`` query parameter.
        retry_count: Forwarded as ``retry_count`` to ``_post``.
    """
    route = "/datasets/upload/attachment/finishUpload"
    upload_query = {"uploadId": upload_id}
    # The finish request uses the long timeout instead of the default one.
    self._post(
        route,
        query=upload_query,
        retry_count=retry_count,
        timeout_seconds=LONG_TIMEOUT_SECONDS,
    )

def dataset_finish_upload(
self,
*,
upload_information: ApiDatasetUploadInformation,
upload_id: str,
retry_count: int,
) -> str:
route = "/datasets/finishUpload"
json = self._post_json_with_json_response(
route = "/datasets/upload/dataset/finishUpload"
json = self._post_with_json_response(
route,
upload_information,
query={"uploadId": upload_id},
retry_count=retry_count,
timeout_seconds=LONG_TIMEOUT_SECONDS,
response_type=ApiDatasetUploadSuccess,
)
return json.new_dataset_id
return json.dataset_id

def dataset_reserve_upload(
self,
*,
reserve_upload_information: ApiReserveDatasetUploadInformation,
dataset_upload_info: ApiDatasetUploadInfo,
retry_count: int,
) -> None:
route = "/datasets/reserveUpload"
route = "/datasets/upload/dataset/reserveUpload"
self._post_json(
route,
reserve_upload_information,
dataset_upload_info,
retry_count=retry_count,
)

Expand Down
Loading
Loading