Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions flytekit/image_spec/default_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ImageSpecBuilder,
_find_git_root,
discover_nix_flake_path_inputs,
discover_transitive_nix_flake_path_inputs,
)
from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore
from flytekit.tools.script_mode import is_vendorable_repo, ls_files
Expand Down Expand Up @@ -430,6 +431,40 @@ def _copy_local_packages_and_update_lock(image_spec: ImageSpec, tmp_dir: Path):
# Track mapping for flake.lock updates
flake_path_replacements[old_rel_path] = new_rel_path

# Copy transitive path dependencies from flake.lock (dependencies of dependencies).
# Direct inputs are already handled above; transitive ones have a non-empty parent chain.
# Since local_packages/ mirrors the monorepo structure, relative paths between
# transitive deps are preserved automatically — no flake.lock path rewriting needed.
already_resolved = {inp.src_path.resolve() for inp in inputs}
transitive_inputs = discover_transitive_nix_flake_path_inputs(
lock_dir, flake_lock_content, already_resolved
)
for t_inp in transitive_inputs:
t_src = t_inp.src_path
t_git_root = _find_git_root(str(t_src))
if t_git_root is None:
continue

t_rel = os.path.relpath(path=str(t_src), start=str(t_git_root))
t_target = local_packages_dir / t_rel
t_target.parent.mkdir(parents=True, exist_ok=True)

if t_src.is_dir():
t_ignore = IgnoreGroup(str(t_src), [GitIgnore, DockerIgnore, StandardIgnore])
t_files, _ = ls_files(
str(t_src),
CopyFileDetection.ALL,
deref_symlinks=False,
ignore_group=t_ignore,
)
for t_file in t_files:
t_file_rel = os.path.relpath(t_file, start=str(t_src))
t_file_dst = t_target / t_file_rel
t_file_dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(t_file, t_file_dst)
else:
shutil.copy2(str(t_src), str(t_target))

# Update flake.lock JSON for nodes that reference local path inputs
if flake_path_replacements:
try:
Expand Down
95 changes: 95 additions & 0 deletions flytekit/image_spec/image_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,101 @@ def discover_nix_flake_path_inputs(
return inputs


def discover_transitive_nix_flake_path_inputs(
lock_dir: typing.Union[str, pathlib.Path],
flake_lock_content: str,
already_resolved: typing.Optional[typing.Set[pathlib.Path]] = None,
) -> List[NixFlakePathInput]:
"""Discover transitive Nix flake path inputs from flake.lock.

Direct path inputs (parent=[]) are handled by discover_nix_flake_path_inputs
via flake.nix parsing. This function finds transitive path inputs — those with
a non-empty parent chain in flake.lock — which are dependencies of dependencies.

Path resolution: each node's ``locked.path`` is relative to the directory of its
declaring parent (the last element in the ``parent`` chain). We walk the chain
recursively to compute the absolute source path.

Args:
lock_dir: Directory containing flake.lock (same as flake.nix location).
flake_lock_content: Contents of flake.lock as a string.
already_resolved: Set of already-resolved absolute paths to skip (from direct inputs).

Returns:
List of NixFlakePathInput entries for transitive path dependencies only.
"""
import json as _json

lock_dir_path = pathlib.Path(lock_dir)
already_resolved = already_resolved or set()

try:
lock_obj = _json.loads(flake_lock_content)
except Exception:
return []

nodes = lock_obj.get("nodes", {})
abs_cache: typing.Dict[str, typing.Optional[pathlib.Path]] = {}

def _resolve(node_name: str) -> typing.Optional[pathlib.Path]:
if node_name in abs_cache:
return abs_cache[node_name]

node = nodes.get(node_name)
if not node:
abs_cache[node_name] = None
return None

locked = node.get("locked", {})
if locked.get("type") != "path":
abs_cache[node_name] = None
return None

rel_path = locked.get("path", "")
parent_chain = node.get("parent", [])

if not parent_chain:
result = (lock_dir_path / rel_path).resolve()
else:
declaring_parent = parent_chain[-1]
parent_abs = _resolve(declaring_parent)
if parent_abs is None:
abs_cache[node_name] = None
return None
result = (parent_abs / rel_path).resolve()

abs_cache[node_name] = result
return result

transitive: List[NixFlakePathInput] = []
for node_name, node in nodes.items():
if node_name == "root":
continue
locked = node.get("locked", {})
if locked.get("type") != "path":
continue
parent_chain = node.get("parent", [])
if not parent_chain:
continue

src_path = _resolve(node_name)
if src_path is None or not src_path.exists():
continue
if src_path in already_resolved:
continue

rel_path = locked.get("path", "")
transitive.append(
NixFlakePathInput(
full_spec=f"path:{rel_path}",
old_rel_path=rel_path,
src_path=src_path,
)
)

return transitive


def _compute_directory_digest(dir_path: pathlib.Path) -> str:
"""Compute a stable digest for a directory mirroring builder copy logic (respects .gitignore/.dockerignore)."""
# Imports here to avoid circular imports at module load time
Expand Down