Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions linter_exclusions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3516,3 +3516,9 @@ confcom fragment attach:
signed_fragment:
rule_exclusions:
- no_positional_parameters

confcom fragment references from_image:
parameters:
image:
rule_exclusions:
- no_positional_parameters
25 changes: 25 additions & 0 deletions src/confcom/azext_confcom/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,28 @@
- name: Attach the output of acifragmentgen to a registry
text: az confcom acifragmentgen --chain my.cert.pem --key my_key.pem --svn "1" --namespace contoso --feed "test-feed" --input ./fragment_spec.json | az confcom fragment attach --manifest-tag myregistry.azurecr.io/image:latest
"""

helps[
"confcom fragment references"
] = """
type: group
short-summary: Commands which generate Security Policy Fragment References.
"""


helps[
"confcom fragment references from_image"
] = """
type: command
short-summary: Create a Security Policy Fragment Reference based on an image reference.

parameters:
- name: --minimum-svn
type: str
short-summary: 'The value of the minimum SVN field in the generated fragment reference, defaults to current fragment reference'


examples:
- name: Input an image reference and generate fragment reference
text: az confcom fragment references from_image my.azurecr.io/myimage:tag
"""
21 changes: 21 additions & 0 deletions src/confcom/azext_confcom/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ def load_arguments(self, _):
required=False,
help='Container definitions to include in the policy'
)
c.argument(
"fragment_definitions",
options_list=['--with-fragments'],
action='append',
type=json.loads,
required=False,
help='Fragment definitions to include in the policy'
)

with self.argument_context("confcom acifragmentgen") as c:
c.argument(
Expand Down Expand Up @@ -469,3 +477,16 @@ def load_arguments(self, _):
help="Path to containerd socket if not using the default",
validator=validate_katapolicygen_input,
)

with self.argument_context("confcom fragment references from_image") as c:
c.positional(
"image",
type=str,
help="Image to create container definition from",
)
c.argument(
"minimum_svn",
required=False,
type=str,
help="Minimum Allowed Software Version Number for Fragment",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import json

from typing import Optional

from azext_confcom.lib.fragment_references import from_image as lib_fragment_references_from_image


def fragment_references_from_image(image: str, minimum_svn: Optional[str]) -> str:
return print(json.dumps(list(lib_fragment_references_from_image(image, minimum_svn))))
3 changes: 3 additions & 0 deletions src/confcom/azext_confcom/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ def load_command_table(self, _):

with self.command_group("confcom"):
pass

with self.command_group("confcom fragment references") as g:
g.custom_command("from_image", "fragment_references_from_image")
18 changes: 18 additions & 0 deletions src/confcom/azext_confcom/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
print_existing_policy_from_yaml, print_func, str_to_sha256)
from azext_confcom.command.fragment_attach import fragment_attach as _fragment_attach
from azext_confcom.command.fragment_push import fragment_push as _fragment_push
from azext_confcom.command.fragment_references_from_image import (
fragment_references_from_image as _fragment_references_from_image
)
from knack.log import get_logger
from pkg_resources import parse_version

Expand Down Expand Up @@ -57,6 +60,7 @@ def acipolicygen_confcom(
include_fragments: bool = False,
fragments_json: str = None,
exclude_default_fragments: bool = False,
fragment_definitions: Optional[list] = None,
):
if print_existing_policy or outraw or outraw_pretty_print:
logger.warning(
Expand Down Expand Up @@ -165,6 +169,10 @@ def acipolicygen_confcom(
container_definitions=container_definitions,
)

if fragment_definitions:
for fragment_definition in fragment_definitions:
container_group_policies._fragments.extend(fragment_definition) # pylint: disable=protected-access

exit_code = 0

# standardize the output so we're only operating on arrays
Expand Down Expand Up @@ -552,3 +560,13 @@ def fragment_push(
signed_fragment=signed_fragment,
manifest_tag=manifest_tag
)


def fragment_references_from_image(
image: str,
minimum_svn: Optional[str],
) -> None:
_fragment_references_from_image(
image=image,
minimum_svn=minimum_svn,
)
66 changes: 66 additions & 0 deletions src/confcom/azext_confcom/lib/cose.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import hashlib
import platform
import re
import requests
import subprocess

from typing import Iterable
from pathlib import Path

from azext_confcom.lib.paths import get_binaries_dir


_binaries_dir = get_binaries_dir()
_cosesign1_binaries = {
"Linux": {
"path": _binaries_dir / "sign1util",
"url": "https://github.com/microsoft/cosesign1go/releases/download/v1.4.0/sign1util",
"sha256": "526b54aeb6293fc160e8fa1f81be6857300aba9641d45955f402f8b082a4d4a5",
},
"Windows": {
"path": _binaries_dir / "sign1util.exe",
"url": "https://github.com/microsoft/cosesign1go/releases/download/v1.4.0/sign1util.exe",
"sha256": "f33cccf2b1bb8c3a495c730984b47d0f0715678981dbfe712248a2452dd53303",
},
}


def cose_get():
for binary_info in _cosesign1_binaries.values():
cosesign1_fetch_resp = requests.get(binary_info["url"], verify=True)
cosesign1_fetch_resp.raise_for_status()

assert hashlib.sha256(cosesign1_fetch_resp.content).hexdigest() == binary_info["sha256"]

with open(binary_info["path"], "wb") as f:
f.write(cosesign1_fetch_resp.content)


def cose_run(args: Iterable[str]) -> subprocess.CompletedProcess:
return subprocess.run(
[_cosesign1_binaries[platform.system()]["path"], *args],
check=True,
stdout=subprocess.PIPE,
text=True,
)


def cose_print(file_path: Path):
return cose_run([
"print",
"--in", file_path.as_posix(),
]).stdout.strip()


def cose_get_properties(file_path: Path):
cose_print_output = cose_print(file_path)
return {
"iss": re.search(r"^iss:\s*(.*)$", cose_print_output, re.MULTILINE).group(1),
"feed": re.search(r"^feed:[ \t]*([^\r\n]*)", cose_print_output, re.MULTILINE).group(1),
"payload": re.search(r"^payload:\s*(.*)", cose_print_output, re.MULTILINE | re.DOTALL).group(1),
}
37 changes: 37 additions & 0 deletions src/confcom/azext_confcom/lib/fragment_references.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import tempfile

from typing import Optional

from azext_confcom.lib.cose import cose_get_properties
from azext_confcom.lib.fragments import get_fragments_from_image
from azext_confcom.lib.serialization import rego_eval


def from_image(image: str, minimum_svn: Optional[str]):

for signed_fragment in get_fragments_from_image(image):

cose_properties = cose_get_properties(signed_fragment)

with tempfile.NamedTemporaryFile("w+b") as payload:
payload.write(cose_properties["payload"].encode("utf-8"))
payload.flush()

fragment_properties = rego_eval(payload.name)

yield {
"feed": cose_properties["feed"],
"includes": sorted(list(set(fragment_properties.keys()).intersection({
"containers",
"fragmnents",
"namespace",
"external_processes",
}))),
"issuer": cose_properties["iss"],
"minimum_svn": minimum_svn or fragment_properties["svn"],
}
21 changes: 21 additions & 0 deletions src/confcom/azext_confcom/lib/fragments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import tempfile

from pathlib import Path

from azext_confcom.lib.images import sanitize_image_reference
from azext_confcom.lib.oras import get_artifact_references, pull


def get_fragments_from_image(image_reference: str):

for reference in get_artifact_references(image_reference):

fragment_path = Path(tempfile.gettempdir()) / sanitize_image_reference(reference)
pull(reference, fragment_path)

yield from fragment_path.glob("*.rego.cose")
70 changes: 70 additions & 0 deletions src/confcom/azext_confcom/lib/images.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import functools
import os
import re
import subprocess
import docker


@functools.lru_cache()
def get_image(image_ref: str) -> docker.models.images.Image:

client = docker.from_env()

try:
image = client.images.get(image_ref)
except docker.errors.ImageNotFound:
client.images.pull(image_ref)

image = client.images.get(image_ref)
return image


def get_image_layers(image: str) -> list[str]:

binary_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "bin", "dmverity-vhd")

get_image(image)
result = subprocess.run(
[binary_path, "-d", "roothash", "-i", image],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
text=True,
)

return [line.split("hash: ")[-1] for line in result.stdout.splitlines()]


def get_image_config(image: str) -> dict:

image_config = get_image(image).attrs.get("Config")

config = {}

if image_config.get("Cmd") or image_config.get("Entrypoint"):
config["command"] = (
image_config.get("Entrypoint") or [] +
image_config.get("Cmd") or []
)

if image_config.get("Env"):
config["env_rules"] = [{
"pattern": p,
"strategy": "string",
"required": False,
} for p in image_config.get("Env")]

if image_config.get("WorkingDir"):
config["working_dir"] = image_config.get("WorkingDir")

return config


def sanitize_image_reference(image_reference: str) -> str:
illegal = r'<>:"/\\|?*@\0'
return re.sub(f"[{re.escape(illegal)}]", "-", image_reference)
61 changes: 61 additions & 0 deletions src/confcom/azext_confcom/lib/oras.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import shutil
import json
import subprocess

from pathlib import Path
from typing import Iterable

from azext_confcom.errors import eprint


def oras_run(args: Iterable[str]) -> subprocess.CompletedProcess:

# Maintain existing behaviour of requiring the user to install ORAS themselves
if not shutil.which("oras"):
eprint("ORAS CLI not installed. Please install ORAS CLI: https://oras.land/docs/installation")

return subprocess.run(
["oras", *args],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)


def discover(reference: str):
return json.loads(oras_run([
"discover",
"--format",
"json",
reference,
]).stdout.strip())


def pull(reference: str, destination: Path):
return oras_run([
"pull",
"--output",
destination.as_posix(),
reference,
])


def get_artifact_references(reference: str):

def get_references(discover_result):
if "artifactType" in discover_result:
yield discover_result["reference"]
for field in discover_result.values():
if isinstance(field, list):
for item in field:
yield from get_references(item)
if isinstance(field, dict):
yield from get_references(field)

return list(get_references(discover(reference)))
Loading