Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@
from __future__ import annotations

import datetime
import hashlib
import logging
import os
import tempfile
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from urllib import parse

import securesystemslib.hash as sslib_hash
from securesystemslib.signer import CryptoSigner, Signer

from tuf.api.exceptions import DownloadHTTPError
Expand Down Expand Up @@ -80,6 +80,8 @@

SPEC_VER = ".".join(SPECIFICATION_VERSION)

_HASH_ALGORITHM = "sha256"


@dataclass
class FetchTracker:
Expand Down Expand Up @@ -292,9 +294,9 @@ def _compute_hashes_and_length(
self, role: str
) -> tuple[dict[str, str], int]:
data = self.fetch_metadata(role)
digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM)
digest_object = hashlib.new(_HASH_ALGORITHM)
digest_object.update(data)
hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()}
hashes = {_HASH_ALGORITHM: digest_object.hexdigest()}
return hashes, len(data)

def update_timestamp(self) -> None:
Expand Down
21 changes: 17 additions & 4 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from typing import ClassVar

from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import hash as sslib_hash
from securesystemslib.signer import (
CryptoSigner,
Key,
Expand Down Expand Up @@ -896,6 +895,12 @@ def test_length_and_hash_validation(self) -> None:
# test with data as bytes
snapshot_metafile.verify_length_and_hashes(data)

# test with custom blake algorithm
snapshot_metafile.hashes = {
"blake2b-256": "963a3c31aad8e2a91cfc603fdba12555e48dd0312674ac48cce2c19c243236a1"
}
snapshot_metafile.verify_length_and_hashes(data)

# test exceptions
expected_length = snapshot_metafile.length
snapshot_metafile.length = 2345
Expand Down Expand Up @@ -958,9 +963,7 @@ def test_targetfile_from_file(self) -> None:
# Test with a non-existing file
file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt")
with self.assertRaises(FileNotFoundError):
TargetFile.from_file(
file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM]
)
TargetFile.from_file(file_path, file_path, ["sha256"])

# Test with an unsupported algorithm
file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")
Expand Down Expand Up @@ -990,6 +993,12 @@ def test_targetfile_from_data(self) -> None:
targetfile_from_data = TargetFile.from_data(target_file_path, data)
targetfile_from_data.verify_length_and_hashes(data)

# Test with custom blake hash algorithm
targetfile_from_data = TargetFile.from_data(
target_file_path, data, ["blake2b-256"]
)
targetfile_from_data.verify_length_and_hashes(data)

def test_metafile_from_data(self) -> None:
data = b"Inline test content"

Expand All @@ -1013,6 +1022,10 @@ def test_metafile_from_data(self) -> None:
),
)

# Test with custom blake hash algorithm
metafile = MetaFile.from_data(1, data, ["blake2b-256"])
metafile.verify_length_and_hashes(data)

def test_targetfile_get_prefixed_paths(self) -> None:
target = TargetFile(100, {"sha256": "abc", "md5": "def"}, "a/b/f.ext")
self.assertEqual(
Expand Down
72 changes: 47 additions & 25 deletions tuf/api/_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@

import abc
import fnmatch
import hashlib
import io
import logging
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import (
Expand All @@ -21,7 +23,6 @@
)

from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import hash as sslib_hash
from securesystemslib.signer import Key, Signature

from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
Expand All @@ -34,6 +35,9 @@
_TARGETS = "targets"
_TIMESTAMP = "timestamp"

_DEFAULT_HASH_ALGORITHM = "sha256"
_BLAKE_HASH_ALGORITHM = "blake2b-256"

# We aim to support SPECIFICATION_VERSION and require the input metadata
# files to have the same major version (the first number) as ours.
SPECIFICATION_VERSION = ["1", "0", "31"]
Expand All @@ -45,6 +49,38 @@
T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")


def _get_digest(algo: str) -> Any: # noqa: ANN401
"""New digest helper to support custom "blake2b-256" algo name."""
if algo == _BLAKE_HASH_ALGORITHM:
return hashlib.blake2b(digest_size=32)

return hashlib.new(algo)


def _hash_bytes(data: bytes, algo: str) -> str:
"""Returns hexdigest for data using algo."""
digest = _get_digest(algo)
digest.update(data)

return digest.hexdigest()


def _hash_file(f: IO[bytes], algo: str) -> str:
"""Returns hexdigest for file using algo."""
f.seek(0)
if sys.version_info >= (3, 11):
digest = hashlib.file_digest(f, lambda: _get_digest(algo)) # type: ignore[arg-type]

else:
# Fallback for older Pythons. Chunk size is taken from the previously
# used and now deprecated `securesystemslib.hash.digest_fileobject`.
digest = _get_digest(algo)
for chunk in iter(lambda: f.read(4096), b""):
digest.update(chunk)

return digest.hexdigest()


class Signed(metaclass=abc.ABCMeta):
"""A base class for the signed part of TUF metadata.

Expand Down Expand Up @@ -664,24 +700,18 @@ def _verify_hashes(
data: bytes | IO[bytes], expected_hashes: dict[str, str]
) -> None:
"""Verify that the hash of ``data`` matches ``expected_hashes``."""
is_bytes = isinstance(data, bytes)
Comment thread
jku marked this conversation as resolved.
for algo, exp_hash in expected_hashes.items():
try:
if is_bytes:
digest_object = sslib_hash.digest(algo)
digest_object.update(data)
if isinstance(data, bytes):
observed_hash = _hash_bytes(data, algo)
else:
# if data is not bytes, assume it is a file object
digest_object = sslib_hash.digest_fileobject(data, algo)
except (
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
observed_hash = _hash_file(data, algo)
except (ValueError, TypeError) as e:
raise LengthOrHashMismatchError(
f"Unsupported algorithm '{algo}'"
) from e

observed_hash = digest_object.hexdigest()
if observed_hash != exp_hash:
raise LengthOrHashMismatchError(
f"Observed hash {observed_hash} does not match "
Expand Down Expand Up @@ -731,25 +761,17 @@ def _get_length_and_hashes(
hashes = {}

if hash_algorithms is None:
hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM]
hash_algorithms = [_DEFAULT_HASH_ALGORITHM]

for algorithm in hash_algorithms:
try:
if isinstance(data, bytes):
digest_object = sslib_hash.digest(algorithm)
digest_object.update(data)
hashes[algorithm] = _hash_bytes(data, algorithm)
else:
digest_object = sslib_hash.digest_fileobject(
data, algorithm
)
except (
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
hashes[algorithm] = _hash_file(data, algorithm)
except (ValueError, TypeError) as e:
raise ValueError(f"Unsupported algorithm '{algorithm}'") from e

hashes[algorithm] = digest_object.hexdigest()

return (length, hashes)


Expand Down Expand Up @@ -1150,7 +1172,7 @@ def is_delegated_path(self, target_filepath: str) -> bool:
if self.path_hash_prefixes is not None:
# Calculate the hash of the filepath
# to determine in which bin to find the target.
digest_object = sslib_hash.digest(algorithm="sha256")
digest_object = hashlib.new(name="sha256")
digest_object.update(target_filepath.encode("utf-8"))
target_filepath_hash = digest_object.hexdigest()

Expand Down Expand Up @@ -1269,7 +1291,7 @@ def get_role_for_target(self, target_filepath: str) -> str:
target_filepath: URL path to a target file, relative to a base
targets URL.
"""
hasher = sslib_hash.digest(algorithm="sha256")
hasher = hashlib.new(name="sha256")
hasher.update(target_filepath.encode("utf-8"))

# We can't ever need more than 4 bytes (32 bits).
Expand Down