Skip to content

Commit 0decb1d

Browse files
author
root
committed
Add moderation debug response headers
1 parent d00a7d5 commit 0decb1d

4 files changed

Lines changed: 292 additions & 50 deletions

File tree

src/moderation/smart.rs

Lines changed: 155 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
44
use std::sync::{Arc, Mutex, OnceLock};
55
use std::time::Duration;
66
use std::time::{SystemTime, UNIX_EPOCH};
7+
use std::time::Instant;
78

89
use anyhow::{anyhow, Result};
910
use indexmap::IndexMap;
@@ -43,17 +44,31 @@ pub struct SmartModerationResult {
4344
pub reason: Option<String>,
4445
pub source: String,
4546
pub confidence: Option<f64>,
47+
#[serde(skip_serializing)]
48+
pub debug: Option<ModerationDebugInfo>,
49+
}
50+
51+
/// Per-request diagnostics gathered while moderating a single text, used to
/// populate the moderation debug response headers.
///
/// Carried on `SmartModerationResult.debug`, which is excluded from that
/// struct's own serialization via `#[serde(skip_serializing)]`; this type
/// derives `Serialize` so it can be emitted separately.
#[derive(Debug, Clone, Default, Serialize)]
pub struct ModerationDebugInfo {
    /// Which local classifier scored the text: "fasttext_model",
    /// "hashlinear_model", or "bow_model".
    pub local_model_source: Option<String>,
    /// Raw probability produced by the local model.
    pub local_model_confidence: Option<f64>,
    /// Wall-clock time of the local prediction, in milliseconds.
    pub local_model_latency_ms: Option<f64>,
    /// Local pass outcome: "skipped", "allow", "block", or "uncertain".
    pub local_model_decision: Option<String>,
    /// True once an LLM review ran (also set when a stored history result
    /// stands in for a fresh LLM call).
    pub llm_reviewed: bool,
    /// LLM outcome: "allow", "block", or "error".
    pub llm_result: Option<String>,
    /// Wall-clock time of the LLM round trip, in milliseconds.
    pub llm_latency_ms: Option<f64>,
    /// Truncated error message when the LLM call failed.
    pub llm_error: Option<String>,
}
4762

4863
/// Errors surfaced by the smart moderation pipeline.
#[derive(Debug)]
pub enum SmartModerationError {
    /// Too many concurrent moderation requests; carries the limiter's message.
    ConcurrencyLimit(String),
    /// Any other failure, together with whatever debug info had been
    /// collected before the error occurred (e.g. local-model diagnostics
    /// gathered before an LLM call failed).
    Other(anyhow::Error, Option<ModerationDebugInfo>),
}
5368

5469
impl From<anyhow::Error> for SmartModerationError {
5570
fn from(value: anyhow::Error) -> Self {
56-
Self::Other(value)
71+
Self::Other(value, None)
5772
}
5873
}
5974

