Skip to content

Commit 8daeece

Browse files
authored
feat(truth): emit derived rollups + per-project open_high_critical (#77)
Add a top-level `rollups` block (risk-tier counts, security high+critical totals, decision counts) and the per-project `security.open_high_critical` field to portfolio-truth-latest.json, so downstream consumers read the auditor's derived risk/security logic instead of re-implementing it — the #1 cross-tool drift risk. - New PortfolioTruthRollups dataclass, computed in __post_init__ from the project list so it can never drift from the source data - SecurityFields.to_dict now includes the open_high_critical property - Schema 0.6.0 -> 0.7.0 (additive, backward-compatible) Verified: ruff check clean; 2493 passed, 2 skipped.
1 parent 3530976 commit 8daeece

2 files changed

Lines changed: 107 additions & 5 deletions

File tree

src/portfolio_truth_types.py

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from pathlib import Path
77
from typing import Any
88

9-
SCHEMA_VERSION = "0.6.0"
9+
SCHEMA_VERSION = "0.7.0"
1010

1111
# The published "latest" portfolio-truth artifact. The producer
1212
# (portfolio_truth_publish) writes it; every reader resolves it through
@@ -32,7 +32,13 @@ def truth_latest_path(output_dir: Path) -> Path:
3232
"evidence-history",
3333
"manual-only",
3434
}
35-
VALID_LIFECYCLE_STATES = {"active", "maintenance", "dormant", "experimental", "archived"}
35+
VALID_LIFECYCLE_STATES = {
36+
"active",
37+
"maintenance",
38+
"dormant",
39+
"experimental",
40+
"archived",
41+
}
3642
VALID_CATEGORY_TAGS = {
3743
"commercial",
3844
"it-work",
@@ -129,7 +135,9 @@ class DerivedFields:
129135

130136
def to_dict(self) -> dict[str, Any]:
131137
data = dataclasses.asdict(self)
132-
data["last_meaningful_activity_at"] = _serialize_datetime(self.last_meaningful_activity_at)
138+
data["last_meaningful_activity_at"] = _serialize_datetime(
139+
self.last_meaningful_activity_at
140+
)
133141
return data
134142

135143

@@ -183,7 +191,9 @@ def open_high_critical(self) -> int:
183191
return self.dependabot_high + self.dependabot_critical
184192

185193
def to_dict(self) -> dict[str, Any]:
186-
return dataclasses.asdict(self)
194+
data = dataclasses.asdict(self)
195+
data["open_high_critical"] = self.open_high_critical
196+
return data
187197

188198

189199
@dataclass(frozen=True)
@@ -210,6 +220,71 @@ def to_dict(self) -> dict[str, Any]:
210220
}
211221

212222

