
Commit c5bc328

Gereon Elvers committed
OSFDownloadMixin: lazy per-file resolution + progress logs
The previous implementation built a global manifest of every file across every configured OSF node before scheduling any download. For multi-component projects like MEG-MASC (4 nodes, ~1300 files) that is a ~3-minute API walk, completely silent. Datasets that target a known subset of files paid the full cost regardless.

Now:
- resolve_remote_file(rel_path) walks only the folders on the path to a single file (~5 s cold for the first folder; subsequent siblings are served from a per-folder cache). Used by _schedule_download.
- get_dataset_manifest is preserved for callers that genuinely need a global file listing (and now logs per-node progress so the wait is visible).
- A small _log helper, gated on the same PNPL_OSF_PROGRESS env var as the download bars, surfaces what the mixin is doing.

Verified on the VM: cold construction of Gwilliams2022 for a single sub/ses/task that previously sat silent for ~3 min before the first download now starts the download within ~5 s, with full e2e dropping from ~3 min to ~36 s (most of which is the 200 MB raw KIT download).
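As a rough sketch of the intended call pattern after this change (the import path, the use of Gwilliams2022 as the dataset class, and the rel_path below are illustrative assumptions, not the package's documented API):

import os

# Opt in to the new progress lines; this is the same PNPL_OSF_PROGRESS env var
# that already gates the download progress bars.
os.environ["PNPL_OSF_PROGRESS"] = "1"

# Hypothetical import — any dataset class that uses OSFDownloadMixin and sets
# OSF_PROJECT_ID would work; Gwilliams2022 is just the one mentioned above.
from pnpl.datasets import Gwilliams2022

# Lazy path: resolve a single file by walking only the folders on its path,
# without building the global manifest.
entry = Gwilliams2022.resolve_remote_file(
    "sub-01/ses-0/meg/some_file.con"  # illustrative dataset-relative path
)
print(entry["node_id"], entry["file_id"], entry["size"])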
1 parent 0fbaffe commit c5bc328

1 file changed: pnpl/datasets/mixins/osf_download.py (162 additions, 13 deletions)
@@ -57,6 +57,7 @@ class OSFDownloadMixin:
     _download_futures: ClassVar[dict] = {}
     _lock: ClassVar[threading.Lock] = threading.Lock()
     _manifest_cache: ClassVar[dict] = {}
+    _folder_listing_cache: ClassVar[dict] = {}
     _print_lock: ClassVar[threading.Lock] = threading.Lock()
 
     OSF_PROJECT_ID: ClassVar[str] = ""
@@ -152,15 +153,17 @@ def _schedule_download(self, fpath: str):
 
         with self._lock:
             if fpath not in self._download_futures:
+                # Lazy lookup: walk only the folders on the path to this file
+                # rather than building a global manifest of every file in
+                # every configured node.
+                entry = type(self).resolve_remote_file(rel_path)
                 self._download_futures[fpath] = self._executor.submit(
                     self._download_with_retry_static,
                     fpath=fpath,
                     rel_path=rel_path,
-                    nodes=self._project_nodes(),
-                    api_base=self.OSF_API_BASE.rstrip("/"),
+                    entry=entry,
                     files_base=self.OSF_FILES_BASE.rstrip("/"),
                     token=self._osf_token(),
-                    manifest=type(self).get_dataset_manifest(),
                 )
             return self._download_futures[fpath]
 
@@ -178,6 +181,11 @@ def get_dataset_manifest(cls, refresh: bool = False) -> dict:
                 },
                 ...
             }
+
+        Walks the entire folder tree across every configured node, which
+        can take minutes for large datasets. When the caller knows which
+        files it needs, prefer :meth:`resolve_remote_file` — it walks only
+        the folders along the path to a single file.
         """
         nodes = cls._project_nodes()
         if not nodes:
@@ -193,7 +201,16 @@ def get_dataset_manifest(cls, refresh: bool = False) -> dict:
         token = cls._osf_token()
         manifest: dict[str, dict] = {}
 
+        # Surface what we're doing — the walk is silent otherwise and can
+        # take minutes for multi-component projects like MEG-MASC.
+        cls._log(
+            f"OSF: fetching manifest from {len(nodes)} node(s) "
+            f"({', '.join(nodes)})… one-time, may take a while."
+        )
+
         for node_id in nodes:
+            t0 = time.monotonic()
+            count_before = len(manifest)
             for entry in cls._walk_node(node_id, api_base, token):
                 rel_path = entry["path"]
                 if rel_path in manifest:
@@ -204,16 +221,156 @@ def get_dataset_manifest(cls, refresh: bool = False) -> dict:
                     "file_id": entry["file_id"],
                     "size": entry["size"],
                 }
+            cls._log(
+                f"OSF: {node_id}: {len(manifest) - count_before} new files "
+                f"in {time.monotonic() - t0:.1f}s ({len(manifest)} total)"
+            )
 
         cls._manifest_cache[cache_key] = manifest
         return manifest
 
+    @classmethod
+    def _log(cls, message: str) -> None:
+        """Best-effort progress line, gated on PNPL_OSF_PROGRESS like the
+        download progress bar. Goes to stderr so it interleaves with
+        download bars cleanly."""
+        if not cls._progress_enabled():
+            return
+        with cls._print_lock:
+            sys.stderr.write(message + "\n")
+            sys.stderr.flush()
+
     @classmethod
     def list_remote_files(cls, refresh: bool = False) -> list[str]:
         """Return dataset-relative file paths advertised by the OSF manifest."""
         manifest = cls.get_dataset_manifest(refresh=refresh)
         return sorted(manifest.keys())
 
+    # ------------------------------------------------------------------
+    # Lazy per-file resolution (no global manifest)
+    # ------------------------------------------------------------------
+
+    @classmethod
+    def resolve_remote_file(cls, rel_path: str) -> dict:
+        """Resolve a single file's OSF location by walking only the
+        folders on its path (~3-4 API calls per *new* folder, served
+        from cache thereafter).
+
+        Returns ``{"node_id", "file_id", "size"}``.
+
+        If a global manifest has already been fetched, that entry wins —
+        callers are still free to call :meth:`get_dataset_manifest` for
+        bulk operations and benefit from the cache.
+        """
+        # Manifest hit?
+        nodes = cls._project_nodes()
+        if not nodes:
+            raise RuntimeError(
+                "OSF_PROJECT_ID is not set on the dataset class; cannot resolve from OSF."
+            )
+        cache_key = (cls.OSF_API_BASE.rstrip("/"), tuple(nodes), cls._osf_token() or "")
+        cached_manifest = cls._manifest_cache.get(cache_key)
+        if cached_manifest is not None and rel_path in cached_manifest:
+            return cached_manifest[rel_path]
+
+        api_base = cls.OSF_API_BASE.rstrip("/")
+        token = cls._osf_token()
+        parts = [p for p in rel_path.split("/") if p]
+        if not parts:
+            raise FileNotFoundError(f"Empty OSF path: {rel_path!r}")
+
+        last_exc: Optional[BaseException] = None
+        for node_id in nodes:
+            try:
+                entry = cls._lookup_in_node(node_id, parts, api_base, token)
+            except FileNotFoundError as exc:
+                last_exc = exc
+                continue
+            if entry is not None:
+                return entry
+
+        raise FileNotFoundError(
+            f"OSF file not found in any of {nodes}: '{rel_path}'."
+        ) from last_exc
+
+    @classmethod
+    def _lookup_in_node(
+        cls,
+        node_id: str,
+        parts: list[str],
+        api_base: str,
+        token: Optional[str],
+    ) -> Optional[dict]:
+        """Walk down ``node_id``'s osfstorage to find ``parts[0]/parts[1]/...``.
+        Returns the file entry or None if any path component is missing.
+        Folder listings are cached per (node, url) so subsequent lookups
+        of siblings under the same parent are free."""
+        folder_url = f"{api_base}/nodes/{quote(node_id, safe='')}/files/osfstorage/"
+
+        for i, part in enumerate(parts):
+            listing = cls._list_folder(node_id, folder_url, token)
+            match = next((e for e in listing if e["name"] == part), None)
+            if match is None:
+                return None
+            if i == len(parts) - 1:
+                if match["kind"] != "file":
+                    return None
+                return {
+                    "node_id": node_id,
+                    "file_id": match["file_id"],
+                    "size": match["size"],
+                }
+            if match["kind"] != "folder" or not match["files_url"]:
+                return None
+            folder_url = match["files_url"]
+        return None
+
+    @classmethod
+    def _list_folder(
+        cls,
+        node_id: str,
+        url: str,
+        token: Optional[str],
+    ) -> list[dict]:
+        """List immediate children of an OSF folder URL. Cached per
+        (node, url, token)."""
+        cache = cls._folder_listing_cache
+        cache_key = (node_id, url, token or "")
+        cached = cache.get(cache_key)
+        if cached is not None:
+            return cached
+
+        headers = {"Accept": "application/vnd.api+json"}
+        if token:
+            headers["Authorization"] = f"Bearer {token}"
+
+        entries: list[dict] = []
+        next_url: Optional[str] = (
+            url + ("?" if "?" not in url else "&") + "page%5Bsize%5D=100"
+        )
+        while next_url:
+            payload = cls._get_with_retry(next_url, headers)
+            for entry in payload.get("data", []):
+                attrs = entry.get("attributes", {}) or {}
+                name = attrs.get("name")
+                if not name:
+                    continue
+                files_url = (
+                    ((entry.get("relationships") or {}).get("files") or {})
+                    .get("links", {}).get("related", {}).get("href")
+                )
+                entries.append({
+                    "kind": attrs.get("kind"),
+                    "name": name,
+                    "file_id": entry.get("id"),
+                    "size": attrs.get("size"),
+                    "files_url": files_url,
+                })
+            next_url = (payload.get("links") or {}).get("next")
+
+        cache[cache_key] = entries
+        return entries
+
     # ------------------------------------------------------------------
     # Manifest construction — recursive folder walk
     # ------------------------------------------------------------------
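For orientation, the request pattern the new lazy walk produces for a three-component path looks roughly like the trace below; the node id, folder names, and file names are made up, and the URLs follow whatever OSF_API_BASE resolves to.

# resolve_remote_file("sub-01/meg/example.con"), trying node "abc12" (hypothetical):
#
#   GET {OSF_API_BASE}/nodes/abc12/files/osfstorage/?page%5Bsize%5D=100
#       -> root listing, cached in _folder_listing_cache; pick folder "sub-01"
#   GET <"sub-01" relationships.files.links.related.href>
#       -> pick folder "meg"
#   GET <"meg" relationships.files.links.related.href>
#       -> pick file "example.con"; return {"node_id": "abc12", "file_id": ..., "size": ...}
#
# A sibling such as "sub-01/meg/example.mrk" hits all three cached listings and
# issues no further requests; folders with more than one page of children follow
# the payload's links.next until exhausted.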
@@ -304,23 +461,15 @@ def _download_with_retry_static(
         cls,
         fpath: str,
         rel_path: str,
-        nodes: list[str],
-        api_base: str,
+        entry: dict,
         files_base: str,
         token: Optional[str],
-        manifest: dict,
         max_retries: int = 5,
         timeout_download_s: int = 120,
     ) -> str:
-        if not nodes:
-            raise RuntimeError(
-                "OSF_PROJECT_ID is not set on the dataset class; cannot download from OSF."
-            )
-
-        entry = manifest.get(rel_path)
         if entry is None:
             raise FileNotFoundError(
-                f"OSF file not found in manifest for any of {nodes}: '{rel_path}'."
+                f"OSF file not resolved before download: '{rel_path}'."
             )
 
         node_id = entry["node_id"]
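And for callers that genuinely need the full listing, the bulk path is unchanged apart from the new per-node progress lines; as in the earlier sketch, the dataset class name is only an example.

# Global walk across every configured node (can still take minutes for
# multi-component projects), now visible when PNPL_OSF_PROGRESS is set.
manifest = Gwilliams2022.get_dataset_manifest()   # {rel_path: {"node_id", "file_id", "size"}}
paths = Gwilliams2022.list_remote_files()         # sorted dataset-relative paths
print(f"{len(paths)} files advertised by the OSF manifest")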
