Skip to content

Commit 9abdd98

Browse files
committed
WIP
1 parent 532a234 commit 9abdd98

3 files changed

Lines changed: 175 additions & 1 deletion

File tree

rust/crates/sift_mcp/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod server;
77
use server::SiftMcpServer;
88

99
mod error;
10+
mod prompt;
1011
mod service;
1112
mod tool;
1213

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
use rmcp::{
2+
handler::server::wrapper::Parameters,
3+
model::{PromptMessage, PromptMessageRole},
4+
prompt, prompt_router,
5+
schemars::{self, JsonSchema},
6+
};
7+
use serde::Deserialize;
8+
9+
use crate::server::SiftMcpServer;
10+
11+
#[derive(Debug, Deserialize, JsonSchema)]
12+
pub struct ExploreAssetArgs {
13+
asset: String,
14+
}
15+
16+
#[derive(Debug, Deserialize, JsonSchema)]
17+
pub struct AnalyzeRunArgs {
18+
asset: String,
19+
run: String,
20+
channels: Option<String>,
21+
question: Option<String>,
22+
}
23+
24+
#[derive(Debug, Deserialize, JsonSchema)]
25+
pub struct DeriveAndUploadArgs {
26+
source_asset: String,
27+
source_run: String,
28+
transform: String,
29+
target_asset: Option<String>,
30+
target_run: Option<String>,
31+
}
32+
33+
#[prompt_router(vis = "pub(crate)")]
34+
impl SiftMcpServer {
35+
#[prompt(
36+
name = "explore_asset",
37+
description = "Discover a Sift asset along with its recent runs and channel inventory. Read-only starting point for a session."
38+
)]
39+
pub async fn explore_asset(&self, params: Parameters<ExploreAssetArgs>) -> Vec<PromptMessage> {
40+
let Parameters(ExploreAssetArgs { asset }) = params;
41+
42+
let body = format!(
43+
"Help the user explore a Sift asset and everything recorded against it. The user referred \
44+
to the asset as: \"{asset}\".\n\n\
45+
Use the Sift MCP tools as follows:\n\
46+
1. Resolve the asset with `list_assets`. Try an exact match first \
47+
(`name == \"{asset}\"`); if nothing comes back, fall back to a substring match \
48+
(`name.matches(\"{asset}\")`). When several assets match, ask the user which one they \
49+
mean before continuing.\n\
50+
2. List recent runs with `list_runs` filtered by `asset_id == \"<resolved asset_id>\"`, \
51+
ordered `start_time desc`. Run and channel namespaces are per-asset, so always scope by \
52+
`asset_id` instead of listing everything.\n\
53+
3. List channels with `list_channels` filtered by `asset_id == \"<resolved asset_id>\"`.\n\
54+
4. Summarize for the user: the resolved asset (name and id), its most recent runs (name, \
55+
start/stop, duration), and the channel inventory grouped by data type. Surface the exact \
56+
run and channel names so they can be reused with `get_data`.\n\n\
57+
This step is discovery only. Do not pull sample data."
58+
);
59+
60+
vec![PromptMessage::new_text(PromptMessageRole::User, body)]
61+
}
62+
63+
#[prompt(
64+
name = "analyze_run",
65+
description = "Pull a run's channel data and produce a per-channel statistical summary. Optionally targets specific channels and answers a question."
66+
)]
67+
pub async fn analyze_run(&self, params: Parameters<AnalyzeRunArgs>) -> Vec<PromptMessage> {
68+
let Parameters(AnalyzeRunArgs {
69+
asset,
70+
run,
71+
channels,
72+
question,
73+
}) = params;
74+
75+
let channels_line = match channels {
76+
Some(c) => format!("the following channels: {c}"),
77+
None => "all channels on the asset (you choose a sensible subset)".to_string(),
78+
};
79+
let question_line = match question {
80+
Some(q) => format!("Answer this question for the user: \"{q}\".\n"),
81+
None => String::new(),
82+
};
83+
84+
let body = format!(
85+
"Help the user analyze a single run on a Sift asset.\n\n\
86+
Asset: \"{asset}\"\nRun: \"{run}\"\nChannels: {channels_line}\n\n\
87+
{question_line}\
88+
Steps:\n\
89+
1. Resolve the asset and run. Use `list_assets` (`name == \"{asset}\"`) for the \
90+
asset_id, then `list_runs` (`name == \"{run}\" && asset_id == \"...\"`) to confirm the run.\n\
91+
2. Confirm the target channels exist with `list_channels` scoped by `asset_id`. If no \
92+
channels were named, choose a sensible set and tell the user which you picked.\n\
93+
3. Pull data with `get_data`. Pass `run_name` so the run's start/stop bounds apply \
94+
automatically; do not hand-compute timestamps. Use `channel_search.Names` with exact \
95+
channel names. Choose `sample_ms` to suit the run length: decimate (e.g. 100-1000 ms) \
96+
for long runs and use 0 only when raw fidelity is required. Write to a Parquet path in a \
97+
working directory.\n\
98+
4. Summarize with `sql` against the `get_data` output: per-channel row count, min/max/mean, \
99+
and null rate, plus anything needed to answer the user's question. Keep \
100+
`timestamp_unix_nanos` in the projection in case the result is uploaded later.\n\
101+
5. Report the findings and surface the Parquet paths so the work can be continued."
102+
);
103+
104+
vec![PromptMessage::new_text(PromptMessageRole::User, body)]
105+
}
106+
107+
#[prompt(
108+
name = "derive_and_upload",
109+
description = "Derive a new dataset from an existing run via SQL and upload it back to Sift. Confirms the write destination before uploading."
110+
)]
111+
pub async fn derive_and_upload(
112+
&self,
113+
params: Parameters<DeriveAndUploadArgs>,
114+
) -> Vec<PromptMessage> {
115+
let Parameters(DeriveAndUploadArgs {
116+
source_asset,
117+
source_run,
118+
transform,
119+
target_asset,
120+
target_run,
121+
}) = params;
122+
123+
let target_asset_line = match target_asset {
124+
Some(a) => format!("Target asset: \"{a}\""),
125+
None => {
126+
"Target asset: not given - propose a default and confirm with the user".to_string()
127+
}
128+
};
129+
let target_run_line = match target_run {
130+
Some(r) => format!("Target run: \"{r}\""),
131+
None => "Target run: not given - ask whether to create one".to_string(),
132+
};
133+
134+
let body = format!(
135+
"Help the user derive a new dataset from existing Sift data and upload it back to Sift. \
136+
The upload is a write, so confirm the destination before running it.\n\n\
137+
Source asset: \"{source_asset}\"\nSource run: \"{source_run}\"\n\
138+
Transform: \"{transform}\"\n{target_asset_line}\n{target_run_line}\n\n\
139+
Steps:\n\
140+
1. Resolve the source asset and run with `list_assets` and `list_runs`. Identify the \
141+
channels the transform needs via `list_channels` scoped by `asset_id`.\n\
142+
2. Extract with `get_data`, passing `run_name` so the run bounds apply. Choose \
143+
`channel_search.Names` and a `sample_ms` suited to the transform.\n\
144+
3. Apply the transform with `sql`. CRITICAL: column 0 of any dataset uploaded to Sift \
145+
MUST be `timestamp_unix_nanos` (Int64, non-null). Project it first in the SELECT and never \
146+
rename or drop it. For aggregations that collapse rows, bucket on a time expression \
147+
derived from `timestamp_unix_nanos` or emit `MIN(timestamp_unix_nanos)` so every output \
148+
row still carries a timestamp.\n\
149+
4. Before uploading, CONFIRM with the user: (a) the target `asset` (suggest \
150+
\"{source_asset}-derived\" or similar if none was given, but let them override), \
151+
(b) whether to create a `run_name` (required if any tags or metadata are wanted), and \
152+
(c) any tags or metadata to attach. Do not silently default these.\n\
153+
5. Upload with `upload_dataset`, passing the `sql` output as `input`. After it returns, \
154+
tell the user where the data landed and offer to verify via `list_runs` or \
155+
`list_channels`."
156+
);
157+
158+
vec![PromptMessage::new_text(PromptMessageRole::User, body)]
159+
}
160+
}

