Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 300 additions & 6 deletions flytekit/image_spec/default_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from pathlib import Path
from string import Template
from subprocess import run
from typing import ClassVar, List, NamedTuple
from typing import ClassVar, List, NamedTuple, Optional
from urllib.parse import parse_qs, urlparse

import click
import toml
Expand Down Expand Up @@ -235,6 +236,10 @@ def _copy_local_packages_and_update_lock(image_spec: ImageSpec, tmp_dir: Path):

# Copy each local package from the lock file and update its path
for package in lock_data["package"]:
if "source" not in package:
non_vendored_packages.append(package)
continue

source = package["source"]

if "directory" in source:
Expand Down Expand Up @@ -485,8 +490,8 @@ def _copy_local_packages_and_update_lock(image_spec: ImageSpec, tmp_dir: Path):

# Export requirements from uv.lock to requirements.txt format
# This excludes editable installs (-e) and local relative path dependencies
requirements_export_cmd = rf"uv export --format requirements-txt {image_spec.uv_export_args} | grep -v '^\(-e\|\.\./\)' > {requirements_path}"
subprocess.run(requirements_export_cmd, shell=True, check=True)
requirements_export_cmd = rf"uv export --frozen --format requirements-txt {image_spec.uv_export_args} | grep -v '^\(-e\|\.\./\)' > {requirements_path}"
subprocess.run(requirements_export_cmd, shell=True, check=True, cwd=tmp_dir)

