Skip to content

Commit 4de4979

Browse files
Alex Holmbergclaude
authored andcommitted
feat(22-01): wire processor to server startup
- Add enable_processor and processor_config to AgUiConfig - Add with_processor() and with_processor_config() builder methods - Modify AgUiServer::run() to spawn processor when enabled - Add integration test for processor with ServerState - 3 new tests for processor wiring Co-Authored-By: Claude <noreply@anthropic.com>
1 parent d747592 commit 4de4979

1 file changed

Lines changed: 111 additions & 0 deletions

File tree

src/server/mod.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ pub struct AgUiConfig {
7979
pub host: String,
8080
/// Maximum number of concurrent connections.
8181
pub max_connections: usize,
82+
/// Whether to start the agent processor.
83+
pub enable_processor: bool,
84+
/// Configuration for the agent processor (if enabled).
85+
pub processor_config: Option<ProcessorConfig>,
8286
}
8387

8488
impl Default for AgUiConfig {
@@ -87,6 +91,8 @@ impl Default for AgUiConfig {
8791
port: 9090,
8892
host: "127.0.0.1".to_string(),
8993
max_connections: 100,
94+
enable_processor: false,
95+
processor_config: None,
9096
}
9197
}
9298
}
@@ -108,6 +114,25 @@ impl AgUiConfig {
108114
self.host = host.into();
109115
self
110116
}
117+
118+
/// Enables or disables the agent processor.
119+
///
120+
/// When enabled, the server will spawn an AgentProcessor that
121+
/// consumes messages from the message channel and processes them.
122+
pub fn with_processor(mut self, enable: bool) -> Self {
123+
self.enable_processor = enable;
124+
if enable && self.processor_config.is_none() {
125+
self.processor_config = Some(ProcessorConfig::default());
126+
}
127+
self
128+
}
129+
130+
/// Sets the processor configuration.
131+
pub fn with_processor_config(mut self, config: ProcessorConfig) -> Self {
132+
self.processor_config = Some(config);
133+
self.enable_processor = true;
134+
self
135+
}
111136
}
112137

113138
/// Shared state for the AG-UI server.
@@ -206,11 +231,30 @@ impl AgUiServer {
206231
/// Runs the AG-UI server.
207232
///
208233
/// This method blocks until the server is shut down.
234+
/// If the processor is enabled in config, it will be spawned as a background task.
209235
pub async fn run(self) -> Result<(), std::io::Error> {
210236
let addr: SocketAddr = format!("{}:{}", self.config.host, self.config.port)
211237
.parse()
212238
.expect("Invalid address");
213239

240+
// Optionally start the agent processor
241+
if self.config.enable_processor {
242+
let processor_config = self.config.processor_config.clone()
243+
.unwrap_or_default();
244+
245+
if let Some(msg_rx) = self.state.take_message_receiver().await {
246+
let event_bridge = self.state.event_sender();
247+
let mut processor = AgentProcessor::new(msg_rx, event_bridge, processor_config);
248+
249+
// Spawn processor in background
250+
tokio::spawn(async move {
251+
processor.run().await;
252+
});
253+
254+
println!("Agent processor started");
255+
}
256+
}
257+
214258
let app = Router::new()
215259
.route("/", get(routes::health))
216260
.route("/sse", get(routes::sse_handler))
@@ -328,4 +372,71 @@ mod tests {
328372
let rx2 = state.take_message_receiver().await;
329373
assert!(rx2.is_none());
330374
}
375+
376+
#[test]
377+
fn test_config_with_processor() {
378+
let config = AgUiConfig::new().with_processor(true);
379+
assert!(config.enable_processor);
380+
assert!(config.processor_config.is_some());
381+
}
382+
383+
#[test]
384+
fn test_config_with_processor_config() {
385+
let processor_config = ProcessorConfig::new()
386+
.with_provider("anthropic")
387+
.with_model("claude-3-sonnet");
388+
389+
let config = AgUiConfig::new()
390+
.with_processor_config(processor_config);
391+
392+
assert!(config.enable_processor);
393+
let proc_config = config.processor_config.unwrap();
394+
assert_eq!(proc_config.provider, "anthropic");
395+
assert_eq!(proc_config.model, "claude-3-sonnet");
396+
}
397+
398+
#[tokio::test]
399+
async fn test_processor_integration_with_state() {
400+
use ag_ui_core::types::{Message, RunAgentInput};
401+
use ag_ui_core::Event;
402+
403+
// Create state and get components
404+
let state = ServerState::new();
405+
let msg_tx = state.message_sender();
406+
let mut event_rx = state.subscribe();
407+
let msg_rx = state.take_message_receiver().await.expect("Should get receiver");
408+
409+
// Create and spawn processor
410+
let event_bridge = state.event_sender();
411+
let mut processor = AgentProcessor::with_defaults(msg_rx, event_bridge);
412+
413+
let handle = tokio::spawn(async move {
414+
processor.run().await;
415+
});
416+
417+
// Send a message
418+
let thread_id = ThreadId::random();
419+
let run_id = RunId::random();
420+
let input = RunAgentInput::new(thread_id.clone(), run_id.clone())
421+
.with_messages(vec![Message::new_user("Integration test message")]);
422+
423+
msg_tx.send(AgentMessage::new(input)).await.expect("Should send");
424+
425+
// Verify events are emitted
426+
let event = tokio::time::timeout(
427+
std::time::Duration::from_millis(200),
428+
event_rx.recv()
429+
).await.expect("Should receive in time").expect("Should have event");
430+
431+
assert!(matches!(event, Event::RunStarted(_)));
432+
433+
// Stop processor by dropping sender
434+
drop(msg_tx);
435+
436+
// Wait for processor to finish
437+
let _ = tokio::time::timeout(
438+
std::time::Duration::from_millis(200),
439+
handle
440+
).await;
441+
}
331442
}

0 commit comments

Comments
 (0)