Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions linter_exclusions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3544,6 +3544,12 @@ neon postgres project:
rule_exclusions:
- require_wait_command_if_no_wait

confcom containers from_vn2:
parameters:
template:
rule_exclusions:
- no_positional_parameters

confcom fragment push:
parameters:
signed_fragment:
Expand Down
4 changes: 4 additions & 0 deletions src/confcom/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
Release History
===============

1.7.0
++++++
* Add containers from_vn2 command to generate container definitions from a VN2 template.

1.6.0
++++++
* Added confcom containers from_image command to generate container definitions from an image reference
Expand Down
20 changes: 19 additions & 1 deletion src/confcom/azext_confcom/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@
short-summary: Commands which generate Security Policy Container Definitions.
"""


helps[
"confcom containers from_image"
] = """
Expand All @@ -346,3 +345,22 @@
- name: Input an image reference and generate container definitions
text: az confcom containers from_image my.azurecr.io/myimage:tag
"""

helps[
"confcom containers from_vn2"
] = """
type: command
short-summary: Create Security Policy Container Definitions based on a VN2 template.

parameters:
- name: --name -n
type: string
short-summary: 'The name of the container to generate the policy for. If omitted, all containers are returned.'


examples:
- name: Input a VN2 Template and generate container definitions
text: az confcom containers from_vn2 vn2.yaml --name mycontainer
- name: Input a VN2 Template and generate container definitions for all containers
text: az confcom containers from_vn2 vn2.yaml
"""
14 changes: 14 additions & 0 deletions src/confcom/azext_confcom/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,17 @@ def load_arguments(self, _):
type=str,
help="Platform to create container definition for",
)

with self.argument_context("confcom containers from_vn2") as c:
c.positional(
"template",
type=str,
help="Template to create container definitions from",
)
c.argument(
"container_name",
options_list=['--name', "-n"],
required=False,
type=str,
help='The name of the container in the template to use. If omitted, all containers are returned.'
)
265 changes: 265 additions & 0 deletions src/confcom/azext_confcom/command/containers_from_vn2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import base64
from dataclasses import asdict
from hashlib import sha256
import json
from pathlib import Path
import re
from typing import Optional
import yaml

from azext_confcom import config
from azext_confcom.lib.platform import (
PRIVILEDGED_CAPABILITIES,
Comment thread
DomAyre marked this conversation as resolved.
VN2_PRIVILEGED_MOUNTS,
VN2_WORKLOAD_IDENTITY_ENV_RULES,
VN2_WORKLOAD_IDENTITY_MOUNTS,
)
from azext_confcom.lib.policy import ContainerUser
from azext_confcom.lib.containers import (
from_image as container_from_image,
merge_containers,
)


def find_vn2_containers(vn2_template):
for key, value in vn2_template.items():
if key in ("containers", "initContainers"):
yield from value
elif isinstance(value, dict):
result = find_vn2_containers(value)
if result is not None:
yield from result
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
result = find_vn2_containers(item)
if result is not None:
yield from result


def vn2_container_env_rules(template: dict, container: dict, template_variables: dict):

for env_var in container.get("env", []):

if "value" in env_var:
is_special = re.match('^===VIRTUALNODE2.CC.THIM.(.+)===$', env_var.get('value'))
yield {
"pattern": f"{env_var.get('name')}={'.*' if is_special else env_var.get('value')}",
"strategy": "re2" if is_special else "string",
"required": False,
}

elif "valueFrom" in env_var:

if "configMapKeyRef" in env_var.get('valueFrom') or "secretKeyRef" in env_var.get('valueFrom'):
var_ref = (
env_var.get('valueFrom').get("configMapKeyRef", None) or
env_var.get('valueFrom').get("secretKeyRef", None)
)
yield {
"pattern": f"{env_var.get('name')}={template_variables[var_ref.get('name')][var_ref.get('key')]}",
Comment thread
DomAyre marked this conversation as resolved.
"strategy": "string",
"required": False,
}

elif "fieldRef" in env_var.get('valueFrom'):
# Existing behaviour is to wildcard this, there is a correct implementation below
yield {
"pattern": f"{env_var.get('name')}=.*",
"strategy": "re2",
"required": False,
}
# value = template
# for part in env_var.get('valueFrom').get("fieldRef", {}).get("fieldPath", "").split("."):
# value = value.get(part, {})
# yield {
# "pattern": f"{env_var.get('name')}={value}",
# "strategy": "string",
# "required": False,
# })

elif "resourceFieldRef" in env_var.get('valueFrom'):
ref = env_var.get('valueFrom').get("resourceFieldRef", {})
ref_container_name = ref.get("containerName") or container.get("name")
ref_container = next(
(
c for c in template["spec"]["containers"]
if c.get("name") == ref_container_name
),
None,
)
if ref_container is None:
continue
value = ref_container.get("resources", {})
for part in ref["resource"].split("."):
value = value.get(part, {})
yield {
"pattern": f"{env_var.get('name')}={value}",
"strategy": "string",
"required": False,
}


def vn2_container_mounts(template: dict, container: dict) -> list[dict]:

volume_claim_access = {
v["metadata"]["name"]: v.get("spec", {}).get("accessModes", [])
for v in template.get("spec", {}).get("volumeClaimTemplates", [])
}
volume_defs = {
v["name"]: [k for k in v.keys() if k != "name"][0]
for v in template.get("spec", {}).get("volumes", [])
}

return [
{
"destination": m.get("mountPath"),
"options": [
"rbind",
"rshared",
"ro" if (
m.get("readOnly") or
"ReadOnlyMany" in volume_claim_access.get(m.get("name"), []) or
volume_defs.get(m.get("name")) in {"configMap", "secret", "downwardAPI", "projected"}
) else "rw"
],
"source": "sandbox:///tmp/atlas/emptydir/.+",
"type": "bind",
}
for m in container.get("volumeMounts", [])
]


def containers_from_vn2(
template: str,
container_name: Optional[str] = None
) -> str:

with Path(template).open("r") as f:
template_yaml = list(yaml.safe_load_all(f))

# Find containers matching the specified name (if provided)
template_containers = []
variables = {}
for doc in template_yaml:
if not isinstance(doc, dict):
continue
kind = doc.get("kind")
if kind == "ConfigMap":
variables[doc["metadata"]["name"]] = {
**doc.get("data", {}),
**{k: base64.b64decode(v).decode("utf-8") for k, v in doc.get("binaryData", {}).items()},
}
elif kind == "Secret":
variables[doc["metadata"]["name"]] = {
**{k: base64.b64decode(v).decode("utf-8") for k, v in doc.get("data", {}).items()},
**doc.get("stringData", {}),
}
elif kind in ["Pod", "Deployment", "StatefulSet", "DaemonSet", "Job", "CronJob", "ReplicaSet"]:
for container in find_vn2_containers(doc):
if container_name and container.get("name") != container_name:
continue
template_containers.append((container, doc))

if container_name:
if not template_containers:
raise AssertionError(f"No containers with name {container_name} found.")
if len(template_containers) > 1:
raise AssertionError(
f"Multiple containers with name {container_name} found."
)
elif not template_containers:
raise AssertionError("No containers found.")

container_defs = []
for template_container, template_doc in template_containers:
image_container_def = container_from_image(template_container.get("image"), platform="vn2")

template_container_def = {
"name": template_container.get("name"),
"command": template_container.get("command", []) + template_container.get("args", []),
"env_rules": (
[
{
"pattern": rule.get("pattern") or f"{rule.get('name')}={rule.get('value')}",
"strategy": rule.get("strategy", "string"),
"required": rule.get("required", False),
}
for rule in (
config.OPENGCS_ENV_RULES
+ config.FABRIC_ENV_RULES
+ config.MANAGED_IDENTITY_ENV_RULES
+ config.ENABLE_RESTART_ENV_RULE
+ config.VIRTUAL_NODE_ENV_RULES
)
]
+ list(vn2_container_env_rules(template_doc, template_container, variables))
),
"mounts": vn2_container_mounts(template_doc, template_container),
}

# Parse security context
security_context = (
template_doc.get("spec", {}).get("securityContext", {})
| template_container.get("securityContext", {})
)
if security_context.get("privileged", False):
template_container_def["allow_elevated"] = True
template_container_def["mounts"] += VN2_PRIVILEGED_MOUNTS
template_container_def["capabilities"] = PRIVILEDGED_CAPABILITIES

if security_context.get("runAsUser") or security_context.get("runAsGroup"):
template_container_def["user"] = asdict(ContainerUser())
if security_context.get("runAsUser"):
template_container_def["user"]["user_idname"] = {
"pattern": str(security_context.get("runAsUser")),
"strategy": "id",
}
if security_context.get("runAsGroup"):
template_container_def["user"]["group_idnames"] = [{
"pattern": str(security_context.get("runAsGroup")),
"strategy": "id",
}]

if security_context.get("seccompProfile"):
template_container_def["seccomp_profile_sha256"] = sha256(
base64.b64decode(security_context.get("seccompProfile"))
).hexdigest()

if security_context.get("allowPrivilegeEscalation") is False:
template_container_def["no_new_privileges"] = True

# Check for workload identity
labels = template_doc.get("metadata", {}).get("labels", {}) or {}
if labels.get("azure.workload.identity/use", "false") == "true":
template_container_def["env_rules"].extend(VN2_WORKLOAD_IDENTITY_ENV_RULES)
template_container_def["mounts"].extend(VN2_WORKLOAD_IDENTITY_MOUNTS)

exec_processes = [
{
"command": process.get("exec", {}).get("command", []),
"signals": []
}
for process in [
template_container.get("livenessProbe"),
template_container.get("readinessProbe"),
template_container.get("startupProbe"),
template_container.get("lifecycle", {}).get("postStart"),
template_container.get("lifecycle", {}).get("preStop"),
]
if process is not None
]
if exec_processes:
template_container_def["exec_processes"] = exec_processes

container_defs.append(merge_containers(
image_container_def,
template_container_def,
))

return json.dumps(container_defs)
1 change: 1 addition & 0 deletions src/confcom/azext_confcom/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ def load_command_table(self, _):
pass

with self.command_group("confcom containers") as g:
g.custom_command("from_vn2", "containers_from_vn2")
g.custom_command("from_image", "containers_from_image")
11 changes: 11 additions & 0 deletions src/confcom/azext_confcom/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from azext_confcom.command.fragment_attach import fragment_attach as _fragment_attach
from azext_confcom.command.fragment_push import fragment_push as _fragment_push
from azext_confcom.command.containers_from_image import containers_from_image as _containers_from_image
from azext_confcom.command.containers_from_vn2 import containers_from_vn2 as _containers_from_vn2
from knack.log import get_logger
from pkg_resources import parse_version

Expand Down Expand Up @@ -563,3 +564,13 @@ def containers_from_image(
image=image,
platform=platform,
)


def containers_from_vn2(
template: str,
container_name: Optional[str] = None,
) -> None:
print(_containers_from_vn2(
template=template,
container_name=container_name,
))
Loading
Loading