Skip to content

Commit c4c8044

Browse files
feat(risk): Implement multi-dimensional risk scoring with churn and coverage
- Added `GitMiner` for file churn analysis.
- Added `CoverageScorer` for parsing coverage reports.
- Added `RiskPropagator` for risk propagation through dependencies.
- Updated `RiskScorer` to aggregate static, churn, coverage, and propagation risks.
- Updated `FileRisk` model with `sub_scores`.
- Updated `RiskBaselineConfig` with new weights.
- Added comprehensive unit and integration tests.
1 parent 96e1353 commit c4c8044

11 files changed

Lines changed: 757 additions & 48 deletions

File tree

codesage/config/risk_baseline.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,34 @@
33
class RiskBaselineConfig(BaseModel):
    """Tunable weights and thresholds for the baseline risk scorer."""

    # --- Static (base) score weights -------------------------------------
    weight_complexity_max: float = 0.4
    weight_complexity_avg: float = 0.3
    weight_fan_out: float = 0.2
    weight_loc: float = 0.1

    # --- Multi-dimensional score weights ---------------------------------
    # Two candidate formulas were recorded when these weights were added:
    #   Final = w_static * static + w_churn * churn + w_cov * (static * (1 - cov))
    #   Score = w1 * Complexity + w2 * Churn + w3 * (1 - Coverage)
    # where "Complexity" is the static score built from the weights above.
    # NOTE(review): which formula the scorer applies is not visible from
    # this class -- confirm against the scorer implementation.
    weight_static_score: float = 0.5
    weight_churn: float = 0.3
    weight_coverage_penalty: float = 0.2

    # --- Risk propagation through the dependency graph --------------------
    propagation_factor: float = 0.2
    propagation_iterations: int = 5

    # --- Thresholds for complexity and risk levels -------------------------
    threshold_complexity_high: int = 10
    threshold_risk_medium: float = 0.4
    threshold_risk_high: float = 0.7

    # --- Churn settings ----------------------------------------------------
    # Look-back window, in days, for counting changes to a file.
    churn_since_days: int = 90
    # Change count within the look-back window above which normalized churn
    # is treated as 1.0 (with the defaults: more than 10 changes in 90 days).
    threshold_churn_high: int = 10

    @classmethod
    def from_defaults(cls) -> "RiskBaselineConfig":
        """Return a configuration populated entirely with default values."""
        return cls()

