Skip to content

Commit 25324bf

Browse files
committed
oci: add OciCredentials model and harden credential resolution
Introduce a Pydantic OciCredentials model that normalizes empty/whitespace strings and enforces both-or-neither validation. Update all callers to use the new model. Add tests for digest parsing, Docker Hub bare images, and whitespace env var handling. Assisted-by: claude-opus-4.6 Signed-off-by: Benny Zlotnik <bzlotnik@redhat.com>
1 parent e6a7540 commit 25324bf

8 files changed

Lines changed: 681 additions & 183 deletions

File tree

python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import base64
24
import hashlib
35
import json
@@ -12,8 +14,12 @@
1214
from dataclasses import dataclass
1315
from pathlib import Path, PosixPath
1416
from queue import Queue
17+
from typing import TYPE_CHECKING
1518
from urllib.parse import urlparse
1619

20+
if TYPE_CHECKING:
21+
from jumpstarter.common.oci import OciCredentials
22+
1723
import click
1824
import pexpect
1925
import requests
@@ -137,7 +143,9 @@ def flash( # noqa: C901
137143

138144
if headers:
139145
headers = self._validate_header_dict(headers)
140-
oci_username, oci_password = self._resolve_oci_credentials(path, oci_username, oci_password)
146+
oci_creds = self._resolve_oci_credentials(path, oci_username, oci_password)
147+
oci_username = oci_creds.username
148+
oci_password = oci_creds.password.get_secret_value() if oci_creds.password else None
141149
should_download_to_httpd = True
142150
image_url = ""
143151
original_http_url = None
@@ -1291,34 +1299,25 @@ def _validate_bearer_token(self, token: str | None) -> str | None:
12911299

12921300
return token
12931301

1294-
def _validate_oci_credentials(self, username: str | None, password: str | None) -> tuple[str | None, str | None]:
1295-
if username is not None:
1296-
username = username.strip()
1297-
if password is not None:
1298-
password = password.strip()
1302+
def _resolve_oci_credentials(
1303+
self, path: PathBuf, username: str | None, password: str | None
1304+
) -> "OciCredentials":
1305+
from pydantic import SecretStr, ValidationError
12991306

1300-
if username == "":
1301-
username = None
1302-
if password == "":
1303-
password = None
1307+
from jumpstarter.common.oci import OciCredentials, resolve_oci_credentials
13041308

1305-
if bool(username) != bool(password):
1309+
try:
1310+
creds = OciCredentials(username=username, password=SecretStr(password) if password is not None else None)
1311+
except ValidationError as err:
13061312
raise click.ClickException(
13071313
"OCI authentication requires both OCI_USERNAME and OCI_PASSWORD "
13081314
"environment variables (or both oci_username and oci_password arguments)"
1309-
)
1310-
1311-
return username, password
1312-
1313-
def _resolve_oci_credentials(
1314-
self, path: PathBuf, username: str | None, password: str | None
1315-
) -> tuple[str | None, str | None]:
1316-
if username is None and password is None and path.startswith("oci://"):
1317-
from jumpstarter.common.oci import resolve_oci_credentials
1315+
) from err
13181316

1319-
username, password = resolve_oci_credentials(str(path))
1317+
if not creds.is_authenticated and path.startswith("oci://"):
1318+
return resolve_oci_credentials(str(path))
13201319

1321-
return self._validate_oci_credentials(username, password)
1320+
return creds
13221321

13231322
def _fls_oci_auth_env(self, path: PathBuf, creds_file: str | None) -> str:
13241323
if not str(path).startswith("oci://") or not creds_file:

python/packages/jumpstarter-driver-flashers/jumpstarter_driver_flashers/client_test.py

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,24 +47,25 @@ def test_validate_bearer_token_fails_invalid():
4747
client._validate_bearer_token('token"with"quotes')
4848

4949

50-
def test_validate_oci_credentials_fails_when_partial():
51-
"""Test OCI credential validation fails when only one value is provided"""
50+
def test_resolve_oci_credentials_fails_when_partial():
51+
"""Test OCI credential resolution fails when only one value is provided"""
5252
client = MockFlasherClient()
5353

5454
with pytest.raises(click.ClickException, match="OCI authentication requires both"):
55-
client._validate_oci_credentials("myuser", None)
55+
client._resolve_oci_credentials("oci://quay.io/org/image:tag", "myuser", None)
5656

5757
with pytest.raises(click.ClickException, match="OCI authentication requires both"):
58-
client._validate_oci_credentials(None, "mypassword")
58+
client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, "mypassword")
5959

6060

61-
def test_validate_oci_credentials_accepts_pair_and_strips_whitespace():
62-
"""Test OCI credential validation accepts full username/password pair"""
61+
def test_resolve_oci_credentials_accepts_pair_and_strips_whitespace():
62+
"""Test OCI credential resolution accepts full username/password pair and strips whitespace"""
6363
client = MockFlasherClient()
6464

