Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/crates/sift_cli/assets/docs/src/agents/mcp.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ not run interactively.
| `list_assets` | List assets, with filtering and ordering. |
| `list_runs` | List runs, with filtering and ordering. |
| `list_channels` | List channels for an asset. |
| `list_reports` | List reports, with filtering and ordering. |
| `get_data` | Download channel data for an asset/run to a Parquet file, with optional decimation. |
| `sql` | Run SQL over one or more Parquet files; chain after `get_data` for analysis. |
| `upload_dataset` | Stream a Parquet dataset into Sift. |
Expand Down
2 changes: 1 addition & 1 deletion rust/crates/sift_cli/assets/skills/agents-md/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ to combine them when working with Sift.

1. **Sift MCP server** — started by `sift-cli mcp`. The preferred surface for
agents. Exposes structured, authenticated tools:
- `list_assets`, `list_runs`, `list_channels`: discover what exists.
- `list_assets`, `list_runs`, `list_channels`, `list_reports`: discover what exists.
- `get_data`: download channel data for an asset/run to a Parquet file.
- `sql`: run SQL over one or more Parquet files (chain after `get_data`).
- `upload_dataset`: stream a Parquet dataset into Sift.
Expand Down
2 changes: 1 addition & 1 deletion rust/crates/sift_cli/assets/skills/claude-code/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ to combine them when working with Sift.

1. **Sift MCP server** — started by `sift-cli mcp`. The preferred surface for
agents. Exposes structured, authenticated tools:
- `list_assets`, `list_runs`, `list_channels`: discover what exists.
- `list_assets`, `list_runs`, `list_channels`, `list_reports`: discover what exists.
- `get_data`: download channel data for an asset/run to a Parquet file.
- `sql`: run SQL over one or more Parquet files (chain after `get_data`).
- `upload_dataset`: stream a Parquet dataset into Sift.
Expand Down
5 changes: 4 additions & 1 deletion rust/crates/sift_mcp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use sift_rs::SiftChannel;

use crate::service::{
assets::AssetService, channels::ChannelService, data::DataService, ingest::IngestService,
runs::RunService,
reports::ReportService, runs::RunService,
};

#[derive(Clone)]
Expand All @@ -24,6 +24,7 @@ pub struct SiftMcpServer {
pub data_service: DataService,
pub ingest_service: IngestService,
pub run_service: RunService,
pub report_service: ReportService,
}