rust/crates/sift_mcp/src/server/mod.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
use rmcp::{ServerHandler, handler::server::tool::ToolRouter, tool_handler};
1+
use rmcp::{
2+
RoleServer, ServerHandler,
3+
handler::server::router::prompt::PromptRouter,
4+
handler::server::tool::ToolRouter,
5+
model::{GetPromptRequestParams, GetPromptResult, ListPromptsResult, PaginatedRequestParams},
6+
prompt_handler,
7+
service::RequestContext,
8+
tool_handler,
9+
};
210
use sift_rs::SiftChannel;
311

412
use crate::service::{
@@ -9,6 +17,7 @@ use crate::service::{
917
#[derive(Clone)]
1018
pub struct SiftMcpServer {
1119
pub tool_router: ToolRouter<Self>,
20+
pub prompt_router: PromptRouter<Self>,
1221

1322
pub asset_service: AssetService,
1423
pub channel_service: ChannelService,
@@ -23,6 +32,7 @@ pub struct SiftMcpServer {
2332
version = "0.1.0",
2433
instructions = "Sift MCP Server",
2534
)]
35+
#[prompt_handler(router = self.prompt_router)]
2636
impl ServerHandler for SiftMcpServer {}
2737

2838
impl SiftMcpServer {
@@ -32,6 +42,8 @@ impl SiftMcpServer {
3242
let mut tool_router = Self::list_router();
3343
tool_router.merge(Self::data_router());
3444

45+
let prompt_router = Self::prompt_router();
46+
3547
let asset_service = AssetService::new(channel.clone());
3648
let data_service = DataService::new(channel.clone());
3749
let channel_service = ChannelService::new(channel.clone());
@@ -45,6 +57,7 @@ impl SiftMcpServer {
4557
ingest_service,
4658
run_service,
4759
tool_router,
60+
prompt_router,
4861
}
4962
}
5063
}

0 commit comments

Comments
 (0)