@@ -105,16 +120,20 @@ async fn decide_moderation(
105120
env_map: &HashMap<String, String>,
106121
) -> Result<SmartModerationResult, SmartModerationError> {
107122
if should_force_ai_review(profile) {
108-
return run_ai_moderation_with_history(text, profile, http_client, env_map).await;
123+
return run_ai_moderation_with_history(text, profile, http_client, env_map, None).await;
109124
}
110125

111126
if profile.local_model_exists() {
112-
if let Some(result) = run_local_model(text, profile) {
127+
let (result, debug) = evaluate_local_model(text, profile);
128+
if let Some(mut result) = result {
129+
result.debug = Some(debug);
113130
return Ok(result);
114131
}
132+
133+
return run_ai_moderation_with_history(text, profile, http_client, env_map, Some(debug)).await;
115134
}
116135

117-
run_ai_moderation_with_history(text, profile, http_client, env_map).await
136+
run_ai_moderation_with_history(text, profile, http_client, env_map, None).await
118137
}
119138

120139
fn should_force_ai_review(profile: &ModerationProfile) -> bool {
@@ -129,43 +148,92 @@ fn should_force_ai_review(profile: &ModerationProfile) -> bool {
129148
next_random_unit() < rate
130149
}
131150

132-
fn run_local_model(text: &str, profile: &ModerationProfile) -> Option<SmartModerationResult> {
151+
fn evaluate_local_model(
152+
text: &str,
153+
profile: &ModerationProfile,
154+
) -> (Option<SmartModerationResult>, ModerationDebugInfo) {
155+
let started = Instant::now();
133156
let (probability, source) = match profile.config.local_model_type.as_str() {
134157
"fasttext" => match fasttext::predict_proba(text, profile) {
135158
Ok(Some(probability)) => (probability, "fasttext_model"),
136-
Ok(None) | Err(_) => return None,
159+
Ok(None) | Err(_) => {
160+
return (
161+
None,
162+
ModerationDebugInfo {
163+
local_model_source: Some("fasttext_model".to_string()),
164+
local_model_decision: Some("skipped".to_string()),
165+
..Default::default()
166+
},
167+
)
168+
}
137169
},
138170
"hashlinear" => match hashlinear::predict_proba(text, profile) {
139171
Ok(Some(probability)) => (probability, "hashlinear_model"),
140-
Ok(None) | Err(_) => return None,
172+
Ok(None) | Err(_) => {
173+
return (
174+
None,
175+
ModerationDebugInfo {
176+
local_model_source: Some("hashlinear_model".to_string()),
177+
local_model_decision: Some("skipped".to_string()),
178+
..Default::default()
179+
},
180+
)
181+
}
141182
},
142183
_ => match bow::predict_proba(text, profile) {
143184
Ok(Some(probability)) => (probability, "bow_model"),
144-
Ok(None) | Err(_) => return None,
185+
Ok(None) | Err(_) => {
186+
return (
187+
None,
188+
ModerationDebugInfo {
189+
local_model_source: Some("bow_model".to_string()),
190+
local_model_decision: Some("skipped".to_string()),
191+
..Default::default()
192+
},
193+
)
194+
}
145195
},
146196
};
147197
let low = profile.config.probability.low_risk_threshold;
148198
let high = profile.config.probability.high_risk_threshold;
199+
let latency_ms = started.elapsed().as_secs_f64() * 1000.0;
200+
let mut debug = ModerationDebugInfo {
201+
local_model_source: Some(source.to_string()),
202+
local_model_confidence: Some(probability),
203+
local_model_latency_ms: Some(latency_ms),
204+
..Default::default()
205+
};
149206

150207
if probability < low {
151-
return Some(SmartModerationResult {
152-
violation: false,
153-
category: None,
154-
reason: Some(format!("{source}: low risk (p={probability:.3})")),
155-
source: source.to_string(),
156-
confidence: Some(probability),
157-
});
208+
debug.local_model_decision = Some("allow".to_string());
209+
return (
210+
Some(SmartModerationResult {
211+
violation: false,
212+
category: None,
213+
reason: Some(format!("{source}: low risk (p={probability:.3})")),
214+
source: source.to_string(),
215+
confidence: Some(probability),
216+
debug: None,
217+
}),
218+
debug,
219+
);
158220
}
159221
if probability > high {
160-
return Some(SmartModerationResult {
161-
violation: true,
162-
category: None,
163-
reason: Some(format!("{source}: high risk (p={probability:.3})")),
164-
source: source.to_string(),
165-
confidence: Some(probability),
166-
});
222+
debug.local_model_decision = Some("block".to_string());
223+
return (
224+
Some(SmartModerationResult {
225+
violation: true,
226+
category: None,
227+
reason: Some(format!("{source}: high risk (p={probability:.3})")),
228+
source: source.to_string(),
229+
confidence: Some(probability),
230+
debug: None,
231+
}),
232+
debug,
233+
);
167234
}
168-
None
235+
debug.local_model_decision = Some("uncertain".to_string());
236+
(None, debug)
169237
}
170238

171239
async fn llm_moderate(
@@ -204,7 +272,7 @@ async fn llm_moderate(
204272
) -> Result<SmartModerationResult, SmartModerationError> {
205273
let response = response
206274
.error_for_status()
207-
.map_err(|err| SmartModerationError::Other(anyhow!(err)))?;
275+
.map_err(|err| SmartModerationError::Other(anyhow!(err), None))?;
208276

209277
let is_sse = response
210278
.headers()
@@ -216,19 +284,20 @@ async fn llm_moderate(
216284
if is_sse {
217285
let content = read_openai_chat_sse_content(response)
218286
.await
219-
.map_err(|err| SmartModerationError::Other(err))?;
287+
.map_err(|err| SmartModerationError::Other(err, None))?;
220288
parse_moderation_content(&content)
221289
} else {
222290
let payload = response
223291
.json::<Value>()
224292
.await
225-
.map_err(|err| SmartModerationError::Other(anyhow!(err)))?;
293+
.map_err(|err| SmartModerationError::Other(anyhow!(err), None))?;
226294
parse_openai_moderation_response(payload)
227295
}
228296
}
229297

230298
let mut attempted_models = Vec::new();
231299
let mut last_error = None;
300+
let started = Instant::now();
232301
for _attempt in 0..=max_retries {
233302
let model = pick_model_for_attempt(&models, &attempted_models);
234303
attempted_models.push(model.clone());
@@ -248,18 +317,26 @@ async fn llm_moderate(
248317
}))
249318
.send()
250319
.await
251-
.map_err(|err| SmartModerationError::Other(anyhow!(err)))?;
320+
.map_err(|err| SmartModerationError::Other(anyhow!(err), None))?;
252321

253322
parse_llm_response(response).await
254323
})
255324
.await;
256325

257326
match response {
258-
Ok(Ok(parsed)) => return Ok(parsed),
327+
Ok(Ok(mut parsed)) => {
328+
parsed.debug = Some(ModerationDebugInfo {
329+
llm_reviewed: true,
330+
llm_result: Some(if parsed.violation { "block".to_string() } else { "allow".to_string() }),
331+
llm_latency_ms: Some(started.elapsed().as_secs_f64() * 1000.0),
332+
..Default::default()
333+
});
334+
return Ok(parsed);
335+
}
259336
Ok(Err(err)) => {
260337
let err = match err {
261338
SmartModerationError::ConcurrencyLimit(message) => anyhow!(message),
262-
SmartModerationError::Other(err) => err,
339+
SmartModerationError::Other(err, _) => err,
263340
};
264341
last_error = Some(err.context("llm moderation request failed"));
265342
}
@@ -364,11 +441,43 @@ async fn run_ai_moderation_with_history(
364441
profile: &ModerationProfile,
365442
http_client: &Client,
366443
env_map: &HashMap<String, String>,
444+
inherited_debug: Option<ModerationDebugInfo>,
367445
) -> Result<SmartModerationResult, SmartModerationError> {
368-
if let Some(result) = load_history_result(text, profile).await? {
446+
if let Some(mut result) = load_history_result(text, profile).await? {
447+
let mut debug = inherited_debug.unwrap_or_default();
448+
debug.llm_reviewed = true;
449+
debug.llm_result = Some(if result.violation {
450+
"block".to_string()
451+
} else {
452+
"allow".to_string()
453+
});
454+
result.debug = Some(debug);
369455
return Ok(result);
370456
}
371-
let result = llm_moderate(text, profile, http_client, env_map).await?;
457+
let mut result = llm_moderate(text, profile, http_client, env_map).await.map_err(|err| {
458+
let mut debug = inherited_debug.clone().unwrap_or_default();
459+
debug.llm_reviewed = true;
460+
debug.llm_result = Some("error".to_string());
461+
debug.llm_error = Some(truncate_header_value(&match &err {
462+
SmartModerationError::ConcurrencyLimit(message) => message.clone(),
463+
SmartModerationError::Other(error, _) => format!("{error:#}"),
464+
}));
465+
match err {
466+
SmartModerationError::ConcurrencyLimit(message) => {
467+
SmartModerationError::Other(anyhow!(message), Some(debug))
468+
}
469+
SmartModerationError::Other(error, _) => SmartModerationError::Other(error, Some(debug)),
470+
}
471+
})?;
472+
let mut debug = inherited_debug.unwrap_or_default();
473+
let latency = result
474+
.debug
475+
.as_ref()
476+
.and_then(|value| value.llm_latency_ms);
477+
debug.llm_reviewed = true;
478+
debug.llm_result = Some(if result.violation { "block".to_string() } else { "allow".to_string() });
479+
debug.llm_latency_ms = latency;
480+
result.debug = Some(debug);
372481
save_history_result(text, profile, &result).await?;
373482
Ok(result)
374483
}
@@ -410,10 +519,11 @@ async fn load_history_result(
410519
.map(|created_at| format!("From DB: {created_at}")),
411520
source: "ai".to_string(),
412521
confidence: None,
522+
debug: None,
413523
}))
414524
})
415525
.await
416-
.map_err(|err| SmartModerationError::Other(anyhow!("history storage read task failed: {err}")))?
526+
.map_err(|err| SmartModerationError::Other(anyhow!("history storage read task failed: {err}"), None))?
417527
}
418528