65-
username, password = client._validate_oci_credentials(" myuser ", " mypassword ")
66-
assert username == "myuser"
67-
assert password == "mypassword"
65+
creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", " myuser ", " mypassword ")
66+
assert creds.username == "myuser"
67+
assert creds.password is not None
68+
assert creds.password.get_secret_value() == "mypassword"
6869

6970

7071
def test_resolve_oci_credentials_reads_env_for_oci_path(monkeypatch):
@@ -73,9 +74,10 @@ def test_resolve_oci_credentials_reads_env_for_oci_path(monkeypatch):
7374
monkeypatch.setenv("OCI_USERNAME", "env-user")
7475
monkeypatch.setenv("OCI_PASSWORD", "env-pass")
7576

76-
username, password = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None)
77-
assert username == "env-user"
78-
assert password == "env-pass"
77+
creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None)
78+
assert creds.username == "env-user"
79+
assert creds.password is not None
80+
assert creds.password.get_secret_value() == "env-pass"
7981

8082

8183
def test_resolve_oci_credentials_ignores_env_for_non_oci_path(monkeypatch):
@@ -84,30 +86,53 @@ def test_resolve_oci_credentials_ignores_env_for_non_oci_path(monkeypatch):
8486
monkeypatch.setenv("OCI_USERNAME", "env-user")
8587
monkeypatch.setenv("OCI_PASSWORD", "env-pass")
8688

87-
username, password = client._resolve_oci_credentials("https://example.com/image.raw.xz", None, None)
88-
assert username is None
89-
assert password is None
89+
creds = client._resolve_oci_credentials("https://example.com/image.raw.xz", None, None)
90+
assert creds.username is None
91+
assert creds.password is None
9092

9193

9294
def test_resolve_oci_credentials_partial_env_falls_through_to_auth_file(monkeypatch):
9395
"""Partial env vars should fall through to auth file lookup, not error."""
9496
from unittest.mock import patch
9597

98+
from pydantic import SecretStr
99+
100+
from jumpstarter.common.oci import OciCredentials
101+
96102
client = MockFlasherClient()
97103
monkeypatch.setenv("OCI_USERNAME", "env-user")
98104
monkeypatch.delenv("OCI_PASSWORD", raising=False)
99105

100-
# When auth file has no match, result is (None, None) — no error
101-
with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=(None, None)):
102-
username, password = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None)
103-
assert username is None
104-
assert password is None
106+
# When auth file has no match, result is unauthenticated — no error
107+
with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=OciCredentials()):
108+
creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None)
109+
assert creds.username is None
110+
assert creds.password is None
105111

106112
# When auth file has a match, those credentials are used
107-
with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=("fileuser", "filepass")):
108-
username, password = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None)
109-
assert username == "fileuser"
110-
assert password == "filepass"
113+
with patch(
114+
"jumpstarter.common.oci.read_auth_file_credentials",
115+
return_value=OciCredentials(username="fileuser", password=SecretStr("filepass")),
116+
):
117+
creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", None, None)
118+
assert creds.username == "fileuser"
119+
assert creds.password is not None
120+
assert creds.password.get_secret_value() == "filepass"
121+
122+
123+
def test_resolve_oci_credentials_normalizes_empty_strings():
124+
"""Empty-string username/password should be treated as absent and fall back to resolve."""
125+
from unittest.mock import patch
126+
127+
from jumpstarter.common.oci import OciCredentials
128+
129+
client = MockFlasherClient()
130+
131+
with patch("jumpstarter.common.oci.resolve_oci_credentials", return_value=OciCredentials()) as mock_resolve:
132+
creds = client._resolve_oci_credentials("oci://quay.io/org/image:tag", "", "")
133+
assert creds.username is None
134+
assert creds.password is None
135+
mock_resolve.assert_called_once()
111136

112137

113138
def test_fls_oci_auth_env_sources_credentials_file():

