Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions packages/data_designer_nemo/src/data_designer_nemo/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
)
from data_designer_nemo.errors import NDDError
from data_designer_nemo.fileset_file_seed_reader import FilesetFileSeedReader
from data_designer_nemo.fileset_filesystem_provider import (
FilesetFileSystemProvider,
HybridFileSystemProvider,
)
from data_designer_nemo.model_provider import (
make_local_first_model_provider_registry,
make_model_provider_registry,
Expand All @@ -33,10 +37,7 @@
from data_designer_nemo.sdk_translation import sync_to_async_sdk
from data_designer_nemo.secret_resolver import NMPSecretResolver
from data_designer_nemo.seed import validate_seed
from data_designer_nemo.unsupported_features import (
validate_no_tool_configs,
validate_seed_config_for_execution_context,
)
from data_designer_nemo.tool_configs import validate_no_tool_configs
from nemo_platform import AsyncNeMoPlatform, NeMoPlatform


Expand Down Expand Up @@ -64,6 +65,7 @@ class LocalDataDesignerContext:
def __init__(self, sdk: AsyncNeMoPlatform | NeMoPlatform, workspace: str):
self._sdk = sdk
self._workspace = workspace
self._validated_filesystem_roots: set[str] = set()

def get_secret_resolver(self) -> SecretResolver:
return CompositeResolver(
Expand All @@ -75,20 +77,31 @@ def get_secret_resolver(self) -> SecretResolver:
)

async def validate(self, config: dd.DataDesignerConfig) -> list[NDDError]:
sdk = self._async_sdk()
errors: list[NDDError] = []

try:
validate_seed_config_for_execution_context(config, is_local=True)
if validated_root := await validate_seed(config, self._workspace, sdk, is_local=True):
self._validated_filesystem_roots.add(validated_root)
except NDDError as e:
errors.append(e)

return errors

def get_seed_readers(self) -> list[SeedReader]:
# Directory- and FileContents-style seeds may reference either a local
# directory or a NeMo Platform fileset in local mode. The engine only
# accepts one provider per reader, so we inject a hybrid provider that
# resolves each seed path against local disk first, then a fileset.
fs_provider = HybridFileSystemProvider(
self._sdk, workspace=self._workspace, validated_roots=self._validated_filesystem_roots
)
return [
HuggingFaceSeedReader(),
LocalFileSeedReader(),
DataFrameSeedReader(),
DirectorySeedReader(),
FileContentsSeedReader(),
DirectorySeedReader(fs_provider=fs_provider),
FileContentsSeedReader(fs_provider=fs_provider),
AgentRolloutSeedReader(),
FilesetFileSeedReader(self._sdk),
]
Expand All @@ -108,11 +121,17 @@ async def get_model_providers(self, model_configs: list[dd.ModelConfig]) -> list

return [make_noop_provider()]

def _async_sdk(self) -> AsyncNeMoPlatform:
if isinstance(self._sdk, NeMoPlatform):
return sync_to_async_sdk(self._sdk)
return self._sdk


class RemoteDataDesignerContext:
def __init__(self, sdk: AsyncNeMoPlatform | NeMoPlatform, workspace: str):
self._sdk = sdk
self._workspace = workspace
self._validated_filesystem_roots: set[str] = set()

def get_secret_resolver(self) -> SecretResolver:
return NMPSecretResolver(self._sdk, self._workspace)
Expand All @@ -125,14 +144,13 @@ async def validate(self, config: dd.DataDesignerConfig) -> list[NDDError]:
validate_no_tool_configs(config)
except NDDError as e:
errors.append(e)

try:
validate_seed_config_for_execution_context(config, is_local=False)
except NDDError as e:
errors.append(e)
try:
await validate_seed(config, self._workspace, sdk)
if validated_root := await validate_seed(config, self._workspace, sdk, is_local=False):
self._validated_filesystem_roots.add(validated_root)
except NDDError as e:
errors.append(e)

try:
await ensure_nemotron_personas_filesets(config, sdk)
except NDDError as e:
Expand All @@ -141,9 +159,16 @@ async def validate(self, config: dd.DataDesignerConfig) -> list[NDDError]:
return errors

def get_seed_readers(self) -> list[SeedReader]:
provider = FilesetFileSystemProvider(
self._sdk,
workspace=self._workspace,
validated_roots=self._validated_filesystem_roots,
)
return [
HuggingFaceSeedReader(),
FilesetFileSeedReader(self._sdk),
DirectorySeedReader(fs_provider=provider),
FileContentsSeedReader(fs_provider=provider),
]

def get_person_reader(self) -> PersonReader:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from pathlib import Path, PurePosixPath

from data_designer.engine.resources.seed_reader import (
FileSystemProvider,
LocalFileSystemProvider,
SeedReaderConfigError,
SeedReaderError,
SeedReaderFileSystemContext,
)
from data_designer_nemo.sdk_translation import async_to_sync_sdk
from fsspec.implementations.dirfs import DirFileSystem
from nemo_platform import AsyncNeMoPlatform, NeMoPlatform
from nemo_platform.filesets import FilesetFileSystem, FilesetPathError, build_fileset_ref, parse_fileset_ref


class _FilesetDirFileSystem(DirFileSystem):
"""DirFileSystem that handles FilesetFileSystem's '#' path separator.
Comment thread
mikeknep marked this conversation as resolved.

FilesetFileSystem returns paths using '#' to separate the fileset name from
the file path (e.g. "ws/fs#data.parquet"). Standard DirFileSystem._relpath
builds its strip-prefix with '/' (e.g. "ws/fs/"), so the startswith check
fails for fileset-root paths. For subdirectory roots (e.g. "ws/fs#subdir"),
files use '/' after '#' and the standard logic already works; the '#' branch
below is a no-op in that case.

All methods besides _relpath are inherited from DirFileSystem unchanged, so
this remains a complete AbstractFileSystem implementation.
"""

def _relpath(self, path: str | list) -> str | list:
if isinstance(path, list):
return [self._relpath(p) for p in path]
if not self.path:
return path
if path == self.path:
return ""
for sep in ("#", "/"):
prefix = self.path + sep
if path.startswith(prefix):
return path[len(prefix) :]
raise AssertionError(f"Path {path!r} does not start with root {self.path!r}")


class FilesetFileSystemProvider:
"""Filesystem provider that roots directory-style seed readers in a fileset."""

def __init__(
self,
sdk: NeMoPlatform | AsyncNeMoPlatform,
*,
workspace: str,
validated_roots: set[str] | None = None,
) -> None:
if isinstance(sdk, AsyncNeMoPlatform):
sdk = async_to_sync_sdk(sdk)
self._sdk = sdk
self._workspace = workspace
self._validated_roots = set() if validated_roots is None else validated_roots

def create_context(self, *, runtime_path: str) -> SeedReaderFileSystemContext:
root = self._canonical_root(runtime_path)
rooted_fs = _FilesetDirFileSystem(path=root, fs=FilesetFileSystem(self._sdk))
return SeedReaderFileSystemContext(fs=rooted_fs, root_path=PurePosixPath(root))

def ensure_root_exists(self, *, runtime_path: str) -> None:
workspace, fileset, fragment = self._parse(runtime_path)
root = build_fileset_ref(fragment, workspace=workspace, fileset=fileset)
if root in self._validated_roots:
return

fs = FilesetFileSystem(self._sdk)
if fs.exists(root):
self._validated_roots.add(root)
return

fileset_root = build_fileset_ref("", workspace=workspace, fileset=fileset)
fully_qualified_fileset_name = f"{workspace}/{fileset}"
if not fs.exists(fileset_root):
raise SeedReaderConfigError(f"🛑 Fileset {fully_qualified_fileset_name!r} not found.")
Comment thread
mikeknep marked this conversation as resolved.
raise SeedReaderConfigError(f"🛑 Path {fragment!r} not found in fileset {fully_qualified_fileset_name!r}.")

def _canonical_root(self, runtime_path: str) -> str:
workspace, fileset, fragment = self._parse(runtime_path)
return build_fileset_ref(fragment, workspace=workspace, fileset=fileset)

def _parse(self, runtime_path: str) -> tuple[str, str, str]:
try:
return parse_fileset_ref(runtime_path, workspace_fallback=self._workspace)
except FilesetPathError as error:
raise SeedReaderError(f"🛑 Invalid fileset seed source path {runtime_path!r}: {error}") from error


class HybridFileSystemProvider:
Comment thread
mikeknep marked this conversation as resolved.
"""Filesystem provider that resolves a seed path against local disk first, then a fileset.

In local mode a directory-style seed source may point at either a directory on
the local filesystem or a NeMo Platform fileset, and the engine only lets us
inject a single provider per seed reader. We route per path: if the path
resolves to an existing local directory we serve it from disk, otherwise we
treat it as a fileset reference. This mirrors the local-first model-provider
resolution strategy (locally-defined providers first, Inference Gateway as the
fallback).
"""

def __init__(
self,
sdk: NeMoPlatform | AsyncNeMoPlatform,
*,
workspace: str,
validated_roots: set[str] | None = None,
) -> None:
self._local = LocalFileSystemProvider()
self._fileset = FilesetFileSystemProvider(sdk, workspace=workspace, validated_roots=validated_roots)

def create_context(self, *, runtime_path: str) -> SeedReaderFileSystemContext:
return self._route(runtime_path).create_context(runtime_path=runtime_path)

def ensure_root_exists(self, *, runtime_path: str) -> None:
self._route(runtime_path).ensure_root_exists(runtime_path=runtime_path)

def _route(self, runtime_path: str) -> FileSystemProvider:
return self._local if is_local_directory(runtime_path) else self._fileset


def is_local_directory(runtime_path: str) -> bool:
"""Whether a seed path resolves to an existing directory on the local filesystem.

Shared by ``HybridFileSystemProvider`` routing and local-mode seed validation so
that eager validation and read-time routing always agree on which backend serves
a given path.
"""
try:
return Path(runtime_path).expanduser().is_dir()
except (OSError, ValueError, RuntimeError):
return False
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def make_null_registry() -> ModelProviderRegistry:
# is semantically valid. The library requires a non-empty ModelProviderRegistry, so in this scenario
# we can provide this dummy null registry.
return ModelProviderRegistry(
default=_NO_OP,
providers=[make_noop_provider()],
)

Expand All @@ -74,12 +73,6 @@ async def make_local_first_model_provider_registry(
if len(model_configs) == 0:
return None

missing_providers = [model_config for model_config in model_configs if model_config.provider is None]
if len(missing_providers) > 0:
raise NDDInvalidConfigError(
f"Error: following model configs do not have an explicit provider defined: {missing_providers}"
)

logger.info("Building model provider registry. First checking locally-defined providers.")

local_registry = _make_local_model_provider_registry()
Expand Down
Loading
Loading