forked from paiml/rust-mcp-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path28_sse_optimized.rs
More file actions
135 lines (113 loc) · 4.29 KB
/
28_sse_optimized.rs
File metadata and controls
135 lines (113 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! Optimized SSE Transport Example
//!
//! PMCP-4002: Demonstrates optimized SSE transport with advanced features
//!
//! Run with: cargo run --example 28_sse_optimized --features sse
use pmcp::shared::{OptimizedSseConfig, OptimizedSseTransport, Transport, TransportMessage};
use std::time::Duration;
use tracing::{info, Level};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
info!("π Starting Optimized SSE Transport Example");
// Configure optimized SSE transport
let config = OptimizedSseConfig {
url: "http://localhost:8080/sse".to_string(),
connection_timeout: Duration::from_secs(30),
keepalive_interval: Duration::from_secs(15),
max_reconnects: 5,
reconnect_delay: Duration::from_secs(1),
buffer_size: 100,
flush_interval: Duration::from_millis(100),
enable_pooling: true,
max_connections: 10,
enable_compression: false,
};
info!("β
Configuration:");
info!(" β’ URL: {}", config.url);
info!(" β’ Connection pooling: {}", config.enable_pooling);
info!(" β’ Max connections: {}", config.max_connections);
info!(" β’ Buffer size: {}", config.buffer_size);
info!(" β’ Flush interval: {:?}", config.flush_interval);
info!(" β’ Keepalive: {:?}", config.keepalive_interval);
info!(" β’ Compression: {}", config.enable_compression);
// Create transport
let mut transport = OptimizedSseTransport::new(config);
info!(
"π Transport created with type: {}",
transport.transport_type()
);
// Demonstrate sending messages
info!("π€ Sending test messages...");
// Send a notification
let notification = TransportMessage::Notification(pmcp::types::Notification::Progress(
pmcp::types::ProgressNotification::new(
pmcp::types::ProgressToken::String("task-001".to_string()),
25.0,
Some("Processing started".to_string()),
),
));
if let Err(e) = transport.send(notification).await {
info!("Failed to send notification: {}", e);
} else {
info!("β Notification sent");
}
// Send a request
let request = TransportMessage::Request {
id: pmcp::types::RequestId::from(1i64),
request: pmcp::types::Request::Client(Box::new(pmcp::types::ClientRequest::Ping)),
};
if let Err(e) = transport.send(request).await {
info!("Failed to send request: {}", e);
} else {
info!("β Request sent");
}
// Demonstrate batch sending (messages will be coalesced)
info!("π¦ Sending batch of messages...");
for i in 0..10 {
let progress_msg = TransportMessage::Notification(pmcp::types::Notification::Progress(
pmcp::types::ProgressNotification::new(
pmcp::types::ProgressToken::String(format!("batch-{}", i)),
i as f64 * 10.0,
Some(format!("Batch message {}", i)),
),
));
if let Err(e) = transport.send(progress_msg).await {
info!("Failed to send batch message {}: {}", i, e);
}
}
info!("β Batch messages queued (will be coalesced and flushed)");
// Check connection status
info!(
"π Connection status: {}",
if transport.is_connected() {
"Connected"
} else {
"Disconnected"
}
);
// Simulate receiving (would normally come from server)
info!("π₯ Attempting to receive messages...");
match tokio::time::timeout(Duration::from_secs(2), transport.receive()).await {
Ok(Ok(msg)) => {
info!("Received message: {:?}", msg);
},
Ok(Err(e)) => {
info!("Receive error: {}", e);
},
Err(_) => {
info!("No messages received (timeout)");
},
}
// Demonstrate connection pooling benefit
info!("π Connection pooling benefits:");
info!(" β’ Reuses existing connections");
info!(" β’ Reduces latency for subsequent requests");
info!(" β’ Maintains TCP keepalive");
info!(" β’ Automatic reconnection on failure");
// Close transport
transport.close().await?;
info!("π Transport closed");
Ok(())
}