feat: Support more Data Designer seed sources #413
Open: mikeknep wants to merge 16 commits into main from remote-seeds/mknepper
+707 −273
16 commits (all by mikeknep):
- ad409e7 Agent initial impl
- d6cb03a Specify provider in test ModelConfigs
- 8f6d68f Drop stale unit test cases since more seed source types are now suppo…
- 2980f88 Drop test rejecting provider-less ModelConfig
- a68bc2b Drop guard against provider=None, no longer possible in library
- 6336763 Drop another none-provider test
- b30e927 No more default field on ModelProviderRegistry
- 8e5b2b1 Update test assertion to match modified error message content
- 6913604 One more missing provider on a ModelConfig
- 03a4815 Impl _FilesetDirFileSystem and some integration tests for the new see…
- 6eeeb5b Dedupe some seed validation logic
- e8e95c8 Style
- b9d9ff8 Add hybrid fs provider
- 9f31bdc Local validation plus some refactoring
- eb500d8 Tweak error message
- 698445b Tweak another error message
packages/data_designer_nemo/src/data_designer_nemo/fileset_filesystem_provider.py
138 changes: 138 additions & 0 deletions
@@ -0,0 +1,138 @@

```python
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from pathlib import Path, PurePosixPath

from data_designer.engine.resources.seed_reader import (
    FileSystemProvider,
    LocalFileSystemProvider,
    SeedReaderConfigError,
    SeedReaderError,
    SeedReaderFileSystemContext,
)
from data_designer_nemo.sdk_translation import async_to_sync_sdk
from fsspec.implementations.dirfs import DirFileSystem
from nemo_platform import AsyncNeMoPlatform, NeMoPlatform
from nemo_platform.filesets import FilesetFileSystem, FilesetPathError, build_fileset_ref, parse_fileset_ref


class _FilesetDirFileSystem(DirFileSystem):
    """DirFileSystem that handles FilesetFileSystem's '#' path separator.

    FilesetFileSystem returns paths using '#' to separate the fileset name from
    the file path (e.g. "ws/fs#data.parquet"). Standard DirFileSystem._relpath
    builds its strip-prefix with '/' (e.g. "ws/fs/"), so the startswith check
    fails for fileset-root paths. For subdirectory roots (e.g. "ws/fs#subdir"),
    files use '/' after '#' and the standard logic already works; the '#' branch
    below is a no-op in that case.

    All methods besides _relpath are inherited from DirFileSystem unchanged, so
    this remains a complete AbstractFileSystem implementation.
    """

    def _relpath(self, path: str | list) -> str | list:
        if isinstance(path, list):
            return [self._relpath(p) for p in path]
        if not self.path:
            return path
        if path == self.path:
            return ""
        for sep in ("#", "/"):
            prefix = self.path + sep
            if path.startswith(prefix):
                return path[len(prefix) :]
        raise AssertionError(f"Path {path!r} does not start with root {self.path!r}")
```
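
The prefix-stripping rule in `_relpath` can be tried out without fsspec or the NeMo SDK. The following is a minimal standalone sketch, not the PR's code: `strip_root` is a hypothetical helper name, the list-handling branch is left out, and the example paths simply follow the "ws/fs#data.parquet" shape quoted in the docstring.

```python
def strip_root(path: str, root: str) -> str:
    """Return `path` relative to `root`, accepting '#' or '/' right after the root.

    Hypothetical standalone mirror of _FilesetDirFileSystem._relpath.
    """
    if not root:
        return path
    if path == root:
        return ""
    for sep in ("#", "/"):
        prefix = root + sep
        if path.startswith(prefix):
            return path[len(prefix):]
    raise AssertionError(f"Path {path!r} does not start with root {root!r}")


# Fileset-root case: the file follows '#', which a '/'-only prefix would miss.
print(strip_root("ws/fs#data.parquet", "ws/fs"))  # -> data.parquet

# Subdirectory-root case: the file follows '/', so the '#' attempt does not match.
print(strip_root("ws/fs#subdir/part-0.parquet", "ws/fs#subdir"))  # -> part-0.parquet
```

Because each candidate prefix includes the separator, a sibling such as "ws/fs2#data.parquet" does not match the root "ws/fs" and falls through to the `AssertionError`.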
```python
class FilesetFileSystemProvider:
    """Filesystem provider that roots directory-style seed readers in a fileset."""

    def __init__(
        self,
        sdk: NeMoPlatform | AsyncNeMoPlatform,
        *,
        workspace: str,
        validated_roots: set[str] | None = None,
    ) -> None:
        if isinstance(sdk, AsyncNeMoPlatform):
            sdk = async_to_sync_sdk(sdk)
        self._sdk = sdk
        self._workspace = workspace
        self._validated_roots = set() if validated_roots is None else validated_roots

    def create_context(self, *, runtime_path: str) -> SeedReaderFileSystemContext:
        root = self._canonical_root(runtime_path)
        rooted_fs = _FilesetDirFileSystem(path=root, fs=FilesetFileSystem(self._sdk))
        return SeedReaderFileSystemContext(fs=rooted_fs, root_path=PurePosixPath(root))

    def ensure_root_exists(self, *, runtime_path: str) -> None:
        workspace, fileset, fragment = self._parse(runtime_path)
        root = build_fileset_ref(fragment, workspace=workspace, fileset=fileset)
        if root in self._validated_roots:
            return

        fs = FilesetFileSystem(self._sdk)
        if fs.exists(root):
            self._validated_roots.add(root)
            return

        fileset_root = build_fileset_ref("", workspace=workspace, fileset=fileset)
        fully_qualified_fileset_name = f"{workspace}/{fileset}"
        if not fs.exists(fileset_root):
            raise SeedReaderConfigError(f"🛑 Fileset {fully_qualified_fileset_name!r} not found.")

        raise SeedReaderConfigError(f"🛑 Path {fragment!r} not found in fileset {fully_qualified_fileset_name!r}.")

    def _canonical_root(self, runtime_path: str) -> str:
        workspace, fileset, fragment = self._parse(runtime_path)
        return build_fileset_ref(fragment, workspace=workspace, fileset=fileset)

    def _parse(self, runtime_path: str) -> tuple[str, str, str]:
        try:
            return parse_fileset_ref(runtime_path, workspace_fallback=self._workspace)
        except FilesetPathError as error:
            raise SeedReaderError(f"🛑 Invalid fileset seed source path {runtime_path!r}: {error}") from error
```
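
The control flow of `ensure_root_exists` (cache hit, one existence check on success, a second check only to pick the error message) can be sketched against an in-memory stand-in. Everything named here is hypothetical: `FakeFs` replaces `FilesetFileSystem`, `ConfigError` replaces `SeedReaderConfigError`, and `build_ref` assumes a "workspace/fileset#fragment" reference shape that the real `build_fileset_ref` may not share.

```python
class ConfigError(Exception):
    """Hypothetical stand-in for SeedReaderConfigError."""


class FakeFs:
    """Hypothetical in-memory stand-in for FilesetFileSystem; counts exists() calls."""

    def __init__(self, known: set) -> None:
        self.known = known
        self.calls = 0

    def exists(self, path: str) -> bool:
        self.calls += 1
        return path in self.known


def build_ref(fragment: str, *, workspace: str, fileset: str) -> str:
    # Assumed shape only: "ws/fs" for the fileset root, "ws/fs#path" otherwise.
    base = f"{workspace}/{fileset}"
    return f"{base}#{fragment}" if fragment else base


def ensure_root_exists(fs, validated: set, *, workspace: str, fileset: str, fragment: str) -> None:
    root = build_ref(fragment, workspace=workspace, fileset=fileset)
    if root in validated:
        return  # already validated: no remote lookup
    if fs.exists(root):
        validated.add(root)
        return
    # Failure path: one extra lookup decides which of the two messages to raise.
    name = f"{workspace}/{fileset}"
    if not fs.exists(build_ref("", workspace=workspace, fileset=fileset)):
        raise ConfigError(f"Fileset {name!r} not found.")
    raise ConfigError(f"Path {fragment!r} not found in fileset {name!r}.")


fs = FakeFs({"ws/fs", "ws/fs#train"})
validated = set()
ensure_root_exists(fs, validated, workspace="ws", fileset="fs", fragment="train")
ensure_root_exists(fs, validated, workspace="ws", fileset="fs", fragment="train")
print(fs.calls)  # -> 1 (the second call was served from the cache)
```

Under these assumptions a valid root costs one lookup, a cached root costs none, and only a missing root pays for the second `exists` call.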
```python
class HybridFileSystemProvider:
    """Filesystem provider that resolves a seed path against local disk first, then a fileset.

    In local mode a directory-style seed source may point at either a directory on
    the local filesystem or a NeMo Platform fileset, and the engine only lets us
    inject a single provider per seed reader. We route per path: if the path
    resolves to an existing local directory we serve it from disk, otherwise we
    treat it as a fileset reference. This mirrors the local-first model-provider
    resolution strategy (locally-defined providers first, Inference Gateway as the
    fallback).
    """

    def __init__(
        self,
        sdk: NeMoPlatform | AsyncNeMoPlatform,
        *,
        workspace: str,
        validated_roots: set[str] | None = None,
    ) -> None:
        self._local = LocalFileSystemProvider()
        self._fileset = FilesetFileSystemProvider(sdk, workspace=workspace, validated_roots=validated_roots)

    def create_context(self, *, runtime_path: str) -> SeedReaderFileSystemContext:
        return self._route(runtime_path).create_context(runtime_path=runtime_path)

    def ensure_root_exists(self, *, runtime_path: str) -> None:
        self._route(runtime_path).ensure_root_exists(runtime_path=runtime_path)

    def _route(self, runtime_path: str) -> FileSystemProvider:
        return self._local if is_local_directory(runtime_path) else self._fileset


def is_local_directory(runtime_path: str) -> bool:
    """Whether a seed path resolves to an existing directory on the local filesystem.

    Shared by ``HybridFileSystemProvider`` routing and local-mode seed validation so
    that eager validation and read-time routing always agree on which backend serves
    a given path.
    """
    try:
        return Path(runtime_path).expanduser().is_dir()
    except (OSError, ValueError, RuntimeError):
        return False
```
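
The local-first routing can be illustrated with stub providers in place of the real ones. This is a sketch under stated assumptions: `StubProvider` and `Router` are hypothetical names, and only `is_local_directory` is copied from the diff.

```python
import tempfile
from pathlib import Path


def is_local_directory(runtime_path: str) -> bool:
    # Same check as in the diff: any failure to resolve the path counts as "not local".
    try:
        return Path(runtime_path).expanduser().is_dir()
    except (OSError, ValueError, RuntimeError):
        return False


class StubProvider:
    """Hypothetical stand-in for a filesystem provider; records the paths it is asked about."""

    def __init__(self) -> None:
        self.served = []

    def ensure_root_exists(self, *, runtime_path: str) -> None:
        self.served.append(runtime_path)


class Router:
    """Hypothetical reduction of HybridFileSystemProvider to its routing decision."""

    def __init__(self, local: StubProvider, fileset: StubProvider) -> None:
        self._local = local
        self._fileset = fileset

    def ensure_root_exists(self, *, runtime_path: str) -> None:
        self._route(runtime_path).ensure_root_exists(runtime_path=runtime_path)

    def _route(self, runtime_path: str) -> StubProvider:
        return self._local if is_local_directory(runtime_path) else self._fileset


local, fileset = StubProvider(), StubProvider()
router = Router(local, fileset)
with tempfile.TemporaryDirectory() as tmp:
    missing = str(Path(tmp) / "missing")
    router.ensure_root_exists(runtime_path=tmp)      # existing directory: local provider
    router.ensure_root_exists(runtime_path=missing)  # no such directory: fileset provider
```

One consequence of routing on `is_dir()` alone is that a relative fileset reference whose text happens to match an existing local directory would be served from disk, which appears to be the intended local-first behavior.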