Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 86 additions & 4 deletions src/portfolio_truth_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path
from typing import Any

SCHEMA_VERSION = "0.6.0"
SCHEMA_VERSION = "0.7.0"

# The published "latest" portfolio-truth artifact. The producer
# (portfolio_truth_publish) writes it; every reader resolves it through
Expand All @@ -32,7 +32,13 @@ def truth_latest_path(output_dir: Path) -> Path:
"evidence-history",
"manual-only",
}
VALID_LIFECYCLE_STATES = {"active", "maintenance", "dormant", "experimental", "archived"}
VALID_LIFECYCLE_STATES = {
"active",
"maintenance",
"dormant",
"experimental",
"archived",
}
VALID_CATEGORY_TAGS = {
"commercial",
"it-work",
Expand Down Expand Up @@ -129,7 +135,9 @@ class DerivedFields:

def to_dict(self) -> dict[str, Any]:
data = dataclasses.asdict(self)
data["last_meaningful_activity_at"] = _serialize_datetime(self.last_meaningful_activity_at)
data["last_meaningful_activity_at"] = _serialize_datetime(
self.last_meaningful_activity_at
)
return data


Expand Down Expand Up @@ -183,7 +191,9 @@ def open_high_critical(self) -> int:
return self.dependabot_high + self.dependabot_critical

def to_dict(self) -> dict[str, Any]:
return dataclasses.asdict(self)
data = dataclasses.asdict(self)
data["open_high_critical"] = self.open_high_critical
return data


@dataclass(frozen=True)
Expand All @@ -210,6 +220,71 @@ def to_dict(self) -> dict[str, Any]:
}


@dataclass(frozen=True)
class PortfolioTruthRollups:
"""Portfolio-level aggregates derived from the project list, emitted so
downstream consumers (command-center, dashboards) read them instead of
re-deriving the auditor's risk/security logic, which is the #1 drift risk."""

risk_tier_counts: dict[str, int]
security: dict[str, int]
decision: dict[str, int]

@classmethod
def from_projects(
cls, projects: list[PortfolioTruthProject]
) -> PortfolioTruthRollups:
risk_tier_counts = {
"elevated": 0,
"moderate": 0,
"baseline": 0,
"deferred": 0,
}
scanned_count = 0
repos_with_open_high_critical = 0
total_open_high = 0
total_open_critical = 0
decision_needed_count = 0
default_attention_count = 0
for project in projects:
tier = project.risk.risk_tier
if tier in risk_tier_counts:
risk_tier_counts[tier] += 1
security = project.security
if security.alerts_available:
scanned_count += 1
if security.open_high_critical > 0:
repos_with_open_high_critical += 1
total_open_high += security.dependabot_high
total_open_critical += security.dependabot_critical
attention = project.derived.attention_state
if attention == "decision-needed":
decision_needed_count += 1
default_attention_count += 1
elif attention in ("active-product", "active-infra"):
default_attention_count += 1
return cls(
risk_tier_counts=risk_tier_counts,
security={
"scanned_count": scanned_count,
"repos_with_open_high_critical": repos_with_open_high_critical,
"total_open_high": total_open_high,
"total_open_critical": total_open_critical,
},
decision={
"decision_needed_count": decision_needed_count,
"default_attention_count": default_attention_count,
},
)

def to_dict(self) -> dict[str, Any]:
return {
"risk_tier_counts": dict(self.risk_tier_counts),
"security": dict(self.security),
"decision": dict(self.decision),
}


@dataclass(frozen=True)
class PortfolioTruthSnapshot:
schema_version: str
Expand All @@ -219,6 +294,12 @@ class PortfolioTruthSnapshot:
precedence_matrix: dict[str, list[str]]
warnings: list[str]
projects: list[PortfolioTruthProject]
rollups: PortfolioTruthRollups = field(init=False)

def __post_init__(self) -> None:
object.__setattr__(
self, "rollups", PortfolioTruthRollups.from_projects(self.projects)
)

def to_dict(self) -> dict[str, Any]:
return {
Expand All @@ -229,4 +310,5 @@ def to_dict(self) -> dict[str, Any]:
"precedence_matrix": self.precedence_matrix,
"warnings": list(self.warnings),
"projects": [project.to_dict() for project in self.projects],
"rollups": self.rollups.to_dict(),
}
22 changes: 21 additions & 1 deletion tests/test_portfolio_truth.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,30 @@ def test_truth_snapshot_respects_declared_and_derived_fields(
assert gamma.identity.section_marker == "iOS Projects"
assert gamma.derived.stack == ["Swift"]

assert result.snapshot.schema_version == "0.6.0"
assert result.snapshot.schema_version == "0.7.0"
assert result.snapshot.source_summary["attention_state_counts"]["active-product"] == 1
assert result.snapshot.source_summary["attention_state_counts"]["parked"] == 1

# Derived rollups are emitted so downstream consumers (command-center) read
# them instead of re-deriving the auditor's risk/security logic.
snapshot_dict = result.snapshot.to_dict()
rollups = snapshot_dict["rollups"]
assert set(rollups["risk_tier_counts"]) == {"elevated", "moderate", "baseline", "deferred"}
assert sum(rollups["risk_tier_counts"].values()) == len(result.snapshot.projects)
assert set(rollups["security"]) == {
"scanned_count",
"repos_with_open_high_critical",
"total_open_high",
"total_open_critical",
}
assert set(rollups["decision"]) == {"decision_needed_count", "default_attention_count"}
assert (
rollups["decision"]["default_attention_count"]
>= rollups["decision"]["decision_needed_count"]
)
# Per-project open_high_critical is emitted in the security block.
assert "open_high_critical" in snapshot_dict["projects"][0]["security"]


def test_attention_state_classifier_separates_activity_from_operator_attention() -> None:
from src.portfolio_truth_reconcile import _attention_state_for
Expand Down