python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,19 +105,17 @@ async def flash_oci(
105105
if not oci_url.startswith("oci://"):
106106
raise ValueError(f"OCI URL must start with oci://, got: {oci_url}")
107107

108-
# If explicit credentials were provided, validate immediately
108+
from pydantic import SecretStr
109+
110+
from jumpstarter.common.oci import OciCredentials, resolve_oci_credentials
111+
109112
if oci_username or oci_password:
110-
if bool(oci_username) != bool(oci_password):
111-
raise ValueError("OCI authentication requires both username and password")
113+
creds = OciCredentials(username=oci_username, password=SecretStr(oci_password) if oci_password is not None else None)
112114
else:
113-
# Fall back to env vars, then container auth files
114-
from jumpstarter.common.oci import resolve_oci_credentials
115-
116-
oci_username, oci_password = resolve_oci_credentials(oci_url)
117-
if oci_username and oci_password:
118-
self.logger.info("Using OCI registry credentials from environment or auth file")
119-
elif oci_username or oci_password:
120-
raise ValueError("OCI authentication requires both username and password")
115+
creds = resolve_oci_credentials(oci_url)
116+
117+
oci_username = creds.username
118+
oci_password = creds.password.get_secret_value() if creds.password else None
121119

122120
target_path = str(self.parent.validate_partition(partition))
123121

python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from jumpstarter_driver_qemu.driver import Qemu, QemuFlasher
1616

17+
from jumpstarter.common.oci import OciCredentials
1718
from jumpstarter.common.utils import serve
1819

1920

@@ -316,7 +317,7 @@ async def test_flash_oci_no_credentials():
316317
# Ensure OCI env vars are not set so driver doesn't pick them up
317318
env_clean = {k: v for k, v in os.environ.items() if k not in ("OCI_USERNAME", "OCI_PASSWORD")}
318319
with patch.dict(os.environ, env_clean, clear=True):
319-
with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=(None, None)):
320+
with patch("jumpstarter.common.oci.read_auth_file_credentials", return_value=OciCredentials()):
320321
with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"):
321322
with patch(
322323
"asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process

python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -279,20 +279,15 @@ def _read_oci_credentials(self, oci_url: str):
279279
"""Read OCI registry credentials from environment variables or auth files."""
280280
from jumpstarter.common.oci import resolve_oci_credentials
281281

282-
username, password = resolve_oci_credentials(oci_url)
283-
284-
if bool(username) != bool(password):
285-
raise click.ClickException("OCI authentication requires both username and password")
286-
287-
return username, password
282+
return resolve_oci_credentials(oci_url)
288283

289284
def _flash_oci_auto_impl(
290285
self,
291286
oci_url: str,
292287
partitions: Dict[str, str] | None = None,
293288
):
294289
"""Core implementation of OCI flash without wrapper logic."""
295-
oci_username, oci_password = self._read_oci_credentials(oci_url)
290+
creds = self._read_oci_credentials(oci_url)
296291

297292
self.logger.info("Checking for fastboot devices on Exporter...")
298293
detection_result = self.call("detect_fastboot_device", 5, 2.0)
@@ -307,8 +302,8 @@ def _flash_oci_auto_impl(
307302
"flash_oci_image",
308303
oci_url,
309304
partitions,
310-
oci_username,
311-
oci_password,
305+
creds.username,
306+
creds.password.get_secret_value() if creds.password else None,
312307
)
313308

314309
# Display FLS output to user

python/packages/jumpstarter-driver-ridesx/jumpstarter_driver_ridesx/client_test.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from jumpstarter_driver_pyserial.driver import PySerial
77

88
from .driver import RideSXDriver
9+
from jumpstarter.common.oci import OciCredentials
910
from jumpstarter.common.utils import serve
1011

1112

@@ -57,20 +58,21 @@ def test_validate_partition_mappings(ridesx_client):
5758

5859
def test_flash_oci_auto_success(ridesx_client):
5960
"""Test successful flash_oci_auto call"""
60-
with patch.object(ridesx_client, "call") as mock_call:
61-
mock_call.side_effect = [
62-
None, # boot_to_fastboot call
63-
{"status": "device_found", "device_id": "ABC123"},
64-
{"status": "success"},
65-
]
61+
with patch("jumpstarter.common.oci.resolve_oci_credentials", return_value=OciCredentials()):
62+
with patch.object(ridesx_client, "call") as mock_call:
63+
mock_call.side_effect = [
64+
None, # boot_to_fastboot call
65+
{"status": "device_found", "device_id": "ABC123"},
66+
{"status": "success"},
67+
]
6668

67-
result = ridesx_client.flash_oci_auto("oci://quay.io/org/image:tag")
69+
result = ridesx_client.flash_oci_auto("oci://quay.io/org/image:tag")
6870

69-
assert result == {"status": "success"}
70-
# Verify flash_oci_image was called with the OCI URL
71-
flash_call = mock_call.call_args_list[2]
72-
assert flash_call[0][0] == "flash_oci_image"
73-
assert flash_call[0][1] == "oci://quay.io/org/image:tag"
71+
assert result == {"status": "success"}
72+
# Verify flash_oci_image was called with the OCI URL
73+
flash_call = mock_call.call_args_list[2]
74+
assert flash_call[0][0] == "flash_oci_image"
75+
assert flash_call[0][1] == "oci://quay.io/org/image:tag"
7476

7577

7678
def test_flash_oci_auto_error_cases(ridesx_client):
@@ -84,11 +86,12 @@ def test_flash_oci_auto_error_cases(ridesx_client):
8486
ridesx_client.flash_oci_auto("quay.io/org/image:tag")
8587

8688
# No device found
87-
with patch.object(ridesx_client, "call") as mock_call:
88-
mock_call.return_value = {"status": "no_device_found", "device_id": None}
89+
with patch("jumpstarter.common.oci.resolve_oci_credentials", return_value=OciCredentials()):
90+
with patch.object(ridesx_client, "call") as mock_call:
91+
mock_call.return_value = {"status": "no_device_found", "device_id": None}
8992

90-
with pytest.raises(click.ClickException, match="No fastboot devices found"):
91-
ridesx_client.flash_oci_auto("oci://image:tag")
93+
with pytest.raises(click.ClickException, match="No fastboot devices found"):
94+
ridesx_client.flash_oci_auto("oci://image:tag")
9295

9396

9497
# _execute_flash_command Tests

0 commit comments

Comments
 (0)