223+
@dataclass(frozen=True)
224+
class PortfolioTruthRollups:
225+
"""Portfolio-level aggregates derived from the project list, emitted so
226+
downstream consumers (command-center, dashboards) read them instead of
227+
re-deriving the auditor's risk/security logic, which is the #1 drift risk."""
228+
229+
risk_tier_counts: dict[str, int]
230+
security: dict[str, int]
231+
decision: dict[str, int]
232+
233+
@classmethod
234+
def from_projects(
235+
cls, projects: list[PortfolioTruthProject]
236+
) -> PortfolioTruthRollups:
237+
risk_tier_counts = {
238+
"elevated": 0,
239+
"moderate": 0,
240+
"baseline": 0,
241+
"deferred": 0,
242+
}
243+
scanned_count = 0
244+
repos_with_open_high_critical = 0
245+
total_open_high = 0
246+
total_open_critical = 0
247+
decision_needed_count = 0
248+
default_attention_count = 0
249+
for project in projects:
250+
tier = project.risk.risk_tier
251+
if tier in risk_tier_counts:
252+
risk_tier_counts[tier] += 1
253+
security = project.security
254+
if security.alerts_available:
255+
scanned_count += 1
256+
if security.open_high_critical > 0:
257+
repos_with_open_high_critical += 1
258+
total_open_high += security.dependabot_high
259+
total_open_critical += security.dependabot_critical
260+
attention = project.derived.attention_state
261+
if attention == "decision-needed":
262+
decision_needed_count += 1
263+
default_attention_count += 1
264+
elif attention in ("active-product", "active-infra"):
265+
default_attention_count += 1
266+
return cls(
267+
risk_tier_counts=risk_tier_counts,
268+
security={
269+
"scanned_count": scanned_count,
270+
"repos_with_open_high_critical": repos_with_open_high_critical,
271+
"total_open_high": total_open_high,
272+
"total_open_critical": total_open_critical,
273+
},
274+
decision={
275+
"decision_needed_count": decision_needed_count,
276+
"default_attention_count": default_attention_count,
277+
},
278+
)
279+
280+
def to_dict(self) -> dict[str, Any]:
281+
return {
282+
"risk_tier_counts": dict(self.risk_tier_counts),
283+
"security": dict(self.security),
284+
"decision": dict(self.decision),
285+
}
286+
287+
213288
@dataclass(frozen=True)
214289
class PortfolioTruthSnapshot:
215290
schema_version: str
@@ -219,6 +294,12 @@ class PortfolioTruthSnapshot:
219294
precedence_matrix: dict[str, list[str]]
220295
warnings: list[str]
221296
projects: list[PortfolioTruthProject]
297+
rollups: PortfolioTruthRollups = field(init=False)
298+
299+
def __post_init__(self) -> None:
300+
object.__setattr__(
301+
self, "rollups", PortfolioTruthRollups.from_projects(self.projects)
302+
)
222303

223304
def to_dict(self) -> dict[str, Any]:
224305
return {
@@ -229,4 +310,5 @@ def to_dict(self) -> dict[str, Any]:
229310
"precedence_matrix": self.precedence_matrix,
230311
"warnings": list(self.warnings),
231312
"projects": [project.to_dict() for project in self.projects],
313+
"rollups": self.rollups.to_dict(),
232314
}

tests/test_portfolio_truth.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,10 +370,30 @@ def test_truth_snapshot_respects_declared_and_derived_fields(
370370
assert gamma.identity.section_marker == "iOS Projects"
371371
assert gamma.derived.stack == ["Swift"]
372372

373-
assert result.snapshot.schema_version == "0.6.0"
373+
assert result.snapshot.schema_version == "0.7.0"
374374
assert result.snapshot.source_summary["attention_state_counts"]["active-product"] == 1
375375
assert result.snapshot.source_summary["attention_state_counts"]["parked"] == 1
376376

377+
# Derived rollups are emitted so downstream consumers (command-center) read
378+
# them instead of re-deriving the auditor's risk/security logic.
379+
snapshot_dict = result.snapshot.to_dict()
380+
rollups = snapshot_dict["rollups"]
381+
assert set(rollups["risk_tier_counts"]) == {"elevated", "moderate", "baseline", "deferred"}
382+
assert sum(rollups["risk_tier_counts"].values()) == len(result.snapshot.projects)
383+
assert set(rollups["security"]) == {
384+
"scanned_count",
385+
"repos_with_open_high_critical",
386+
"total_open_high",
387+
"total_open_critical",
388+
}
389+
assert set(rollups["decision"]) == {"decision_needed_count", "default_attention_count"}
390+
assert (
391+
rollups["decision"]["default_attention_count"]
392+
>= rollups["decision"]["decision_needed_count"]
393+
)
394+
# Per-project open_high_critical is emitted in the security block.
395+
assert "open_high_critical" in snapshot_dict["projects"][0]["security"]
396+
377397

378398
def test_attention_state_classifier_separates_activity_from_operator_attention() -> None:
379399
from src.portfolio_truth_reconcile import _attention_state_for

0 commit comments

Comments
 (0)