Skip to content

Commit 4ab3bc3

Browse files
author
Terraphim AI
committed
feat: Add progress update emissions to TwoPassDebateWorkflow
Enable real-time status updates for workflow execution via broadcast channel. Changes: - Add ProgressUpdate type for status updates - Add progress_tx field and with_progress_sender() builder method - Add emit_progress() helper to send status updates - Emit progress at all workflow milestones Related: zestic-ai/truthforge#45
1 parent a1b8804 commit 4ab3bc3

2 files changed

Lines changed: 110 additions & 2 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,6 @@ async-trait = "0.1"
2121
thiserror = "1.0"
2222
anyhow = "1.0"
2323
log = "0.4"
24+
25+
[patch.crates-io]
26+
genai = { git = "https://github.com/terraphim/rust-genai.git", branch = "main" }

crates/terraphim_truthforge/src/workflows/two_pass_debate.rs

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,26 @@ use crate::agents::omission_detector::{OmissionDetectorAgent, OmissionDetectorCo
44
use crate::agents::taxonomy_linker::{TaxonomyLinkerAgent, TaxonomyLinkerConfig};
55
use crate::error::{Result, TruthForgeError};
66
use crate::types::*;
7+
use serde::{Deserialize, Serialize};
78
use std::sync::Arc;
89
use terraphim_multi_agent::{GenAiLlmClient, LlmMessage, LlmRequest};
10+
use tokio::sync::broadcast;
911
use tokio::task::JoinSet;
1012
use tracing::{debug, info, warn};
1113
use uuid::Uuid;
1214

15+
/// Progress update for WebSocket streaming
16+
/// This is a simplified version to avoid cross-repository dependencies
17+
#[derive(Debug, Clone, Serialize, Deserialize)]
18+
pub struct ProgressUpdate {
19+
pub session_id: Uuid,
20+
pub pillar: Option<String>,
21+
pub stage: String,
22+
pub progress: f32,
23+
pub message: String,
24+
pub timestamp: chrono::DateTime<chrono::Utc>,
25+
}
26+
1327
enum PassOneAgentResult {
1428
OmissionCatalog(OmissionCatalog),
1529
BiasAnalysis(BiasAnalysis),
@@ -724,6 +738,7 @@ pub struct TwoPassDebateWorkflow {
724738
pass_two: PassTwoOptimizer,
725739
response_generator: ResponseGenerator,
726740
llm_client: Option<Arc<GenAiLlmClient>>,
741+
progress_tx: Option<broadcast::Sender<ProgressUpdate>>,
727742
}
728743

729744
impl TwoPassDebateWorkflow {
@@ -733,6 +748,7 @@ impl TwoPassDebateWorkflow {
733748
pass_two: PassTwoOptimizer::new(),
734749
response_generator: ResponseGenerator::new(),
735750
llm_client: None,
751+
progress_tx: None,
736752
}
737753
}
738754

@@ -744,17 +760,67 @@ impl TwoPassDebateWorkflow {
744760
self
745761
}
746762

763+
pub fn with_progress_sender(mut self, tx: broadcast::Sender<ProgressUpdate>) -> Self {
764+
self.progress_tx = Some(tx);
765+
self
766+
}
767+
768+
fn emit_progress(&self, session_id: Uuid, stage: &str, message: &str, progress: f32) {
769+
if let Some(tx) = &self.progress_tx {
770+
let update = ProgressUpdate {
771+
session_id,
772+
pillar: None,
773+
stage: stage.to_string(),
774+
progress,
775+
message: message.to_string(),
776+
timestamp: chrono::Utc::now(),
777+
};
778+
let _ = tx.send(update);
779+
}
780+
}
781+
747782
pub async fn execute(&self, narrative: &NarrativeInput) -> Result<TruthForgeAnalysisResult> {
748783
let start_time = std::time::Instant::now();
749784

750785
info!(
751786
"Starting TwoPassDebateWorkflow for session {}",
752787
narrative.session_id
753788
);
789+
self.emit_progress(
790+
narrative.session_id,
791+
"workflow_start",
792+
"Starting TwoPassDebateWorkflow",
793+
0.0,
794+
);
795+
796+
self.emit_progress(
797+
narrative.session_id,
798+
"pass_1_start",
799+
"Starting Pass 1 Orchestration",
800+
0.1,
801+
);
754802

755803
let pass_one_result = self.pass_one.execute(narrative).await?;
756804

805+
let omissions_count = pass_one_result.omission_catalog.omissions.len();
806+
self.emit_progress(
807+
narrative.session_id,
808+
"pass_1_complete",
809+
&format!(
810+
"Pass 1: Analysis complete, {} omissions identified",
811+
omissions_count
812+
),
813+
0.4,
814+
);
815+
757816
info!("Pass 1 complete, generating Pass 1 debate");
817+
self.emit_progress(
818+
narrative.session_id,
819+
"pass_1_debate",
820+
"Generating Pass 1 debate",
821+
0.45,
822+
);
823+
758824
let pass_one_debate = if self.llm_client.is_some() {
759825
self.generate_pass_one_debate(narrative, &pass_one_result)
760826
.await?
@@ -764,15 +830,29 @@ impl TwoPassDebateWorkflow {
764830
};
765831

766832
info!("Pass 1 debate complete, starting Pass 2 exploitation");
833+
self.emit_progress(
834+
narrative.session_id,
835+
"pass_2_start",
836+
"Pass 1 debate complete, starting Pass 2 exploitation",
837+
0.5,
838+
);
839+
767840
let pass_two_result = self
768841
.pass_two
769842
.execute(narrative, &pass_one_result, &pass_one_debate)
770843
.await?;
771844

772845
info!("Pass 2 complete, generating cumulative analysis");
773-
774-
let omissions_count = pass_one_result.omission_catalog.omissions.len();
775846
let exploited_count = pass_two_result.exploited_vulnerabilities.len();
847+
self.emit_progress(
848+
narrative.session_id,
849+
"pass_2_complete",
850+
&format!(
851+
"Pass 2 complete, {} vulnerabilities identified",
852+
exploited_count
853+
),
854+
0.7,
855+
);
776856

777857
let cumulative_analysis = self
778858
.generate_cumulative_analysis_mock(
@@ -785,6 +865,13 @@ impl TwoPassDebateWorkflow {
785865
let risk_level = cumulative_analysis.strategic_risk_level.clone();
786866

787867
info!("Cumulative analysis complete, generating response strategies");
868+
self.emit_progress(
869+
narrative.session_id,
870+
"response_generation",
871+
"Generating response strategies",
872+
0.8,
873+
);
874+
788875
let response_strategies = self
789876
.response_generator
790877
.generate_strategies(
@@ -798,6 +885,15 @@ impl TwoPassDebateWorkflow {
798885
"Response strategies generated: {} strategies",
799886
response_strategies.len()
800887
);
888+
self.emit_progress(
889+
narrative.session_id,
890+
"strategies_complete",
891+
&format!(
892+
"Response strategies generated: {} strategies",
893+
response_strategies.len()
894+
),
895+
0.9,
896+
);
801897

802898
let result = TruthForgeAnalysisResult {
803899
session_id: narrative.session_id,
@@ -824,6 +920,15 @@ impl TwoPassDebateWorkflow {
824920
"TwoPassDebateWorkflow complete for session {} in {}ms",
825921
narrative.session_id, result.processing_time_ms
826922
);
923+
self.emit_progress(
924+
narrative.session_id,
925+
"workflow_complete",
926+
&format!(
927+
"TwoPassDebateWorkflow complete in {}ms",
928+
result.processing_time_ms
929+
),
930+
1.0,
931+
);
827932

828933
Ok(result)
829934
}

0 commit comments

Comments
 (0)