419529
#[cfg(not(feature = "storage-debug"))]
@@ -442,7 +552,7 @@ async fn save_history_result(
442552
Ok(())
443553
})
444554
.await
445-
.map_err(|err| SmartModerationError::Other(anyhow!("history storage write task failed: {err}")))?
555+
.map_err(|err| SmartModerationError::Other(anyhow!("history storage write task failed: {err}"), None))?
446556
}
447557

448558
#[cfg(not(feature = "storage-debug"))]
@@ -502,6 +612,7 @@ fn parse_moderation_content(
502612
reason: data.get("reason").and_then(Value::as_str).map(ToString::to_string),
503613
source: "ai".to_string(),
504614
confidence: data.get("confidence").and_then(Value::as_f64),
615+
debug: None,
505616
})
506617
}
507618

@@ -516,8 +627,9 @@ fn check_cache(profile_name: &str, text: &str) -> Option<SmartModerationResult>
516627
let key = cache_key(text);
517628
let mut guard = cache.lock().expect("moderation cache");
518629
let profile_cache = guard.get_mut(profile_name)?;
519-
let result = profile_cache.shift_remove(&key)?;
630+
let mut result = profile_cache.shift_remove(&key)?;
520631
profile_cache.insert(key, result.clone());
632+
result.debug = None;
521633
Some(result)
522634
}
523635