# Write local packages file
local_packages_path = tmp_dir / "local_packages.txt"
Expand All @@ -498,8 +503,18 @@ def _copy_lock_files_into_context(image_spec: ImageSpec, lock_file: str, tmp_dir
msg = f"Support for {lock_file} files and packages is mutually exclusive"
raise ValueError(msg)

# Copy and update local packages first
_copy_local_packages_and_update_lock(image_spec, tmp_dir)
if lock_file == "uv.lock":
_copy_local_packages_and_update_lock(image_spec, tmp_dir)
return

lock_path = Path(image_spec.requirements)
lock_dir = lock_path.parent
pyproject_path = lock_dir / "pyproject.toml"
if not pyproject_path.exists():
raise ValueError(f"pyproject.toml must exist in the same directory as {lock_file}")

shutil.copy2(lock_path, tmp_dir / lock_file)
shutil.copy2(pyproject_path, tmp_dir / "pyproject.toml")


def prepare_uv_lock_command(image_spec: ImageSpec, pip_install_args: List[str], tmp_dir: Path) -> str:
Expand Down Expand Up @@ -569,6 +584,275 @@ class _PythonInstallTemplate(NamedTuple):
extra_path: str


class _NixRemoteBuilder(NamedTuple):
store_uri: str
system: str
ssh_host: str
ssh_key: Optional[str]


def _split_nix_config_assignments(config_text: str, key: str) -> List[str]:
values: List[str] = []
pattern = re.compile(rf"^\s*{re.escape(key)}\s*=\s*(.+?)\s*$")
for line in config_text.splitlines():
match = pattern.match(line)
if match:
values.append(match.group(1))
return values


def _machine_file_paths_from_config(config_text: str) -> List[Path]:
paths = []
for builders_value in _split_nix_config_assignments(config_text, "builders"):
for builder_ref in builders_value.split():
if builder_ref.startswith("@"):
paths.append(Path(builder_ref[1:]).expanduser())
return paths


def _nix_config_paths() -> List[Path]:
paths = []
xdg_config_home = os.environ.get("XDG_CONFIG_HOME")
if xdg_config_home:
paths.append(Path(xdg_config_home) / "nix" / "nix.conf")
else:
home = os.environ.get("HOME")
if home:
paths.append(Path(home) / ".config" / "nix" / "nix.conf")
paths.append(Path("/etc/nix/nix.conf"))
return paths


def _machine_file_paths() -> List[Path]:
paths = []
explicit_path = os.environ.get("FLYTEKIT_NIX_REMOTE_BUILDERS_FILE")
if explicit_path:
paths.append(Path(explicit_path).expanduser())

nix_config = os.environ.get("NIX_CONFIG")
if nix_config:
paths.extend(_machine_file_paths_from_config(nix_config))

for config_path in _nix_config_paths():
if config_path.exists():
paths.extend(_machine_file_paths_from_config(config_path.read_text()))

unique_paths = []
seen = set()
for path in paths:
path_key = os.fspath(path)
if path_key not in seen:
seen.add(path_key)
unique_paths.append(path)
return unique_paths


def _ssh_host_from_store_uri(store_uri: str) -> Optional[str]:
parsed_uri = urlparse(store_uri)
if parsed_uri.scheme not in {"ssh", "ssh-ng"} or not parsed_uri.netloc:
return None
return parsed_uri.netloc


def _ssh_key_from_store_uri(store_uri: str) -> Optional[str]:
parsed_uri = urlparse(store_uri)
query = parse_qs(parsed_uri.query)
ssh_key_values = query.get("ssh-key")
if not ssh_key_values:
return None
return ssh_key_values[0]


def _local_ssh_key_path(ssh_key: Optional[str]) -> Optional[str]:
if not ssh_key or ssh_key == "-":
return None

home = os.environ.get("HOME")
if home:
home_key = Path(home) / ".ssh" / Path(ssh_key).name
if home_key.exists():
return os.fspath(home_key)

return ssh_key


def _store_uri_with_ssh_key(builder: _NixRemoteBuilder) -> str:
if not builder.ssh_key:
return builder.store_uri

parsed_uri = urlparse(builder.store_uri)
query_parts = parsed_uri.query.split("&") if parsed_uri.query else []
next_query_parts = []
replaced_ssh_key = False
for query_part in query_parts:
if query_part.split("=", 1)[0] == "ssh-key":
next_query_parts.append(f"ssh-key={builder.ssh_key}")
replaced_ssh_key = True
else:
next_query_parts.append(query_part)

if not replaced_ssh_key:
next_query_parts.append(f"ssh-key={builder.ssh_key}")

return parsed_uri._replace(query="&".join(next_query_parts)).geturl()


def _ssh_binary() -> Optional[str]:
candidates = [
shutil.which("ssh"),
"/run/current-system/sw/bin/ssh",
"/usr/bin/ssh",
"/bin/ssh",
]
for candidate in candidates:
if candidate and os.access(candidate, os.X_OK):
return candidate
return None


def _nix_sshopts() -> str:
ssh_options = [
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-o", "BatchMode=yes",
]
return " ".join(ssh_options)


def _parse_nix_machine_line(line: str) -> Optional[_NixRemoteBuilder]:
stripped = line.strip()
if not stripped or stripped.startswith("#"):
return None

parts = stripped.split()
if len(parts) < 2:
return None

store_uri = parts[0]
ssh_host = _ssh_host_from_store_uri(store_uri)
if ssh_host is None:
return None

ssh_key = _ssh_key_from_store_uri(store_uri)
if ssh_key is None and len(parts) >= 3:
ssh_key = parts[2]

return _NixRemoteBuilder(
store_uri=store_uri,
system=parts[1],
ssh_host=ssh_host,
ssh_key=_local_ssh_key_path(ssh_key),
)


def _configured_nix_remote_builders() -> List[_NixRemoteBuilder]:
builders = []
inline_builders = os.environ.get("FLYTEKIT_NIX_REMOTE_BUILDERS")
if inline_builders:
for line in inline_builders.splitlines():
builder = _parse_nix_machine_line(line)
if builder:
builders.append(builder)

for machine_file_path in _machine_file_paths():
if not machine_file_path.exists():
continue
for line in machine_file_path.read_text().splitlines():
builder = _parse_nix_machine_line(line)
if builder:
builders.append(builder)

return builders


def _select_nix_remote_builder(nix_system: str) -> Optional[_NixRemoteBuilder]:
if os.environ.get("FLYTEKIT_NIX_REMOTE_PUSH", "").lower() in {"0", "false", "no"}:
return None

for builder in _configured_nix_remote_builders():
if nix_system in builder.system.split(","):
return builder
return None


def _ssh_command(builder: _NixRemoteBuilder, remote_command: List[str]) -> List[str]:
command = [
_ssh_binary() or "ssh",
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-o", "BatchMode=yes",
]
if builder.ssh_key:
command.extend(["-i", builder.ssh_key])
command.append(builder.ssh_host)
command.extend(remote_command)
return command


def _remote_nix_copy_to_ecr(
*,
tmp_dir: str,
nix_system: str,
image_name: str,
ecr_token: str,
builder: _NixRemoteBuilder,
) -> None:
push_attr = f"packages.{nix_system}.push-to-ecr"
build_command = [
"nix", "build",
"--no-link",
"--print-out-paths",
"--eval-store", "auto",
"--store", _store_uri_with_ssh_key(builder),
"--builders", "",
"--builders-use-substitutes",
"--system", nix_system,
f"path:{tmp_dir}#{push_attr}",
]
click.secho(f"Remote nix2container build on {builder.ssh_host}: {' '.join(build_command)}", fg="blue")
ssh_binary = _ssh_binary()
path = os.environ.get("PATH", "")
if ssh_binary:
path = f"{Path(ssh_binary).parent}{os.pathsep}{path}"
build_env = {
**os.environ,
"NIX_SSHOPTS": _nix_sshopts(),
"PATH": path,
}
build_result = run(build_command, capture_output=True, text=True, env=build_env)
if build_result.returncode != 0:
raise RuntimeError(
f"Remote nix2container build failed with exit code {build_result.returncode}: "
f"{' '.join(build_command)}\n{build_result.stderr}"
)

output_paths = [line.strip() for line in build_result.stdout.splitlines() if line.strip()]
if not output_paths:
raise RuntimeError(f"Remote nix2container build produced no output path: {' '.join(build_command)}")

push_to_ecr_bin = f"{output_paths[-1]}/bin/push-to-ecr"
push_command = _ssh_command(
builder,
[
"env",
f"IMAGE_NAME={image_name}",
f"ECR_TOKEN={ecr_token}",
push_to_ecr_bin,
],
)
log_push_command = list(push_command)
for i, arg in enumerate(log_push_command):
if arg.startswith("ECR_TOKEN="):
log_push_command[i] = "ECR_TOKEN=[REDACTED]"
click.secho(f"Remote nix2container push on {builder.ssh_host}: {' '.join(log_push_command)}", fg="blue")
push_result = run(push_command)
if push_result.returncode != 0:
raise RuntimeError(
f"Remote nix2container push failed with exit code {push_result.returncode}: "
f"{' '.join(log_push_command)}"
)


def prepare_python_executable(image_spec: ImageSpec) -> _PythonInstallTemplate:
if image_spec.python_exec:
if image_spec.conda_channels:
Expand Down Expand Up @@ -837,7 +1121,17 @@ def _build_image(self, image_spec: ImageSpec, *, push: bool = True) -> str:
["aws", "ecr", "get-login-password", "--region", "us-west-2"],
capture_output=True, text=True, check=True,
).stdout.strip()
if is_cross_build:
remote_builder = _select_nix_remote_builder(nix_system)
if remote_builder:
_remote_nix_copy_to_ecr(
tmp_dir=tmp_dir,
nix_system=nix_system,
image_name=image_spec.image_name(),
ecr_token=ecr_token,
builder=remote_builder,
)
return image_spec.image_name()
elif is_cross_build:
docker_attr = f"packages.{local_system}.docker-{nix_system}.copyTo"
click.secho(f"Cross-build: {nix_system} image via {local_system} n2c", fg="yellow")
else:
Expand Down
Loading