Skip to content

Commit 991e31a

Browse files
refactor: reduce cognitive complexity of dispatch and compile_pipeline_inner (#831)
1 parent ba615af commit 991e31a

2 files changed

Lines changed: 193 additions & 146 deletions

File tree

src/audit/cli.rs

Lines changed: 153 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,61 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
6767
..AuditData::default()
6868
};
6969

70+
let filters = artifact_filters.as_deref();
71+
let saw_artifact_auth_error =
72+
fetch_and_record_artifacts(&client, &ctx, &auth, parsed.build_id, filters, &run_dir, &mut audit)
73+
.await?;
74+
75+
if saw_artifact_auth_error && !has_any_local_artifacts(&run_dir).await {
76+
anyhow::bail!(
77+
"failed to download artifacts and no local cache. Use 'az pipelines runs artifact download --run-id {}' to fetch them manually, then re-run.",
78+
parsed.build_id
79+
);
80+
}
81+
82+
run_analyzers(&client, &ctx, &auth, parsed.build_id, filters, &run_dir, &mut audit).await;
83+
populate_performance_metrics(&mut audit);
84+
85+
audit.metrics.error_count = audit.errors.len() as u64;
86+
audit.metrics.warning_count = audit.warnings.len() as u64;
87+
findings::derive_findings(&mut audit);
88+
89+
save_run_summary(
90+
&run_dir,
91+
&RunSummary {
92+
ado_aw_version: env!("CARGO_PKG_VERSION").to_string(),
93+
build_id: parsed.build_id,
94+
processed_at: Utc::now(),
95+
audit_data: audit.clone(),
96+
},
97+
)
98+
.await?;
99+
100+
render_audit(&audit, opts.json)?;
101+
if !opts.json {
102+
eprintln!("✓ Audit complete. Reports in {}", run_dir.display());
103+
}
104+
Ok(())
105+
}
106+
107+
/// Download all selected artifacts for the build, recording auth errors and
108+
/// non-fatal download failures as warnings rather than hard failures.
109+
/// Returns `true` if at least one artifact download was blocked by an auth error.
110+
async fn fetch_and_record_artifacts(
111+
client: &reqwest::Client,
112+
ctx: &AdoContext,
113+
auth: &crate::ado::AdoAuth,
114+
build_id: u64,
115+
artifact_filters: Option<&[String]>,
116+
run_dir: &Path,
117+
audit: &mut AuditData,
118+
) -> Result<bool> {
70119
let mut saw_artifact_auth_error = false;
71-
match list_build_artifacts(&client, &ctx, &auth, parsed.build_id).await {
120+
match list_build_artifacts(client, ctx, auth, build_id).await {
72121
Ok(artifacts) => {
73122
let selected: Vec<_> = artifacts
74123
.into_iter()
75-
.filter(|artifact| {
76-
artifact_matches_selected(&artifact.name, artifact_filters.as_deref())
77-
})
124+
.filter(|artifact| artifact_matches_selected(&artifact.name, artifact_filters))
78125
.collect();
79126

80127
if selected.is_empty() {
@@ -84,17 +131,16 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
84131
} else {
85132
"no artifacts were published for this build".to_string()
86133
};
87-
warn_and_record(&mut audit, "audit::artifacts", message);
134+
warn_and_record(audit, "audit::artifacts", message);
88135
}
89136

90137
for artifact in selected {
91-
match download_artifact_preserving_cache(&client, &auth, &artifact, &run_dir).await
92-
{
138+
match download_artifact_preserving_cache(client, auth, &artifact, run_dir).await {
93139
Ok(()) => {}
94140
Err(error) if is_authz_error(&error) => {
95141
saw_artifact_auth_error = true;
96142
warn_and_record(
97-
&mut audit,
143+
audit,
98144
"audit::artifacts",
99145
format!(
100146
"failed to download artifact '{}': {:#}; using any local copy already present",
@@ -104,12 +150,9 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
104150
}
105151
Err(error) => {
106152
warn_and_record(
107-
&mut audit,
153+
audit,
108154
"audit::artifacts",
109-
format!(
110-
"failed to download artifact '{}': {:#}",
111-
artifact.name, error
112-
),
155+
format!("failed to download artifact '{}': {:#}", artifact.name, error),
113156
);
114157
}
115158
}
@@ -118,7 +161,7 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
118161
Err(error) if is_authz_error(&error) => {
119162
saw_artifact_auth_error = true;
120163
warn_and_record(
121-
&mut audit,
164+
audit,
122165
"audit::artifacts",
123166
format!(
124167
"failed to list build artifacts: {:#}; using any local cache already present",
@@ -127,90 +170,38 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
127170
);
128171
}
129172
Err(error) => {
130-
return Err(error).context(format!(
131-
"failed to list artifacts for build {}",
132-
parsed.build_id
133-
));
173+
return Err(error)
174+
.context(format!("failed to list artifacts for build {}", build_id));
134175
}
135176
}
177+
Ok(saw_artifact_auth_error)
178+
}
136179

137-
if saw_artifact_auth_error && !has_any_local_artifacts(&run_dir).await {
138-
anyhow::bail!(
139-
"failed to download artifacts and no local cache. Use 'az pipelines runs artifact download --run-id {}' to fetch them manually, then re-run.",
140-
parsed.build_id
141-
);
142-
}
143-
144-
match collect_downloaded_files(&run_dir, artifact_filters.as_deref()).await {
180+
/// Run all analysis passes over the downloaded artifacts and populate `audit`.
181+
/// Individual analyzer failures are recorded as warnings rather than returned as errors.
182+
async fn run_analyzers(
183+
client: &reqwest::Client,
184+
ctx: &AdoContext,
185+
auth: &crate::ado::AdoAuth,
186+
build_id: u64,
187+
artifact_filters: Option<&[String]>,
188+
run_dir: &Path,
189+
audit: &mut AuditData,
190+
) {
191+
match collect_downloaded_files(run_dir, artifact_filters).await {
145192
Ok(files) => audit.downloaded_files = files,
146193
Err(error) => warn_and_record(
147-
&mut audit,
194+
audit,
148195
"audit::artifacts",
149196
format!("failed to enumerate downloaded files: {:#}", error),
150197
),
151198
}
152199

153-
if let Some(agent_outputs_dir) = find_artifact_dir(&run_dir, "agent_outputs").await {
154-
let firewall_dir = agent_outputs_dir.join("logs").join("firewall");
155-
match firewall::analyze_firewall_logs(&firewall_dir).await {
156-
Ok(result) => audit.firewall_analysis = result,
157-
Err(error) => warn_and_record(
158-
&mut audit,
159-
"audit::firewall",
160-
format!("firewall analysis failed: {:#}", error),
161-
),
162-
}
163-
match policy::analyze_policy(&firewall_dir).await {
164-
Ok(result) => audit.policy_analysis = result,
165-
Err(error) => warn_and_record(
166-
&mut audit,
167-
"audit::policy",
168-
format!("policy analysis failed: {:#}", error),
169-
),
170-
}
171-
172-
let mcpg_dir = agent_outputs_dir.join("logs").join("mcpg");
173-
match mcp::analyze_mcp_tool_usage(&mcpg_dir).await {
174-
Ok(result) => audit.mcp_tool_usage = result,
175-
Err(error) => warn_and_record(
176-
&mut audit,
177-
"audit::mcp",
178-
format!("MCP tool-usage analysis failed: {:#}", error),
179-
),
180-
}
181-
match mcp::analyze_mcp_server_health(&mcpg_dir).await {
182-
Ok(result) => audit.mcp_server_health = result,
183-
Err(error) => warn_and_record(
184-
&mut audit,
185-
"audit::mcp",
186-
format!("MCP server-health analysis failed: {:#}", error),
187-
),
188-
}
189-
match mcp::extract_mcp_failures(&mcpg_dir).await {
190-
Ok(result) => audit.mcp_failures = result,
191-
Err(error) => warn_and_record(
192-
&mut audit,
193-
"audit::mcp",
194-
format!("MCP failure extraction failed: {:#}", error),
195-
),
196-
}
197-
198-
match otel::analyze_otel(&agent_outputs_dir).await {
199-
Ok(result) => {
200-
audit.metrics = result.metrics;
201-
audit.engine_config = result.engine_config;
202-
audit.performance_metrics = result.performance;
203-
audit.overview.aw_info = result.aw_info;
204-
}
205-
Err(error) => warn_and_record(
206-
&mut audit,
207-
"audit::otel",
208-
format!("OTel analysis failed: {:#}", error),
209-
),
210-
}
200+
if let Some(agent_outputs_dir) = find_artifact_dir(run_dir, "agent_outputs").await {
201+
run_agent_output_analyzers(&agent_outputs_dir, audit).await;
211202
}
212203

213-
match safe_outputs::analyze_safe_outputs(&run_dir).await {
204+
match safe_outputs::analyze_safe_outputs(run_dir).await {
214205
Ok(result) => {
215206
audit.safe_output_summary = result.summary;
216207
audit.safe_output_execution = result.execution;
@@ -219,55 +210,120 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
219210
audit.key_findings.extend(result.findings);
220211
}
221212
Err(error) => warn_and_record(
222-
&mut audit,
213+
audit,
223214
"audit::safe_outputs",
224215
format!("safe-output analysis failed: {:#}", error),
225216
),
226217
}
227218

228-
match detection::analyze_detection(&run_dir).await {
219+
match detection::analyze_detection(run_dir).await {
229220
Ok(result) => audit.detection_analysis = result,
230221
Err(error) => warn_and_record(
231-
&mut audit,
222+
audit,
232223
"audit::detection",
233224
format!("detection analysis failed: {:#}", error),
234225
),
235226
}
236227

237-
match missing::extract_missing_tools(&run_dir).await {
228+
match missing::extract_missing_tools(run_dir).await {
238229
Ok(result) => audit.missing_tools = result,
239230
Err(error) => warn_and_record(
240-
&mut audit,
231+
audit,
241232
"audit::missing_tools",
242233
format!("missing-tool extraction failed: {:#}", error),
243234
),
244235
}
245-
match missing::extract_missing_data(&run_dir).await {
236+
match missing::extract_missing_data(run_dir).await {
246237
Ok(result) => audit.missing_data = result,
247238
Err(error) => warn_and_record(
248-
&mut audit,
239+
audit,
249240
"audit::missing_data",
250241
format!("missing-data extraction failed: {:#}", error),
251242
),
252243
}
253-
match missing::extract_noops(&run_dir).await {
244+
match missing::extract_noops(run_dir).await {
254245
Ok(result) => audit.noops = result,
255246
Err(error) => warn_and_record(
256-
&mut audit,
247+
audit,
257248
"audit::noops",
258249
format!("noop extraction failed: {:#}", error),
259250
),
260251
}
261252

262-
match jobs::fetch_timeline(&client, &ctx, &auth, parsed.build_id).await {
253+
match jobs::fetch_timeline(client, ctx, auth, build_id).await {
263254
Ok(timeline) => audit.jobs = jobs::timeline_to_jobs(&timeline),
264255
Err(error) => warn_and_record(
265-
&mut audit,
256+
audit,
266257
"audit::jobs",
267258
format!("job timeline analysis failed: {:#}", error),
268259
),
269260
}
261+
}
262+
263+
/// Run analyzers that operate on the `agent_outputs` artifact directory.
264+
async fn run_agent_output_analyzers(agent_outputs_dir: &Path, audit: &mut AuditData) {
265+
let firewall_dir = agent_outputs_dir.join("logs").join("firewall");
266+
match firewall::analyze_firewall_logs(&firewall_dir).await {
267+
Ok(result) => audit.firewall_analysis = result,
268+
Err(error) => warn_and_record(
269+
audit,
270+
"audit::firewall",
271+
format!("firewall analysis failed: {:#}", error),
272+
),
273+
}
274+
match policy::analyze_policy(&firewall_dir).await {
275+
Ok(result) => audit.policy_analysis = result,
276+
Err(error) => warn_and_record(
277+
audit,
278+
"audit::policy",
279+
format!("policy analysis failed: {:#}", error),
280+
),
281+
}
282+
283+
let mcpg_dir = agent_outputs_dir.join("logs").join("mcpg");
284+
match mcp::analyze_mcp_tool_usage(&mcpg_dir).await {
285+
Ok(result) => audit.mcp_tool_usage = result,
286+
Err(error) => warn_and_record(
287+
audit,
288+
"audit::mcp",
289+
format!("MCP tool-usage analysis failed: {:#}", error),
290+
),
291+
}
292+
match mcp::analyze_mcp_server_health(&mcpg_dir).await {
293+
Ok(result) => audit.mcp_server_health = result,
294+
Err(error) => warn_and_record(
295+
audit,
296+
"audit::mcp",
297+
format!("MCP server-health analysis failed: {:#}", error),
298+
),
299+
}
300+
match mcp::extract_mcp_failures(&mcpg_dir).await {
301+
Ok(result) => audit.mcp_failures = result,
302+
Err(error) => warn_and_record(
303+
audit,
304+
"audit::mcp",
305+
format!("MCP failure extraction failed: {:#}", error),
306+
),
307+
}
270308

309+
match otel::analyze_otel(agent_outputs_dir).await {
310+
Ok(result) => {
311+
audit.metrics = result.metrics;
312+
audit.engine_config = result.engine_config;
313+
audit.performance_metrics = result.performance;
314+
audit.overview.aw_info = result.aw_info;
315+
}
316+
Err(error) => warn_and_record(
317+
audit,
318+
"audit::otel",
319+
format!("OTel analysis failed: {:#}", error),
320+
),
321+
}
322+
}
323+
324+
/// Backfill performance metric fields that can be derived from other already-populated
325+
/// analysis results (firewall request count, most-used MCP tool).
326+
fn populate_performance_metrics(audit: &mut AuditData) {
271327
if let Some(firewall_analysis) = &audit.firewall_analysis {
272328
let performance = audit.performance_metrics.get_or_insert_default();
273329
if performance.network_requests.is_none() {
@@ -282,27 +338,6 @@ pub async fn dispatch(opts: AuditOptions<'_>) -> Result<()> {
282338
performance.most_used_tool = Some(tool.name.clone());
283339
}
284340
}
285-
286-
audit.metrics.error_count = audit.errors.len() as u64;
287-
audit.metrics.warning_count = audit.warnings.len() as u64;
288-
findings::derive_findings(&mut audit);
289-
290-
save_run_summary(
291-
&run_dir,
292-
&RunSummary {
293-
ado_aw_version: env!("CARGO_PKG_VERSION").to_string(),
294-
build_id: parsed.build_id,
295-
processed_at: Utc::now(),
296-
audit_data: audit.clone(),
297-
},
298-
)
299-
.await?;
300-
301-
render_audit(&audit, opts.json)?;
302-
if !opts.json {
303-
eprintln!("✓ Audit complete. Reports in {}", run_dir.display());
304-
}
305-
Ok(())
306341
}
307342

308343
async fn resolve_audit_context(

0 commit comments

Comments
 (0)