@@ -532,12 +644,18 @@ fn save_cache(profile_name: &str, text: &str, result: &SmartModerationResult) {
532644
.entry(profile_name.to_string())
533645
.or_insert_with(IndexMap::new);
534646
profile_cache.shift_remove(&key);
535-
profile_cache.insert(key, result.clone());
647+
let mut cached = result.clone();
648+
cached.debug = None;
649+
profile_cache.insert(key, cached);
536650
while profile_cache.len() > CACHE_SIZE {
537651
profile_cache.shift_remove_index(0);
538652
}
539653
}
540654

655+
/// Clamp a diagnostic message to a size safe to emit as a debug response
/// header value.
///
/// Truncation counts `char`s, not bytes, so the result is always valid
/// UTF-8 even when the cut would land inside a multi-byte sequence.
/// NOTE(review): non-ASCII characters may still be rejected by strict HTTP
/// header encoders downstream — confirm the header layer handles them.
fn truncate_header_value(value: &str) -> String {
    // Maximum number of characters kept in a debug header value.
    const MAX_HEADER_CHARS: usize = 160;
    value.chars().take(MAX_HEADER_CHARS).collect()
}
658+
541659
fn cache_key(text: &str) -> String {
542660
format!("{:x}", md5::compute(text.as_bytes()))
543661
}
@@ -639,6 +757,7 @@ mod tests {
639757
reason: Some("cached".to_string()),
640758
source: "ai".to_string(),
641759
confidence: Some(0.9),
760+
debug: None,
642761
}
643762
}
644763

0 commit comments

Comments
 (0)