Skip to content

Commit 83f2cc0

Browse files
authored
rust(feature): bootstrap test utils + mcp (#578)
1 parent d1e6e70 commit 83f2cc0

20 files changed

Lines changed: 1175 additions & 16 deletions

File tree

Cargo.toml

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
[workspace]
22
resolver = "2"
33
members = [
4-
"rust/crates/sift_error",
5-
"rust/crates/sift_rs",
6-
"rust/crates/sift_stream",
7-
"rust/crates/sift_error",
8-
"rust/crates/sift_connect",
9-
"rust/crates/sift_stream_bindings",
10-
"rust/crates/sift_cli",
11-
"rust/crates/sift_pbfs",
4+
"rust/crates/sift_error",
5+
"rust/crates/sift_rs",
6+
"rust/crates/sift_stream",
7+
"rust/crates/sift_error",
8+
"rust/crates/sift_connect",
9+
"rust/crates/sift_stream_bindings",
10+
"rust/crates/sift_cli",
11+
"rust/crates/sift_pbfs",
12+
"rust/crates/sift_mcp",
13+
"rust/crates/sift_test_util",
1214
]
1315

1416
[workspace.package]
@@ -28,8 +30,9 @@ arrow-array = "58.1.0"
2830
arrow-schema = "58.1.0"
2931
async-channel = "2.2"
3032
async-trait = "^0.1"
33+
mockall = "0.14.0"
3134
bytes = "1.11.1"
32-
bytesize = { version = "2" }
35+
bytesize = "2"
3336
chrono = { version = "0.4", default-features = false, features = ["clock"] }
3437
clap = { version = "4.6", features = ["cargo", "derive", "wrap_help"] }
3538
clap_complete = "4.6"
@@ -43,31 +46,32 @@ flate2 = "1.1"
4346
futures = { version = "0.3", default-features = false, features = ["alloc"] }
4447
futures-core = "0.3"
4548
hyper = { version = "1.8", features = ["server", "http1"] }
46-
hyper-util = { version = "0.1", features = ["service", "server", "tokio"] }
49+
hyper-util = { version = "0.1.20", features = ["service", "server", "tokio"] }
4750
indicatif = "0.18"
4851
indoc = "2.0"
4952
parquet = "58.0"
5053
prost = "^0.14"
5154
prost-types = "^0.14"
5255
pbjson = "^0.9"
5356
pbjson-types = "^0.9"
54-
pyo3 = { version = "0.28" }
57+
pyo3 = "0.28"
5558
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
56-
pyo3-stub-gen = { version = "0.10" }
59+
pyo3-stub-gen = "0.10"
5760
rand = "0.10"
5861
reqwest = "0.13"
59-
serde = { version = "^1.0" }
60-
serde_json = { version = "^1.0" }
62+
rmcp = "1.7.0"
63+
serde = "^1.0"
64+
serde_json = "^1.0"
6165
tdms = "0.3.0"
6266
hdf5 = { package = "hdf5-metno", version = "0.12", features = ["static"] }
6367
tempdir = "0.3"
64-
tokio = { version = "1" }
68+
tokio = "1"
6569
tokio-stream = "0.1"
6670
toml = "0.9"
6771
tonic = { version = "^0.14", features = ["gzip"] }
6872
tonic-prost = "^0.14"
6973
tower = "^0.5"
70-
tracing = { version = "0.1" }
74+
tracing = "0.1"
7175
tracing-appender = "0.2"
7276
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
7377
tracing-test = { version = "0.2", features = ["no-env-filter"] }
@@ -79,6 +83,8 @@ sift_rs = { version = "0.9.1", path = "rust/crates/sift_rs" }
7983
sift_error = { version = "0.9.1", path = "rust/crates/sift_error" }
8084
sift_stream = { version = "0.9.1", path = "rust/crates/sift_stream" }
8185
sift_pbfs = { version = "0.9.1", path = "rust/crates/sift_pbfs" }
86+
sift_mcp = { version = "0.9.1", path = "rust/crates/sift_mcp" }
87+
sift_test_util = { version = "0.9.1", path = "rust/crates/sift_test_util" }
8288

8389
sift_stream_bindings = { version = "0.3.0", path = "rust/crates/sift_stream_bindings" }
8490

rust/crates/sift_mcp/Cargo.toml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
[package]
2+
name = "sift_mcp"
3+
authors.workspace = true
4+
version.workspace = true
5+
edition.workspace = true
6+
categories.workspace = true
7+
homepage.workspace = true
8+
repository.workspace = true
9+
keywords.workspace = true
10+
readme.workspace = true
11+
license.workspace = true
12+
13+
[dependencies]
14+
rmcp = { workspace = true, features = ["transport-io"] }
15+
serde.workspace = true
16+
serde_json.workspace = true
17+
sift_connect.workspace = true
18+
sift_rs.workspace = true
19+
sift_stream.workspace = true
20+
tokio.workspace = true
21+
tonic.workspace = true
22+
anyhow.workspace = true
23+
clap = { workspace = true, features = ["cargo"] }
24+
25+
[dev-dependencies]
26+
sift_test_util.workspace = true
27+
tokio-stream.workspace = true
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use rmcp::model::{CallToolResult, ErrorCode};
2+
use serde_json::json;
3+
use tonic::{Code, Status};
4+
5+
/// Result alias for MCP tool handlers: a [`CallToolResult`] on success or an
/// [`rmcp::ErrorData`] describing the failure.
pub type McpResult = Result<CallToolResult, rmcp::ErrorData>;
6+
7+
/// Converts a gRPC [`Status`] into an MCP error.
///
/// The MCP error code is derived from the gRPC code via `from_grpc_code`, the
/// message is copied from the status, and the `Display` text of the gRPC code
/// is attached under the `grpc_code` key of the error data.
pub fn from_grpc_status(status: Status) -> rmcp::ErrorData {
    let grpc_code = status.code();
    let details = json!({
        "grpc_code": grpc_code.to_string(),
    });

    rmcp::ErrorData {
        code: from_grpc_code(grpc_code),
        message: status.message().to_string().into(),
        data: Some(details),
    }
}
20+
21+
/// Converts an [`anyhow::Error`] into an MCP internal error.
///
/// The message is the error's `Debug` rendering; no additional data is
/// attached.
pub fn from_anyhow(error: anyhow::Error) -> rmcp::ErrorData {
    rmcp::ErrorData {
        code: ErrorCode::INTERNAL_ERROR,
        message: format!("{error:?}").into(),
        data: None,
    }
}
31+
32+
fn from_grpc_code(code: Code) -> ErrorCode {
33+
match code {
34+
Code::InvalidArgument | Code::OutOfRange => ErrorCode::INVALID_PARAMS,
35+
Code::NotFound => ErrorCode::RESOURCE_NOT_FOUND,
36+
Code::Unimplemented => ErrorCode::METHOD_NOT_FOUND,
37+
_ => ErrorCode::INTERNAL_ERROR,
38+
}
39+
}

rust/crates/sift_mcp/src/lib.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use anyhow::{Context, Result};
2+
use clap::{crate_name, crate_version};
3+
use rmcp::{ServiceExt, transport::stdio};
4+
use sift_rs::{Credentials, SiftChannelBuilder};
5+
6+
pub(crate) mod server;
7+
use server::SiftMcpServer;
8+
9+
pub mod tool;
10+
11+
mod error;
12+
13+
pub async fn run(credentials: Credentials, use_tls: bool) -> Result<()> {
14+
let channel = SiftChannelBuilder::new(credentials)
15+
.use_tls(use_tls)
16+
.user_agent(format!("{}/{}", crate_name!(), crate_version!()))
17+
.build()
18+
.context("failed to build gRPC channel to connect to Sift")?;
19+
20+
let service = SiftMcpServer::new(channel)
21+
.serve(stdio())
22+
.await
23+
.context("failed to start MCP server")?;
24+
25+
service
26+
.waiting()
27+
.await
28+
.context("MCP server terminated unexpectedly")?;
29+
30+
Ok(())
31+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use rmcp::{ServerHandler, handler::server::tool::ToolRouter, tool_handler};
2+
use sift_rs::SiftChannel;
3+
4+
#[derive(Clone)]
5+
pub struct SiftMcpServer {
6+
pub(crate) channel: SiftChannel,
7+
pub tool_router: ToolRouter<Self>,
8+
}
9+
10+
#[tool_handler(
11+
router = self.tool_router,
12+
name = "SiftMcp",
13+
version = "0.1.0",
14+
instructions = "Sift MCP Server",
15+
)]
16+
impl ServerHandler for SiftMcpServer {}
17+
18+
impl SiftMcpServer {
19+
pub fn new(channel: SiftChannel) -> Self {
20+
// Add more routers here as new tool groups are introduced, e.g.
21+
// Self::resource_router().merge(Self::ingestion_router())
22+
23+
let tool_router = Self::resource_router();
24+
Self {
25+
channel,
26+
tool_router,
27+
}
28+
}
29+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod resource;
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
use anyhow::Context;
2+
use rmcp::{
3+
handler::server::wrapper::Parameters,
4+
model::CallToolResult,
5+
schemars::{self, JsonSchema},
6+
tool, tool_router,
7+
};
8+
use serde::Deserialize;
9+
use sift_rs::{
10+
assets::v1::{ListAssetsRequest, ListAssetsResponse, asset_service_client::AssetServiceClient},
11+
runs::v2::{ListRunsRequest, ListRunsResponse, run_service_client::RunServiceClient},
12+
};
13+
14+
use crate::{error, server::SiftMcpServer};
15+
16+
#[cfg(test)]
17+
mod test;
18+
19+
/// Largest `page_size` that `paging` will request in a single list call.
const PAGE_SIZE: u32 = 1000;
20+
21+
#[derive(Debug, Deserialize, JsonSchema)]
22+
pub struct GetParams {
23+
filter: String,
24+
limit: Option<u32>,
25+
}
26+
27+
#[tool_router(router = resource_router, vis = "pub(crate)")]
28+
impl SiftMcpServer {
29+
#[tool(
30+
name = "get_asset",
31+
description = "
32+
Retrieve and filter assets in Sift. The `filter` parameter is a Common Expression Language (CEL).
33+
Available fields to filter by are `asset_id`, `created_by_user_id`, `modified_by_user_id`,
34+
`created_date`, `modified_date`, `name`, 'name_lower', `tag_id`, `tag_name`, 'archived_date', `is_archived`, and `metadata`.
35+
Metadata can be used in filters by using `metadata.{metadata_key_name}` as the field name.
36+
",
37+
annotations(title = "Resource/get_asset", read_only_hint = true)
38+
)]
39+
pub async fn get_asset(&self, params: Parameters<GetParams>) -> error::McpResult {
40+
let Parameters(GetParams { filter, limit }) = params;
41+
let (page_size, record_limit) = paging(limit);
42+
43+
let mut client = AssetServiceClient::new(self.channel.clone());
44+
let mut page_token = String::new();
45+
let mut results = Vec::new();
46+
47+
loop {
48+
let resp = client
49+
.list_assets(ListAssetsRequest {
50+
filter: filter.clone(),
51+
page_size,
52+
page_token,
53+
..Default::default()
54+
})
55+
.await
56+
.map_err(error::from_grpc_status)?;
57+
58+
let ListAssetsResponse {
59+
assets,
60+
next_page_token,
61+
} = resp.into_inner();
62+
if assets.is_empty() {
63+
break;
64+
}
65+
results.extend(assets);
66+
67+
if results.len() >= record_limit || next_page_token.is_empty() {
68+
break;
69+
}
70+
page_token = next_page_token;
71+
}
72+
73+
results.truncate(record_limit);
74+
let out = serde_json::to_value(&results)
75+
.context("failed to serialize assets")
76+
.map_err(error::from_anyhow)?;
77+
78+
Ok(CallToolResult::structured(out))
79+
}
80+
81+
#[tool(
82+
name = "get_run",
83+
description = "
84+
Retrieve and filter runs in Sift. The `filter` parameter is a Common Expression Language (CEL).
85+
Available fields to filter by are `run_id` `organization_id`, `asset_id`, `asset_name`, `client_key`, `name`,
86+
`description`, `created_by_user_id`, `modified_by_user_id`, `created_date`, `modified_date`, `start_time`,
87+
`stop_time`, `tag_id`, `asset_tag_id`, `duration`, 'duration_string', `annotation_comments_count`, `annotation_state`,
88+
`archived_date`, `is_archived`, and `metadata`. Metadata can be used in filters by using
89+
`metadata.{metadata_key_name}` as the field name. `duration` is in the format of elapsed seconds and `duration_string`
90+
allows for `h`, `m`, `s`, `ms` suffixes (example: `duration_string > duration('10h')).
91+
",
92+
annotations(title = "Resource/get_run", read_only_hint = true)
93+
)]
94+
pub async fn get_run(&self, params: Parameters<GetParams>) -> error::McpResult {
95+
let Parameters(GetParams { filter, limit }) = params;
96+
let (page_size, record_limit) = paging(limit);
97+
98+
let mut client = RunServiceClient::new(self.channel.clone());
99+
let mut page_token = String::new();
100+
let mut results = Vec::new();
101+
102+
loop {
103+
let resp = client
104+
.list_runs(ListRunsRequest {
105+
filter: filter.clone(),
106+
page_size,
107+
page_token,
108+
..Default::default()
109+
})
110+
.await
111+
.map_err(error::from_grpc_status)?;
112+
113+
let ListRunsResponse {
114+
runs,
115+
next_page_token,
116+
} = resp.into_inner();
117+
if runs.is_empty() {
118+
break;
119+
}
120+
results.extend(runs);
121+
122+
if results.len() >= record_limit || next_page_token.is_empty() {
123+
break;
124+
}
125+
page_token = next_page_token;
126+
}
127+
128+
results.truncate(record_limit);
129+
let out = serde_json::to_value(&results)
130+
.context("failed to serialize runs")
131+
.map_err(error::from_anyhow)?;
132+
133+
Ok(CallToolResult::structured(out))
134+
}
135+
}
136+
137+
fn paging(limit: Option<u32>) -> (u32, usize) {
138+
match limit {
139+
Some(lim) if lim <= PAGE_SIZE => (lim, lim as usize),
140+
_ => (PAGE_SIZE, usize::MAX),
141+
}
142+
}

0 commit comments

Comments
 (0)