codesage/history/git_miner.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import subprocess
2+
from datetime import datetime, timedelta
3+
from typing import Dict, List, Tuple, Optional
4+
import logging
5+
6+
logger = logging.getLogger(__name__)
7+
8+
class GitMiner:
    """Collect per-file churn and last-modified data from ``git log``.

    The log is parsed once per look-back window and the result is cached.
    Asking for a different ``since_days`` re-parses the log so that the
    returned numbers always correspond to the requested window.
    """

    # Commit header lines are requested as "<0x01><unix timestamp>".  Git
    # always escapes control characters inside path names, so a path line can
    # never start with this byte; a file literally named "2024" is therefore
    # no longer mistaken for a commit timestamp.
    _COMMIT_MARKER = "\x01"

    def __init__(self, repo_path: str = "."):
        """Create a miner for the repository located at ``repo_path``."""
        self.repo_path = repo_path
        self._churn_cache: Dict[str, int] = {}
        self._last_modified_cache: Dict[str, datetime] = {}
        self._is_initialized = False
        # Look-back window the caches were built for (None until first load).
        self._initialized_since_days: Optional[int] = None

    def _run_git_cmd(self, args: List[str]) -> str:
        """Run ``git <args>`` in the repository and return stripped stdout.

        Best effort: a non-zero exit status, a missing ``git`` executable or
        an invalid ``repo_path`` is logged and reported as an empty string.
        """
        try:
            result = subprocess.run(
                ["git"] + args,
                cwd=self.repo_path,
                capture_output=True,
                # Decode explicitly so unquoted non-ASCII paths cannot raise
                # under a non-UTF-8 locale.
                encoding="utf-8",
                errors="replace",
                check=True,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            logger.warning(f"Git command failed: {e}")
            return ""
        return result.stdout.strip()

    def _initialize_stats(self, since_days: int = 90):
        """Parse ``git log`` once and fill the churn / last-modified caches.

        The caches are rebuilt whenever ``since_days`` differs from the
        window they were last built for.
        """
        if self._is_initialized and self._initialized_since_days == since_days:
            return

        since_date = (datetime.now() - timedelta(days=since_days)).strftime("%Y-%m-%d")

        cmd = [
            # Emit non-ASCII paths verbatim instead of octal-escaped, so the
            # cache keys match the paths callers look up.
            "-c", "core.quotepath=false",
            "log",
            f"--since={since_date}",
            "--pretty=format:%x01%at",  # marker byte + author timestamp
            "--name-only",  # list changed files under each commit
        ]
        output = self._run_git_cmd(cmd)

        churn: Dict[str, int] = {}
        last_modified: Dict[str, datetime] = {}
        marker = self._COMMIT_MARKER
        current_dt: Optional[datetime] = None

        for raw_line in output.split("\n"):
            line = raw_line.strip()
            if not line:
                continue

            if line.startswith(marker):
                stamp = line[len(marker):]
                # Explicit digit check: timestamp 0 is valid, garbage is not.
                current_dt = datetime.fromtimestamp(int(stamp)) if stamp.isdigit() else None
                continue

            # Any other non-empty line is a path touched by the current commit.
            churn[line] = churn.get(line, 0) + 1
            if current_dt is not None:
                # git log lists newest commits first, so the first timestamp
                # seen for a path is its most recent change.
                last_modified.setdefault(line, current_dt)

        self._churn_cache = churn
        self._last_modified_cache = last_modified
        self._initialized_since_days = since_days
        self._is_initialized = True

    def get_file_churn(self, file_path: str, since_days: int = 90) -> int:
        """Return how many commits touched ``file_path`` in the last ``since_days`` days."""
        self._initialize_stats(since_days)
        return self._churn_cache.get(file_path, 0)

    def get_last_modified(self, file_path: str) -> Optional[datetime]:
        """Return the most recent commit time of ``file_path`` seen in the loaded window.

        Uses whichever window is already cached (loading the default one if
        nothing has been loaded yet); returns ``None`` for unknown paths.
        """
        if not self._is_initialized:
            self._initialize_stats()
        return self._last_modified_cache.get(file_path)

    def get_hotspots(self, limit: int = 10, since_days: int = 90) -> List[Tuple[str, int]]:
        """Return up to ``limit`` ``(path, change_count)`` pairs, most-changed first."""
        self._initialize_stats(since_days)
        ranked = sorted(self._churn_cache.items(), key=lambda item: item[1], reverse=True)
        return ranked[:limit]

codesage/risk/propagation.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from typing import Dict, List, Set, Tuple
2+
import logging
3+
4+
logger = logging.getLogger(__name__)
5+
6+
class RiskPropagator:
    """Spread risk scores along import edges of a dependency graph.

    If A depends on B, a share of B's risk flows into A ("calling a
    high-risk component makes you risky").
    """

    def __init__(self, attenuation_factor: float = 0.5, max_iterations: int = 10, epsilon: float = 0.01):
        """Store the propagation parameters.

        attenuation_factor: fraction of a dependency's score added to its dependant.
        max_iterations: upper bound on propagation rounds.
        epsilon: rounds stop once no score moves by more than this amount.
        """
        self.attenuation_factor = attenuation_factor
        self.max_iterations = max_iterations
        self.epsilon = epsilon

    def propagate(self, dependency_graph: Dict[str, List[str]], base_scores: Dict[str, float]) -> Dict[str, float]:
        """Return propagated scores for every file in ``base_scores``.

        dependency_graph: maps a file to the files it depends on (imports).
        base_scores: initial risk score per file; not modified.

        Each round recomputes ``base + attenuation_factor * sum(dependency
        scores)`` for every file from the previous round's scores.
        Dependencies without a base score are ignored.  Results are not
        clamped and may exceed the range of the inputs.
        """
        final_scores = dict(base_scores)

        for _ in range(self.max_iterations):
            # Snapshot so every node in this round reads the same generation.
            previous = final_scores.copy()
            largest_change = 0.0

            for node, base in base_scores.items():
                incoming_risk = 0.0
                for dep in dependency_graph.get(node, []):
                    if dep in previous:
                        incoming_risk += previous[dep] * self.attenuation_factor

                new_score = base + incoming_risk
                largest_change = max(largest_change, abs(new_score - previous[node]))
                # Always store the recomputed value: epsilon only decides when
                # to stop iterating, it must not discard small contributions.
                final_scores[node] = new_score

            if largest_change <= self.epsilon:
                break

        return final_scores

0 commit comments

Comments
 (0)