#[tool_handler(
Expand All @@ -49,13 +50,15 @@ impl SiftMcpServer {
let channel_service = ChannelService::new(channel.clone());
let ingest_service = IngestService::new(channel.clone());
let run_service = RunService::new(channel.clone());
let report_service = ReportService::new(channel.clone());

Self {
asset_service,
channel_service,
data_service,
ingest_service,
run_service,
report_service,
tool_router,
prompt_router,
}
Expand Down
1 change: 1 addition & 0 deletions rust/crates/sift_mcp/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod assets;
pub mod channels;
pub mod data;
pub mod ingest;
pub mod reports;
pub mod runs;

pub(crate) mod common;
67 changes: 67 additions & 0 deletions rust/crates/sift_mcp/src/service/reports/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use crate::service::common;
use anyhow::{Context, Result};
use sift_rs::{
SiftChannel,
reports::v1::{
ListReportsRequest, ListReportsResponse, Report, report_service_client::ReportServiceClient,
},
};

#[cfg(test)]
mod test;

#[derive(Clone)]
pub struct ReportService {
channel: SiftChannel,
}

impl ReportService {
pub fn new(channel: SiftChannel) -> Self {
Self { channel }
}

pub async fn list_reports(
&self,
filter: String,
order_by: Option<String>,
limit: Option<u32>,
organization_id: Option<String>,
) -> Result<Vec<Report>> {
let (page_size, record_limit) = common::paging(limit);

let mut client = ReportServiceClient::new(self.channel.clone());
let mut page_token = String::new();
let mut results = Vec::new();

loop {
let resp = client
.list_reports(ListReportsRequest {
filter: filter.clone(),
page_size,
page_token,
order_by: order_by.clone().unwrap_or_default(),
organization_id: organization_id.clone().unwrap_or_default(),
})
.await
.context("failed to query reports")?;

let ListReportsResponse {
reports,
next_page_token,
} = resp.into_inner();
if reports.is_empty() {
break;
}
results.extend(reports);

if results.len() >= record_limit || next_page_token.is_empty() {
break;
}
page_token = next_page_token;
}

results.truncate(record_limit);

Ok(results)
}
}
235 changes: 235 additions & 0 deletions rust/crates/sift_mcp/src/service/reports/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
use sift_rs::reports::v1::{
ListReportsResponse, Report, report_service_server::ReportServiceServer,
};
use sift_test_util::{grpc::memory_sift_channel, mock::reports::v1::MockReportServiceImpl};
use tokio::task::JoinHandle;
use tonic::{Response, Status, transport::Server};

use super::ReportService;
use crate::service::common::PAGE_SIZE;

async fn service_with_mock(mock: MockReportServiceImpl) -> (ReportService, JoinHandle<()>) {
let (client, server) = tokio::io::duplex(1024);
let channel = memory_sift_channel(client).await;

let handle = tokio::spawn(async move {
Server::builder()
.add_service(ReportServiceServer::new(mock))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});

(ReportService::new(channel), handle)
}

#[tokio::test]
async fn list_reports_returns_single_page() {
let mut mock = MockReportServiceImpl::new();
mock.expect_list_reports()
.withf(|req| req.get_ref().filter == "name == \"nightly\"")
.returning(|_| {
Ok(Response::new(ListReportsResponse {
reports: vec![Report {
report_id: "rep1".into(),
name: "nightly".into(),
..Default::default()
}],
next_page_token: String::new(),
}))
});

let (service, _h) = service_with_mock(mock).await;

let reports = service
.list_reports("name == \"nightly\"".to_string(), None, None, None)
.await
.expect("list_reports failed");

assert_eq!(reports.len(), 1);
assert_eq!(reports[0].report_id, "rep1");
}

#[tokio::test]
async fn list_reports_forwards_organization_id() {
let mut mock = MockReportServiceImpl::new();
mock.expect_list_reports()
.withf(|req| req.get_ref().organization_id == "org-123")
.returning(|_| {
Ok(Response::new(ListReportsResponse {
reports: vec![Report {
report_id: "rep1".into(),
..Default::default()
}],
next_page_token: String::new(),
}))
});

let (service, _h) = service_with_mock(mock).await;

let reports = service
.list_reports(String::new(), None, None, Some("org-123".to_string()))
.await
.expect("list_reports failed");

assert_eq!(reports.len(), 1);
}

#[tokio::test]
async fn list_reports_paginates_until_token_empty() {
let mut mock = MockReportServiceImpl::new();
mock.expect_list_reports().returning(|req| {
let req = req.into_inner();
assert_eq!(req.page_size, PAGE_SIZE);
let (reports, next) = match req.page_token.as_str() {
"" => (
vec![Report {
report_id: "rep1".into(),
..Default::default()
}],
"page-2".to_string(),
),
"page-2" => (
vec![Report {
report_id: "rep2".into(),
..Default::default()
}],
String::new(),
),
other => return Err(Status::invalid_argument(format!("bad token: {other}"))),
};
Ok(Response::new(ListReportsResponse {
reports,
next_page_token: next,
}))
});

let (service, _h) = service_with_mock(mock).await;

let reports = service
.list_reports(String::new(), None, None, None)
.await
.expect("list_reports failed");

let ids: Vec<&str> = reports.iter().map(|r| r.report_id.as_str()).collect();
assert_eq!(ids, vec!["rep1", "rep2"]);
}

#[tokio::test]
async fn list_reports_respects_limit() {
let mut mock = MockReportServiceImpl::new();
mock.expect_list_reports().times(1).returning(|req| {
let req = req.into_inner();
assert_eq!(req.page_size, 2);
Ok(Response::new(ListReportsResponse {
reports: vec![
Report {
report_id: "rep1".into(),
..Default::default()
},
Report {
report_id: "rep2".into(),
..Default::default()
},
],
next_page_token: "page-2".into(),
}))
});

let (service, _h) = service_with_mock(mock).await;

let reports = service
.list_reports(String::new(), None, Some(2), None)
.await
.expect("list_reports failed");

assert_eq!(reports.len(), 2);
}

#[tokio::test]
async fn list_reports_truncates_to_limit_across_pages() {
let mut mock = MockReportServiceImpl::new();
mock.expect_list_reports().returning(|req| {
let req = req.into_inner();
assert_eq!(req.page_size, 3);
let (reports, next) = match req.page_token.as_str() {
"" => (
vec![
Report {
report_id: "rep1".into(),
..Default::default()
},
Report {
report_id: "rep2".into(),
..Default::default()
},
],
"page-2".to_string(),
),
"page-2" => (
vec![
Report {
report_id: "rep3".into(),
..Default::default()
},
Report {
report_id: "rep4".into(),
..Default::default()
},
],
String::new(),
),
other => return Err(Status::invalid_argument(format!("bad token: {other}"))),
};
Ok(Response::new(ListReportsResponse {
reports,
next_page_token: next,
}))
});

let (service, _h) = service_with_mock(mock).await;

let reports = service
.list_reports(String::new(), None, Some(3), None)
.await
.expect("list_reports failed");

let ids: Vec<&str> = reports.iter().map(|r| r.report_id.as_str()).collect();
assert_eq!(ids, vec!["rep1", "rep2", "rep3"]);
}

#[tokio::test]
async fn list_reports_breaks_on_empty_page() {
let mut mock = MockReportServiceImpl::new();
mock.expect_list_reports().times(1).returning(|_| {
Ok(Response::new(ListReportsResponse {
reports: vec![],
next_page_token: "ignored".into(),
}))
});

let (service, _h) = service_with_mock(mock).await;

let reports = service
.list_reports(String::new(), None, None, None)
.await
.expect("list_reports failed");

assert!(reports.is_empty());
}

#[tokio::test]
async fn list_reports_propagates_grpc_error() {
let mut mock = MockReportServiceImpl::new();
mock.expect_list_reports()
.returning(|_| Err(Status::not_found("no such report")));

let (service, _h) = service_with_mock(mock).await;

let err = service
.list_reports(String::new(), None, None, None)
.await
.expect_err("expected error");

assert!(err.to_string().contains("failed to query reports"));
}
Loading
Loading