diff --git a/core/mcp_server.py b/core/mcp_server.py index f39a915..b7f9fbb 100644 --- a/core/mcp_server.py +++ b/core/mcp_server.py @@ -17,6 +17,15 @@ logger = logging.getLogger(__name__) +class MethodNotFoundError(Exception): + """Raised when a JSON-RPC request names a method the server does not implement. + + Mapped to JSON-RPC error code -32601 ("Method not found") rather than the + generic -32603 ("Internal error"), so well-behaved clients can tell an + unknown method apart from a genuine server-side failure. + """ + + class MCPServer: """MCP Server that handles JSON-RPC requests and routes to Plugin Manager.""" @@ -86,7 +95,7 @@ async def handle_request(self, request: Dict[str, Any]) -> Optional[Dict[str, An }, ) return None - raise ValueError(f"Unknown method: {method}") + raise MethodNotFoundError(f"Unknown method: {method}") # Don't send response for notifications if is_notification: @@ -122,14 +131,24 @@ async def handle_request(self, request: Dict[str, Any]) -> Optional[Dict[str, An except Exception as e: duration_ms = (time.perf_counter() - start_time) * 1000 - error_response = { - "jsonrpc": "2.0", - "id": request_id, - "error": { + # Unknown methods are a client error (-32601 Method not found), not a + # server fault (-32603 Internal error). Everything else is -32603. + if isinstance(e, MethodNotFoundError): + error = { + "code": -32601, + "message": "Method not found", + "data": str(e), + } + else: + error = { "code": -32603, "message": "Internal error", "data": str(e), - }, + } + error_response = { + "jsonrpc": "2.0", + "id": request_id, + "error": error, } # Log JSON-RPC error response diff --git a/plugins/ckan/plugin.py b/plugins/ckan/plugin.py index 626413f..7af13f6 100644 --- a/plugins/ckan/plugin.py +++ b/plugins/ckan/plugin.py @@ -3,7 +3,10 @@ This plugin provides access to CKAN-based open data portals. 
""" +import difflib import logging +import re +from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple import httpx @@ -18,6 +21,71 @@ from plugins.ckan.config_schema import CKANPluginConfig from plugins.ckan.sql_validator import SafeSQLBuilder, SQLValidator + +# Datasets edited longer ago than this are flagged with a DATA FRESHNESS +# caveat -- useful when the model is about to call a 4-year-old snapshot +# "current". +_STALE_DATASET_DAYS = 365 + +# Tunes when the SMALL SAMPLE banner fires: any total in (1, _SMALL_SAMPLE_MAX] +# triggers it. Single-record (N=1) gets its own, stronger banner. +_SMALL_SAMPLE_MAX = 10 + +# Used by _params_repr to keep the echoed-Query line from being dominated +# by a multi-line SQL string. Long values are tail-truncated. +_PARAMS_REPR_MAX = 200 + +# Copilot C8: default response size should be small. Copilot truncates +# long tool responses or streams them slowly; the model summarizes 20 +# records well more reliably than 100 records badly. Caller can always +# pass a higher `limit` explicitly. +DEFAULT_QUERY_LIMIT = 20 +DEFAULT_SEARCH_LIMIT = 10 + +# A field whose values are mostly NULL-like (real null, "", "N/A", +# "Unknown", "None", or the string "NULL") at or above this share triggers +# a DATA QUALITY caveat (civic-AI #11). Tuned so a column with the odd +# missing value stays silent but a column that's mostly empty fires. +_NULL_LIKE_FREQ_THRESHOLD = 0.20 + +# Values that look semantically null even though they're real strings. +# CKAN datasets put any of these in a column to mean "missing" -- the +# model otherwise treats "Unknown" as a meaningful category (civic-AI #10). +_NULL_LIKE_STRINGS = frozenset({"", "n/a", "na", "unknown", "none", "null", "-", "--"}) + +# Resource is considered "old" when its frequency is unset and last +# modified more than this many days ago (civic-AI #6). 
+_NO_FREQUENCY_OLD_DAYS = 730 # 2 years + +# Multiplier for the abandonment detector (civic-AI #5): a resource is +# flagged if its last_modified is older than this many times the stated +# update_frequency interval (e.g. weekly cadence + 4x stale = >28d). +_ABANDONMENT_INTERVAL_MULTIPLIER = 4 + +# Approximate day-count for each declared update frequency. Keys are +# lowercased, normalized substrings; the first matching substring of the +# dataset's stated frequency wins. See _frequency_days(). +# Order matters: list the more specific key first whenever a frequency +# string can match two keys ("biweekly" also contains "weekly", +# "bimonthly" also contains "monthly"), or the generic key shadows it. +_FREQUENCY_DAYS: Dict[str, int] = { + "real-time": 1, + "realtime": 1, + "daily": 1, + "biweekly": 14, + "bi-weekly": 14, + "fortnight": 14, + "weekly": 7, + "semi-month": 15, + "semimonth": 15, + "bimonth": 60, + "bi-month": 60, + "monthly": 30, + "quarter": 91, + "semi-annual": 182, + "semiannual": 182, + "biannual": 182, + "annual": 365, + "yearly": 365, +} + logger = logging.getLogger(__name__) @@ -88,7 +156,7 @@ async def shutdown(self) -> None: def _is_queryable(resource: Dict[str, Any]) -> bool: """A CKAN resource is queryable via datastore_search only if it has been loaded into CKAN's Postgres datastore. Boston attaches each - dataset as 5–7 download-only resources (GeoJSON, KML, SHP, ...) plus + dataset as 5-7 download-only resources (GeoJSON, KML, SHP, ...) plus a single CSV that's actually loaded; only that one returns rows.""" return bool(resource.get("datastore_active")) @@ -173,29 +241,30 @@ def get_tools(self) -> List[ToolDefinition]: Returns: List of tool definitions + + Note on Copilot B3 (enum schemas): every free-text string + parameter on these tools is either an opaque CKAN identifier + (resource UUID, dataset slug) or free user input (search + keyword, SQL). The fixed-vocabulary surfaces that exist on this + plugin -- WHERE operators, aggregate function names, ORDER BY + direction -- are all nested inside structured `where` / `metrics` + / `order_by` strings rather than top-level params, so JSON + Schema enum doesn't apply cleanly. 
The hallucination guard + that enum would have provided is delivered instead by the + pre-flight field-name validator and the SafeSQLBuilder + allowlist (see plugins/ckan/sql_validator.py). """ city = self.plugin_config.city_name return [ ToolDefinition( name="search_datasets", description=( - f"Search for datasets in {city}'s open data portal by keyword.\n\n" - "Returns a list of CKAN datasets. Each dataset contains a " - "`resources` array; each resource has its own `id` (a UUID) " - "that identifies a queryable table.\n\n" - "Next step:\n" - " - EASIEST: if you just want data rows, call " - "`ckan__search_and_query` with the same query — it combines " - "search + query in one call.\n" - " - Otherwise: pick a resource from `resources[].id` in the " - "response and call `ckan__query_data` with that value as " - "`resource_id`.\n" - " - To inspect a dataset's resources first, call " - "`ckan__get_dataset` with `dataset_id` set to the dataset's " - "`id` or `name`.\n\n" - "The formatted response surfaces a `suggested_resource_id` " - "and `suggested_next_tool` line at the top — read those to " - "pick the next call." + f"Find datasets in {city}'s open data portal by keyword. " + "If you want data rows, prefer ckan__search_and_query " + "(combines search + query in one call). Use this tool " + "when you need to list or compare candidate datasets. " + "Returns dataset metadata; the response highlights the " + "best queryable resource_id." ), input_schema={ "type": "object", @@ -212,8 +281,11 @@ def get_tools(self) -> List[ToolDefinition]: }, "limit": { "type": "integer", - "description": "Maximum number of datasets to return (default: 20).", - "default": 20, + "description": ( + f"Maximum number of datasets to return " + f"(default: {DEFAULT_SEARCH_LIMIT})." 
+ ), + "default": DEFAULT_SEARCH_LIMIT, }, }, "required": ["query"], @@ -222,15 +294,12 @@ def get_tools(self) -> List[ToolDefinition]: ToolDefinition( name="get_dataset", description=( - f"Get full metadata for one dataset in {city}'s open data " - "portal, including its `resources` array.\n\n" - "Use this to find the resource UUIDs needed by " - "`ckan__query_data`, `ckan__get_schema`, " - "`ckan__aggregate_data`, or `ckan__execute_sql`. The " - "response lists each resource with its `Resource ID` " - "(a UUID).\n\n" - "Next step: call `ckan__query_data` with `resource_id` set " - "to one of the `Resource ID` values from this response." + f"Get full metadata for one {city} dataset, including " + "its resources (each with a queryable resource_id " + "UUID). Use this to discover resource UUIDs needed by " + "ckan__query_data / ckan__aggregate_data / " + "ckan__execute_sql, and to see the data freshness / " + "update-cadence caveats for the dataset." ), input_schema={ "type": "object", @@ -250,43 +319,14 @@ def get_tools(self) -> List[ToolDefinition]: ToolDefinition( name="query_data", description=( - f"Query rows from a specific resource in {city}'s open " - "data portal.\n\n" - "The `resource_id` parameter is a CKAN resource UUID — " - "NOT a dataset ID. Get one by first calling " - "`ckan__search_datasets` or `ckan__get_dataset` and " - "reading the `id` inside the `resources` array.\n\n" - "IMPORTANT: only resources with `datastore_active=true` " - "are queryable here. Boston datasets typically attach " - "5–7 resources (GeoJSON, KML, SHP, PDF, ArcGIS REST, " - "CSV) but only the CSV one is loaded into the " - "datastore. If you call this tool with a download-only " - "resource UUID it will return 404. The output of " - "`ckan__search_datasets` and `ckan__get_dataset` " - "labels resources as QUERYABLE or DOWNLOAD-ONLY — pick " - "the QUERYABLE one.\n\n" - "FILTERING — pick the right knob:\n" - " - `filters` is EQUALITY ONLY (case_status='Closed'). 
" - "Cannot do dates, ranges, BETWEEN, IN, LIKE, or any " - "comparison. A timestamp column will NEVER match an " - "equality filter on a date string like '2026-04-29'.\n" - " - `where` is structured comparison. Use this for " - "date ranges, numeric bounds, IN-lists, LIKE/ILIKE, " - "or NULL checks. Example for 'closed on 2026-04-29':\n" - " where = {\"close_date\": {\"gte\": " - "\"2026-04-29\", \"lt\": \"2026-04-30\"}, " - "\"case_status\": \"Closed\"}\n" - " - For window functions, CTEs, joins, or anything " - "the structured `where` can't express, use " - "`ckan__execute_sql` instead.\n\n" - "Note: `query` arguments on search tools match dataset " - "metadata (titles/tags), NOT row content. To filter " - "ROWS by date/status/etc., use `where` here or in " - "`ckan__search_and_query`.\n\n" - "Tip: if you only have a keyword and no resource_id " - "yet, use `ckan__search_and_query` instead — it does " - "the lookup and the data fetch in a single call and " - "auto-picks the datastore-loaded resource." + f"Query rows from a {city} resource. CASE-SENSITIVE " + "field names -- typos are rejected with a 'did you " + "mean' suggestion. `resource_id` is the UUID from " + "ckan__search_datasets / ckan__get_dataset, NOT a " + "dataset ID. Use `filters` for equality (status='Open') " + "and `where` for ranges/dates/IN/LIKE. If you only " + "have a keyword and no resource_id, use " + "ckan__search_and_query instead." ), input_schema={ "type": "object", @@ -294,45 +334,50 @@ def get_tools(self) -> List[ToolDefinition]: "resource_id": { "type": "string", "description": ( - "CKAN resource UUID (36-char, e.g. " - "'11111111-2222-3333-4444-555555555555'). " - "Provenance: the `id` field inside the " - "`resources` array returned by " - "`ckan__search_datasets` or " - "`ckan__get_dataset`. This is NOT a dataset ID." + "CKAN resource UUID (36-char " + "hex+hyphen format). Get one from " + "ckan__search_datasets or " + "ckan__get_dataset; this is NOT a " + "dataset ID. 
Do NOT invent or guess " + "this value." ), }, "filters": { "type": "object", "description": ( - "EQUALITY-ONLY filters as field:value " - "pairs (e.g. {\"status\": \"Open\"}). For " - "ranges/dates/IN/LIKE, use `where` " - "instead — `filters` cannot express " - "anything other than exact equality." + "Equality-only row filters as a JSON " + "object of field -> value. For range, " + "date, IN, or LIKE comparisons use " + "`where` instead; `filters` cannot " + "express anything other than exact " + "equality." ), }, "where": { "type": "object", "description": ( - "Structured WHERE clause supporting " - "comparison operators. Each entry is " - "either {field: scalar} (equality) or " - "{field: {op: value, ...}} where op is " - "one of: eq, ne, gt, gte, lt, lte, in, " - "not_in, like, ilike, is_null. Example " - "for 'closed on 2026-04-29': " - "{\"close_date\": {\"gte\": " - "\"2026-04-29\", \"lt\": \"2026-04-30\"}, " - "\"case_status\": \"Closed\"}. The " - "schema footer in any prior query result " - "lists available column names and types." + "Structured WHERE clause as a JSON " + "object of field -> spec. A spec is " + "either a scalar (equality) or " + "{op: value} where `op` is one of: " + "eq, ne, gt, gte, lt, lte, in, " + "not_in, like, ilike, is_null. Field " + "names are case-sensitive; get them " + "from ckan__get_schema or from the " + "'Filterable columns' footer of a " + "prior query." ), }, "limit": { "type": "integer", - "description": "Maximum number of records (default: 100).", - "default": 100, + "description": ( + f"Maximum number of records (default: " + f"{DEFAULT_QUERY_LIMIT}). Ask for a higher " + "limit when the user explicitly wants more " + "than ~20 records or you need a larger " + "sample to summarize." 
+ ), + "default": DEFAULT_QUERY_LIMIT, }, }, "required": ["resource_id"], @@ -341,15 +386,12 @@ def get_tools(self) -> List[ToolDefinition]: ToolDefinition( name="get_schema", description=( - f"Get the schema (field names and types) for a resource " - f"in {city}'s open data portal.\n\n" - "Call this BEFORE `ckan__aggregate_data` or " - "`ckan__execute_sql` so you know the exact field names " - "to reference in `group_by`, `metrics`, SELECT, or WHERE " - "clauses.\n\n" - "Next step: pass the field names you discover to " - "`ckan__aggregate_data` (in `group_by` / `metrics`) or " - "to `ckan__execute_sql`." + f"Get the case-sensitive field names + types for a " + f"{city} resource. Call before ckan__aggregate_data " + "or ckan__execute_sql so you reference real columns. " + "Note: most CKAN columns are TEXT even when the " + "values are dates or numbers -- watch for type-note " + "warnings in query responses." ), input_schema={ "type": "object", @@ -357,10 +399,9 @@ def get_tools(self) -> List[ToolDefinition]: "resource_id": { "type": "string", "description": ( - "CKAN resource UUID. Provenance: the `id` " - "inside the `resources` array returned by " - "`ckan__search_datasets` or " - "`ckan__get_dataset`." + "CKAN resource UUID. Get one from " + "ckan__search_datasets or " + "ckan__get_dataset; do NOT invent." ), }, }, @@ -370,35 +411,17 @@ def get_tools(self) -> List[ToolDefinition]: ToolDefinition( name="execute_sql", description=( - f"Execute a raw PostgreSQL SELECT query against " - f"{city}'s CKAN datastore.\n\n" - "⚠️ Use this only when the structured `where` " - "argument on `ckan__query_data` / " - "`ckan__search_and_query` cannot express your filter " - "(e.g. window functions, CTEs, joins, aggregations " - "beyond ckan__aggregate_data).\n\n" - "Security: Only SELECT allowed. 
INSERT/UPDATE/DELETE " - "blocked.\n\n" - "Concrete examples:\n" - "- Closed on a specific date:\n" - " SELECT * FROM \"\" WHERE " - "close_date >= '2026-04-29' AND close_date < " - "'2026-04-30' AND case_status = 'Closed' LIMIT 100\n" - "- Counts by day:\n" - " SELECT date_trunc('day', close_date) AS d, " - "COUNT(*) FROM \"\" GROUP BY d " - "ORDER BY d DESC LIMIT 30\n" - "- Window functions: RANK() OVER (...)\n" - "- CTEs: WITH subquery AS (...)\n\n" - "Resource IDs must be double-quoted: " - "FROM \"uuid-here\"\n\n" - "Prerequisites:\n" - " - resource UUID for the FROM clause: get from " - "`ckan__search_datasets` or `ckan__get_dataset`.\n" - " - field names: get from `ckan__get_schema`, or " - "the 'Filterable columns' footer of any prior " - "successful `ckan__query_data` / " - "`ckan__search_and_query` call." + f"LAST RESORT: raw PostgreSQL SELECT against {city}'s " + "CKAN datastore. Try ckan__query_data and " + "ckan__aggregate_data first -- only use this for " + "joins, CTEs, window functions, or anything the " + "structured tools can't express. Field names and " + "resource UUIDs must come from ckan__get_schema / " + "ckan__get_dataset, not guesses. Only SELECT allowed; " + "resource UUIDs in FROM must be double-quoted " + '(FROM "uuid"). Responses carry a SQL PASSTHROUGH ' + "warning so you can confirm the generated SQL matched " + "the user's actual question." 
), input_schema={ "type": "object", @@ -414,21 +437,14 @@ def get_tools(self) -> List[ToolDefinition]: ToolDefinition( name="aggregate_data", description=( - f"Aggregate data with GROUP BY from {city}'s open data " - "portal.\n\n" - "Prerequisites:\n" - " - `resource_id`: get from `ckan__search_datasets` / " - "`ckan__get_dataset` (the `id` inside the `resources` " - "array).\n" - " - field names for `group_by` / `metrics`: get from " - "`ckan__get_schema`.\n\n" - "Examples:\n" - '- Count by field: group_by=["neighborhood"], ' - 'metrics={"count": "count(*)"}\n' - '- Multiple metrics: metrics={"total": "count(*)", ' - '"avg": "avg(field)"}\n' - '- With filters: filters={"status": "Open"}\n\n' - "Supports: count(*), sum(), avg(), min(), max(), stddev()." + f"GROUP BY + counts/sums/avgs on a {city} resource. " + "CASE-SENSITIVE field names; typos rejected with a " + "'did you mean' hint. resource_id from " + "ckan__search_datasets / ckan__get_dataset; field " + "names from ckan__get_schema. Example: " + 'group_by=["neighborhood"], ' + 'metrics={"n": "count(*)"}. Supports count(*), ' + "sum, avg, min, max, stddev." ), input_schema={ "type": "object", @@ -436,25 +452,57 @@ def get_tools(self) -> List[ToolDefinition]: "resource_id": { "type": "string", "description": ( - "CKAN resource UUID. Provenance: the `id` " - "inside the `resources` array returned by " - "`ckan__search_datasets` or " - "`ckan__get_dataset`." + "CKAN resource UUID. Get one from " + "ckan__search_datasets or " + "ckan__get_dataset; do NOT invent." ), }, "group_by": { "type": "array", "items": {"type": "string"}, - "description": "Field names to group by. Get exact names from `ckan__get_schema`.", + "description": ( + "Field names to group by. " + "CASE-SENSITIVE; get exact names " + "from ckan__get_schema." + ), }, "metrics": { "type": "object", - "description": "Map of alias -> aggregate expression, e.g. 
{\"count\": \"count(*)\"}.", + "description": ( + "JSON object of alias -> aggregate " + "expression. Supported aggregates: " + "count(*), count(field), sum(field), " + "avg(field), min(field), max(field), " + "stddev(field). Field names are " + "CASE-SENSITIVE; get them from " + "ckan__get_schema." + ), + }, + "filters": { + "type": "object", + "description": ( + "Equality-only row filters as a JSON " + "object of field -> value." + ), + }, + "having": { + "type": "object", + "description": ( + "HAVING clause: numeric thresholds on " + "aggregate expressions." + ), + }, + "order_by": { + "type": "string", + "description": ( + "Field or alias to ORDER BY; suffix " + "with ' DESC' for descending." + ), + }, + "limit": { + "type": "integer", + "default": DEFAULT_QUERY_LIMIT, }, - "filters": {"type": "object"}, - "having": {"type": "object"}, - "order_by": {"type": "string"}, - "limit": {"type": "integer", "default": 100}, }, "required": ["resource_id", "metrics"], }, @@ -462,49 +510,14 @@ def get_tools(self) -> List[ToolDefinition]: ToolDefinition( name="search_and_query", description=( - f"ONE-CALL keyword-to-data for {city}'s open data " - "portal: searches for the best-matching dataset and " - "immediately returns rows from its first datastore-" - "loaded resource — no tool chaining required.\n\n" - "Use this when you have a keyword (e.g. " - "'311 service requests', 'parks', 'building permits') " - "and want actual data rows. It combines " - "`ckan__search_datasets` + `ckan__query_data` into a " - "single server-side step, so you do NOT need to " - "extract a resource_id from a previous response.\n\n" - "Auto-picks the right resource: Boston datasets " - "typically attach 5–7 resources (GeoJSON, KML, SHP, " - "PDF, ArcGIS REST, CSV) but only the CSV is loaded " - "into the queryable datastore. 
This tool walks the " - "search results and skips datasets / resources that " - "aren't datastore-active, so you don't get a 404 " - "from a download-only resource.\n\n" - "WHAT `query` MEANS: `query` matches dataset metadata " - "(title, tags, description) — it does NOT filter ROWS. " - "If the user asks for '311 requests closed on 4/29', " - "the `query` finds the 311 dataset and `where` does " - "the row filtering:\n" - " query=\"311\", where={\"close_date\": {\"gte\": " - "\"2026-04-29\", \"lt\": \"2026-04-30\"}, " - "\"case_status\": \"Closed\"}\n\n" - "Returns: data rows from the chosen dataset's chosen " - "resource, plus a 'Filterable columns' footer listing " - "the schema (so you can refine with `where` or pivot " - "to `ckan__execute_sql` for joins/CTEs/window funcs), " - "an 'Other queryable resources in this dataset' " - "block listing siblings (e.g. per-year archives), " - "and a header showing which dataset and resource " - "were used.\n\n" - "MULTI-RESOURCE DATASETS: a single dataset can hold " - "many queryable resources. Boston's 311 dataset has " - "22 (a rolling 'NEW SYSTEM' view plus per-year " - "archives 2011–2026). Use `resource_name` to pick a " - "specific one — e.g. resource_name=\"2020\" picks " - "'311 Service Requests - 2020'. If you don't pass " - "`resource_name`, the first datastore-loaded " - "resource is used (which is typically the rolling " - "current view, NOT historical archives — so older " - "questions need `resource_name`)." + f"START HERE for {city} data: one call from keyword " + "to rows. `query` matches dataset TITLE/TAGS, NOT " + "row content; use `where` to filter rows (dates, " + "ranges, IN/LIKE). Auto-picks the queryable resource. " + "Multi-archive datasets (e.g. per-year 311 archives): " + 'pass `resource_name="2020"` to pick a specific ' + "year, or include_resource_totals=true for an " + "across-resources count breakdown." 
), input_schema={ "type": "object", @@ -522,30 +535,34 @@ def get_tools(self) -> List[ToolDefinition]: }, "limit": { "type": "integer", - "description": "Maximum number of data rows to return (default: 100).", - "default": 100, + "description": ( + f"Maximum number of data rows to return " + f"(default: {DEFAULT_QUERY_LIMIT})." + ), + "default": DEFAULT_QUERY_LIMIT, }, "filters": { "type": "object", "description": ( - "EQUALITY-ONLY row filters (e.g. " - "{\"case_status\": \"Closed\"}). For " - "ranges/dates/IN/LIKE, use `where` " - "instead." + "Equality-only row filters as a JSON " + "object of field -> value. For range, " + "date, IN, or LIKE comparisons use " + "`where` instead." ), }, "where": { "type": "object", "description": ( - "Structured WHERE clause for ranges, " - "dates, IN, LIKE, NULL checks. Each " - "entry is {field: scalar} (equality) or " - "{field: {op: value, ...}} where op is " - "one of: eq, ne, gt, gte, lt, lte, in, " - "not_in, like, ilike, is_null. The " - "right knob for 'closed on 2026-04-29': " - "{\"close_date\": {\"gte\": " - "\"2026-04-29\", \"lt\": \"2026-04-30\"}}." + "Structured WHERE clause as a JSON " + "object of field -> spec for range, " + "date, IN, LIKE, or NULL checks. A " + "spec is a scalar (equality) or " + "{op: value} where `op` is one of: " + "eq, ne, gt, gte, lt, lte, in, " + "not_in, like, ilike, is_null. Field " + "names are case-sensitive; get them " + "from a prior query's 'Filterable " + "columns' footer." ), }, "dataset_index": { @@ -563,7 +580,7 @@ def get_tools(self) -> List[ToolDefinition]: "Which resource within the chosen dataset " "to query. If omitted, auto-picks the " "first datastore_active resource (Boston " - "datasets typically attach 5–7 resources " + "datasets typically attach 5-7 resources " "but only the CSV is queryable). " "`resource_name` takes precedence." ), @@ -579,8 +596,8 @@ def get_tools(self) -> List[ToolDefinition]: "archives '311 Service Requests - 2020', " "'... 
- 2021', etc., plus a rolling " "'NEW SYSTEM'). Examples: " - "resource_name=\"2020\" picks the 2020 " - "archive; resource_name=\"NEW SYSTEM\" " + 'resource_name="2020" picks the 2020 ' + 'archive; resource_name="NEW SYSTEM" ' "picks the rolling current view. The " "alternates list in any prior " "search_and_query response shows " @@ -588,6 +605,25 @@ def get_tools(self) -> List[ToolDefinition]: "`resource_index`." ), }, + "include_resource_totals": { + "type": "boolean", + "description": ( + "When true, runs COUNT(*) in parallel " + "against every queryable resource in the " + "matched dataset and surfaces per-resource " + "totals + a grand total in the response. " + "Use this for dataset-wide counting " + "questions ('total 311 requests ever', " + "'how many permits across all years'). " + "If a `where` clause is set, it's applied " + "to each resource -- be aware schemas can " + "differ across per-year archives, so a " + "where clause built for one resource may " + "fail on others (those resources will " + "show n=null). Default false (one query, " + "fast path)." 
+ ), + }, }, "required": ["query"], }, @@ -609,16 +645,20 @@ async def execute_tool( try: if tool_name == "search_datasets": query = arguments.get("query", "") - limit = arguments.get("limit", 20) - datasets, total = await self._search_datasets_with_count( - query, limit + limit = arguments.get("limit", DEFAULT_SEARCH_LIMIT) + datasets, total = await self._search_datasets_with_count(query, limit) + text = self._format_search_results(datasets, total=total, limit=limit) + summary = ( + f"{self.plugin_config.portal_url.rstrip('/')} search for {query!r}" ) return ToolResult( content=[ { "type": "text", - "text": self._format_search_results( - datasets, total=total, limit=limit + "text": self._wrap_response( + text, + source_summary=summary, + calls=[("package_search", {"q": query, "rows": limit})], ), } ], @@ -634,11 +674,18 @@ async def execute_tool( error_message="dataset_id is required", ) dataset = await self.get_dataset(dataset_id) + text = self._format_dataset(dataset) return ToolResult( content=[ { "type": "text", - "text": self._format_dataset(dataset), + "text": self._wrap_response( + text, + source_summary=self._source_summary_for_dataset( + dataset=dataset + ), + calls=[("package_show", {"id": dataset_id})], + ), } ], success=True, @@ -654,7 +701,24 @@ async def execute_tool( ) filters = arguments.get("filters") or {} where = arguments.get("where") or None - limit = arguments.get("limit", 100) + limit = arguments.get("limit", DEFAULT_QUERY_LIMIT) + # Pre-flight: validate `where`/`filters` column names so + # typos surface here (with suggestions) instead of as a + # cryptic upstream 409. Best-effort; degrades silently. 
+ schema_fields = None + if where or filters: + schema_fields = await self._schema_fields_safe(resource_id) + field_err = self._validate_field_names( + self._collect_field_refs(where, filters), + schema_fields, + context="where/filters", + ) + if field_err: + return ToolResult( + content=[], + success=False, + error_message=field_err, + ) records, fields, total, error = await self._query_with_schema( resource_id=resource_id, filters=filters, @@ -667,12 +731,28 @@ async def execute_tool( success=False, error_message=error, ) + text = self._format_query_results(records, fields, limit, total) + # Pick the action that actually fired so the provenance + # header matches the path taken (SQL path vs equality + # search path). + action = "datastore_search_sql" if where else "datastore_search" + params: Dict[str, Any] = { + "resource_id": resource_id, + "limit": limit, + } + if filters: + params["filters"] = filters + if where: + params["where"] = where + summary = self._source_summary_for_dataset(resource={"id": resource_id}) return ToolResult( content=[ { "type": "text", - "text": self._format_query_results( - records, fields, limit, total + "text": self._wrap_response( + text, + source_summary=summary, + calls=[(action, params)], ), } ], @@ -688,11 +768,23 @@ async def execute_tool( error_message="resource_id is required", ) schema = await self.get_schema(resource_id) + text = self._format_schema(schema) return ToolResult( content=[ { "type": "text", - "text": self._format_schema(schema), + "text": self._wrap_response( + text, + source_summary=self._source_summary_for_dataset( + resource={"id": resource_id} + ), + calls=[ + ( + "datastore_search", + {"resource_id": resource_id, "limit": 0}, + ) + ], + ), } ], success=True, @@ -713,15 +805,27 @@ async def execute_tool( success=False, error_message=result.get("message", "SQL execution failed"), ) - # Format SQL results records = result.get("records", []) fields = result.get("fields", []) effective_limit = 
result.get("effective_limit") formatted_text = self._format_sql_results( - records, fields, effective_limit=effective_limit + records, + fields, + effective_limit=effective_limit, + is_passthrough=True, ) + summary = f"raw SQL on {self.plugin_config.portal_url.rstrip('/')}" return ToolResult( - content=[{"type": "text", "text": formatted_text}], + content=[ + { + "type": "text", + "text": self._wrap_response( + formatted_text, + source_summary=summary, + calls=[("datastore_search_sql", {"sql": sql})], + ), + } + ], success=True, ) @@ -733,12 +837,15 @@ async def execute_tool( success=False, error_message="query is required", ) - limit = arguments.get("limit", 100) + limit = arguments.get("limit", DEFAULT_QUERY_LIMIT) filters = arguments.get("filters") or {} where = arguments.get("where") or None dataset_index = arguments.get("dataset_index") resource_index = arguments.get("resource_index") resource_name = arguments.get("resource_name") + include_resource_totals = bool( + arguments.get("include_resource_totals", False) + ) composite = await self.search_and_query( query=query, limit=limit, @@ -747,6 +854,7 @@ async def execute_tool( dataset_index=dataset_index, resource_index=resource_index, resource_name=resource_name, + include_resource_totals=include_resource_totals, ) if composite.get("error"): return ToolResult( @@ -756,11 +864,35 @@ async def execute_tool( "message", "search_and_query failed" ), ) + text = self._format_search_and_query(composite, limit) + # search_and_query fires two upstream actions in sequence. + # Surface both so the user can reproduce the chain. 
+ chosen_rid = (composite.get("resource") or {}).get("id") + second_action = "datastore_search_sql" if where else "datastore_search" + second_params: Dict[str, Any] = { + "resource_id": chosen_rid, + "limit": limit, + } + if filters: + second_params["filters"] = filters + if where: + second_params["where"] = where + summary = self._source_summary_for_dataset( + dataset=composite.get("dataset"), + resource=composite.get("resource"), + ) return ToolResult( content=[ { "type": "text", - "text": self._format_search_and_query(composite, limit), + "text": self._wrap_response( + text, + source_summary=summary, + calls=[ + ("package_search", {"q": query}), + (second_action, second_params), + ], + ), } ], success=True, @@ -781,14 +913,35 @@ async def execute_tool( success=False, error_message="metrics parameter is required", ) + group_by = arguments.get("group_by", []) or [] + filters = arguments.get("filters") + # Pre-flight: catch field-name typos in group_by / metrics + # / filters before we generate SQL. Best-effort. 
+ schema_fields = await self._schema_fields_safe(resource_id) + field_err = self._validate_field_names( + self._collect_field_refs( + where=None, + filters=filters, + group_by=group_by, + metrics=metrics, + ), + schema_fields, + context="group_by/metrics/filters", + ) + if field_err: + return ToolResult( + content=[], + success=False, + error_message=field_err, + ) result = await self.aggregate_data( resource_id=resource_id, - group_by=arguments.get("group_by", []), + group_by=group_by, metrics=metrics, - filters=arguments.get("filters"), + filters=filters, having=arguments.get("having"), order_by=arguments.get("order_by"), - limit=arguments.get("limit", 100), + limit=arguments.get("limit", DEFAULT_QUERY_LIMIT), ) if result.get("error"): return ToolResult( @@ -801,8 +954,27 @@ async def execute_tool( result.get("fields", []), effective_limit=result.get("effective_limit"), ) + agg_params: Dict[str, Any] = { + "resource_id": resource_id, + "metrics": metrics, + } + if group_by: + agg_params["group_by"] = group_by + if filters: + agg_params["filters"] = filters + summary = self._source_summary_for_dataset(resource={"id": resource_id}) return ToolResult( - content=[{"type": "text", "text": formatted}], success=True + content=[ + { + "type": "text", + "text": self._wrap_response( + formatted, + source_summary=summary, + calls=[("datastore_search_sql", agg_params)], + ), + } + ], + success=True, ) else: @@ -839,7 +1011,7 @@ async def search_datasets( async def _search_datasets_with_count( self, query: str, limit: int = 20 ) -> Tuple[List[Dict[str, Any]], Optional[int]]: - """Same as search_datasets but also returns CKAN's `count` — the + """Same as search_datasets but also returns CKAN's `count` -- the true number of datasets matching the query, regardless of the row cap. 
Lets the formatter say "20 of 47 matching datasets returned" instead of just "Found 20".""" @@ -990,26 +1162,22 @@ async def _query_with_schema( result = await self.execute_sql(sql) if result.get("error"): - return [], [], None, result.get( - "message", "SQL execution failed" - ) + return [], [], None, result.get("message", "SQL execution failed") records = result.get("records", []) fields = result.get("fields", []) - # If we hit the LIMIT exactly, we don't actually know the total — + # If we hit the LIMIT exactly, we don't actually know the total -- # do a cheap COUNT(*) follow-up so the model gets a real number # instead of mistaking the limit for the count. total: Optional[int] if len(records) >= limit_int: - total = await self._count_via_sql( - validated_id, where_sql, filters - ) + total = await self._count_via_sql(validated_id, where_sql, filters) else: total = len(records) return records, fields, total, None - # No `where` → cheap datastore_search path. + # No `where` -> cheap datastore_search path. params: Dict[str, Any] = {"resource_id": resource_id, "limit": limit} if filters: params["filters"] = filters @@ -1035,7 +1203,7 @@ async def _query_with_schema( return [], [], None, msg result = response.get("result", {}) - # CKAN returns `total` for free here — a true count of rows + # CKAN returns `total` for free here -- a true count of rows # matching the filter, not capped by limit. 
total_val = result.get("total") try: @@ -1155,20 +1323,14 @@ async def aggregate_data( having_parts: List[str] = [] for expr, value in having.items(): expr_quoted = SafeSQLBuilder.validate_metric_expr(expr) - if isinstance(value, bool) or not isinstance( - value, (int, float) - ): - raise ValueError( - f"HAVING value must be numeric: {value!r}" - ) + if isinstance(value, bool) or not isinstance(value, (int, float)): + raise ValueError(f"HAVING value must be numeric: {value!r}") having_parts.append(f"{expr_quoted} > {value}") having_clause = " HAVING " + " AND ".join(having_parts) order_clause = "" if order_by: - order_clause = " ORDER BY " + SafeSQLBuilder.validate_order_by( - order_by - ) + order_clause = " ORDER BY " + SafeSQLBuilder.validate_order_by(order_by) limit_int = SafeSQLBuilder.clamp_limit(limit) except ValueError as e: @@ -1185,7 +1347,9 @@ async def aggregate_data( @staticmethod def _queryable_resources(dataset: Dict[str, Any]) -> List[Dict[str, Any]]: """All datastore_active resources in a dataset, in package_show order.""" - return [r for r in (dataset.get("resources") or []) if r.get("datastore_active")] + return [ + r for r in (dataset.get("resources") or []) if r.get("datastore_active") + ] @classmethod def _resource_by_name( @@ -1219,6 +1383,7 @@ async def search_and_query( dataset_index: Optional[int] = None, resource_index: Optional[int] = None, resource_name: Optional[str] = None, + include_resource_totals: bool = False, ) -> Dict[str, Any]: """Search for a dataset and immediately query a queryable resource. @@ -1237,6 +1402,12 @@ async def search_and_query( """ explicit_dataset = dataset_index is not None explicit_resource = resource_index is not None + explicit_resource_name = bool(resource_name) + # The model auto-picked the resource (no resource_name, resource_index, + # or pinned dataset_index). 
Used by the formatter to decide whether to + # warn that the answer is for ONE of N queryable resources and might + # be partial relative to the user's "total" question. + auto_picked_resource = not (explicit_resource or explicit_resource_name) ds_idx = dataset_index or 0 # Cap how many search results we fetch so dataset_index can pick a # non-best match without an unbounded scan. @@ -1275,8 +1446,7 @@ async def search_and_query( resources = ds.get("resources") or [] if not resources: skipped_summary.append( - f" [{idx}] {ds.get('title') or ds.get('id')}: " - "no resources" + f" [{idx}] {ds.get('title') or ds.get('id')}: no resources" ) continue @@ -1341,12 +1511,7 @@ async def search_and_query( chosen_dataset, chosen_resource = ds, queryable break - formats = sorted( - { - (r.get("format") or "?").upper() - for r in resources - } - ) + formats = sorted({(r.get("format") or "?").upper() for r in resources}) skipped_summary.append( f" [{idx}] {ds.get('title') or ds.get('id')}: " f"no datastore-loaded resource (formats: {', '.join(formats)})" @@ -1355,11 +1520,7 @@ async def search_and_query( # If we walked all datasets and resource_name was set but never # matched, give a name-specific error rather than the generic # "no queryable resource" one. - if ( - chosen_dataset is None - and resource_name - and not explicit_dataset - ): + if chosen_dataset is None and resource_name and not explicit_dataset: return { "error": True, "message": ( @@ -1411,6 +1572,16 @@ async def search_and_query( ), } + # Optional: parallel COUNT(*) across every queryable resource of the + # chosen dataset, so a multi-archive dataset like Boston's 311 can + # answer "total across all years" in one follow-up call instead of + # 22 sequential ones. 
async def _count_all_queryable(
    self,
    dataset: Dict[str, Any],
    where: Optional[Dict[str, Any]] = None,
    filters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Optional[int]]:
    """Run COUNT(*) concurrently against every queryable resource of a dataset.

    Args:
        dataset: CKAN package dict; its datastore-active resources are
            obtained from ``self._queryable_resources``.
        where: Optional structured WHERE spec, compiled once with
            ``SafeSQLBuilder.build_where_clause`` and shared by all counts.
        filters: Optional equality filters forwarded to ``_count_via_sql``.

    Returns:
        ``{resource_id: total or None}``. ``None`` means the count for that
        resource is unavailable: the resource id failed validation, the
        count call failed, or the ``where`` spec itself could not be
        compiled. An empty dict means the dataset has no queryable
        resources.
    """
    import asyncio  # local import: only this fan-out path needs it

    queryables = self._queryable_resources(dataset)
    if not queryables:
        return {}

    try:
        where_sql = SafeSQLBuilder.build_where_clause(where) if where else ""
    except ValueError as exc:
        # Do NOT fall back to an unfiltered COUNT(*): the caller would read
        # whole-table totals as if they honoured `where`. Report every
        # resource as "count unavailable" instead.
        logger.warning("Cannot count sibling resources; invalid where: %s", exc)
        return {(r.get("id") or ""): None for r in queryables}

    async def count_one(resource: Dict[str, Any]) -> Tuple[str, Optional[int]]:
        rid = resource.get("id") or ""
        try:
            validated_id = SafeSQLBuilder.validate_resource_id(rid)
        except ValueError:
            return rid, None
        try:
            return rid, await self._count_via_sql(validated_id, where_sql, filters)
        except Exception as exc:  # noqa: BLE001 -- one bad archive must not sink the rest
            logger.warning("COUNT(*) failed for resource %s: %s", rid, exc)
            return rid, None

    results = await asyncio.gather(*(count_one(r) for r in queryables))
    return dict(results)
+ + Civic-AI principle: every response ends with a timestamp so a + downstream caller can tell a fresh fetch from a stale cached one. + """ + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + @staticmethod + def _params_repr(params: Dict[str, Any]) -> str: + """Render a {k: v} dict as `k1=..., k2=...` for the echoed-Query line. + + Long string values (typically SQL) are tail-truncated so the + provenance header doesn't dominate the response. + """ + parts: List[str] = [] + for key, value in params.items(): + if isinstance(value, str) and len(value) > _PARAMS_REPR_MAX: + value = value[: _PARAMS_REPR_MAX - 3] + "..." + parts.append(f"{key}={value!r}") + return ", ".join(parts) + + def _format_provenance_header( + self, + source_summary: str, + calls: List[Tuple[str, Dict[str, Any]]], + ) -> str: + """Render the leading `## Source` block. + + Layout (top-down so Copilot summarizers see the load-bearing facts + first; Copilot C2/C4): + + ## Source + Source: + API: POST [; POST ...] + Query: + + The human-readable `Source:` line is what the model is expected to + quote when attributing answers. The `API:` / `Query:` lines are for + reproducibility (civic-AI #1, #2). + """ + if not source_summary and not calls: + return "" + lines: List[str] = ["## Source"] + if source_summary: + lines.append(f"Source: {source_summary}") + if calls: + base = self.plugin_config.base_url.rstrip("/") + endpoints = [f"POST {base}/api/3/action/{a}" for a, _ in calls] + lines.append("API: " + "; ".join(endpoints)) + # Echo each call's params on its own line so a multi-call + # composite (search_and_query) doesn't blur which params + # belong to which call. + for action, params in calls: + if params: + lines.append(f"Query [{action}]: {self._params_repr(params)}") + return "\n".join(lines) + + @staticmethod + def _format_critical_reminders(body: str) -> str: + """Emit a bottom-of-response prose reminder for each critical + caveat already fired upstream. 
+ + Copilot A3: a caveat that appears once as a structured marker + is more likely to be dropped by GPT-4o than one that also + appears in prose. Repeating the load-bearing ones in plain + language at the bottom is cheap insurance. + + Pulls the trigger from the caller's already-rendered body so we + never double-fire and stay silent when nothing critical did. + """ + reminders: List[str] = [] + # Order matches the body's caveat block so the prose reminders + # read naturally if more than one fires. + if "APPARENT ABANDONMENT" in body: + reminders.append( + "(Reminder: this dataset shows APPARENT ABANDONMENT -- " + "the stated update cadence and the actual resource " + "last-modified date diverge. Verify with the publisher " + "before reporting these values as current.)" + ) + if "NO UPDATE CADENCE DECLARED" in body: + reminders.append( + "(Reminder: the publisher has not declared an update " + "cadence and the resource has not been touched in over " + "two years. Treat this as a possibly one-shot snapshot.)" + ) + if "SINGLE-RECORD CLAIM" in body: + reminders.append( + "(Reminder: only ONE record matched. Do not generalize " + "or treat this as a trend; it is a single anecdote.)" + ) + if "SQL PASSTHROUGH" in body: + reminders.append( + "(Reminder: the SQL above was written by the model, not " + "selected from a curated set. Verify it matches the " + "user's actual question before trusting the result.)" + ) + return "\n".join(reminders) + + def _wrap_response( + self, + text: str, + source_summary: str = "", + calls: Optional[List[Tuple[str, Dict[str, Any]]]] = None, + ) -> str: + """Centralized response wrapper. + + Prepends a `## Source` block (civic-AI #1, #2; Copilot C4) and + appends a UTC ISO-8601 retrieval timestamp (civic-AI #3). Also + appends a prose-reminder block for critical caveats that fired + in the body (Copilot A3). Per-formatter code never has to add + these by hand. 
+ """ + header = self._format_provenance_header(source_summary, calls or []) + reminders = self._format_critical_reminders(text) + footer = f"_Retrieved: {self._iso_now_utc()}_" + parts: List[str] = [] + if header: + parts.append(header) + parts.append("") + parts.append(text) + if reminders: + parts.append("") + parts.append(reminders) + parts.append("") + parts.append(footer) + return "\n".join(parts) + + def _package_url(self, dataset_id: str) -> str: + """Public CKAN package page URL. + + Boston's portal exposes `/dataset/`. We use + the dataset name (slug) when available because it's the stable + human-readable identifier on the portal; the UUID also resolves + but reads worse. + """ + return f"{self.plugin_config.portal_url.rstrip('/')}/dataset/{dataset_id}" + + def _source_summary_for_dataset( + self, + dataset: Optional[Dict[str, Any]] = None, + resource: Optional[Dict[str, Any]] = None, + ) -> str: + """Build the human-readable `Source:` one-liner. + + Civic-AI #1: ` / resource () / + ` when we have full context; degrades gracefully when + we have less. + """ + portal = self.plugin_config.portal_url.rstrip("/") + if not dataset: + if resource: + rid = resource.get("id", "?") + rname = resource.get("name") or "" + rname_part = f" ({rname})" if rname else "" + return f"resource {rid}{rname_part} on {portal}" + return f"{portal}" + title = dataset.get("title") or dataset.get("name") or "unknown dataset" + slug = dataset.get("name") or dataset.get("id") or "" + url = self._package_url(slug) if slug else portal + if resource: + rid = resource.get("id", "?") + rname = resource.get("name") or "" + rname_part = f" ({rname})" if rname else "" + return f"{title} / resource {rid}{rname_part} / {url}" + return f"{title} / {url}" + + @staticmethod + def _format_sample_size_caveat( + total: Optional[int], + unit: str = "row", + ) -> str: + """Emit a SINGLE-RECORD or SMALL SAMPLE banner so the model doesn't + pattern-match a tiny result into a trend. 
+ + Returns ``""`` when the sample is large enough not to warrant a + warning, or when ``total`` is unknown (we'd rather stay silent than + false-alarm). Civic-AI principles #5 and #6. + """ + if total is None: + return "" + if total == 1: + return ( + "=== SINGLE-RECORD CLAIM (N=1) ===\n" + f"Exactly 1 {unit} matched. Do NOT report this as a " + "trend, average, or general pattern -- it's an N=1 " + "anecdote. Quote the record directly or say so when " + "summarizing.\n" + "=================================" + ) + if 1 < total <= _SMALL_SAMPLE_MAX: + return ( + "=== SMALL SAMPLE ===\n" + f"Only {total} {unit}(s) matched. Treat any aggregate " + "or rate calculated from this as a small-sample estimate " + "and call out the sample size when summarizing.\n" + "====================" + ) + return "" + + @staticmethod + def _parse_ckan_iso(value: Optional[str]) -> Optional[datetime]: + """Parse the timestamp shapes CKAN returns. + + Handles naive ISO with microseconds ("2021-04-14T12:34:56.789012"), + trailing-Z UTC, and plain dates. Returns ``None`` on anything else + so callers can degrade silently. + """ + if not value or not isinstance(value, str): + return None + try: + iso = value.rstrip("Z") + parsed = datetime.fromisoformat(iso) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed + except (ValueError, AttributeError): + return None + + @staticmethod + def _frequency_days(frequency: Optional[str]) -> Optional[int]: + """Translate a CKAN-declared `frequency` string to a day count. + + Returns ``None`` for unset / unrecognized values so the abandonment + detector can stay silent rather than guess. Substring match because + portals are inconsistent ("Weekly", "Updated Weekly", "weekly cadence"). 
@classmethod
def _format_freshness_caveat(
    cls,
    dataset: Optional[Dict[str, Any]] = None,
    resource: Optional[Dict[str, Any]] = None,
) -> str:
    """Build the freshness caveat blocks for a dataset/resource pair.

    Up to two blocks are produced, joined by a blank line:

    1. Either **APPARENT ABANDONMENT** (a recognized update frequency is
       declared, but the resource is older than that interval times
       ``_ABANDONMENT_INTERVAL_MULTIPLIER``) or **NO UPDATE CADENCE
       DECLARED** (no recognized frequency and the resource is older than
       ``_NO_FREQUENCY_OLD_DAYS``). These two are mutually exclusive.
    2. **DATA FRESHNESS**: both timestamps, shown when the resource is
       older than ``_STALE_DATASET_DAYS``, when the metadata is newer than
       the resource by more than that many days, or when only a stale
       metadata timestamp is available.

    Args:
        dataset: CKAN package dict; may be ``None`` when only a resource
            is known.
        resource: Resource dict. When omitted, the dataset's first
            datastore-active resource is used, else its first resource.

    Returns:
        The caveat text, or ``""`` when nothing applies or neither argument
        was given. Missing or unparseable fields are skipped, not guessed.
    """
    if dataset is None and resource is None:
        return ""
    dataset = dataset or {}
    # If no explicit resource passed, fall back to the first queryable
    # one so get_dataset / search_and_query can both call this with
    # what they have.
    if resource is None:
        for r in dataset.get("resources") or []:
            if r.get("datastore_active"):
                resource = r
                break
        else:
            resource = (dataset.get("resources") or [None])[0]
    resource = resource or {}

    meta_mod = cls._parse_ckan_iso(dataset.get("metadata_modified"))
    res_mod = cls._parse_ckan_iso(
        resource.get("last_modified") or resource.get("revision_timestamp")
    )
    now = datetime.now(timezone.utc)
    frequency_text = dataset.get("frequency") or dataset.get("update_frequency")
    freq_days = cls._frequency_days(frequency_text)

    blocks: List[str] = []

    # 1. Abandonment detector
    if (
        res_mod is not None
        and freq_days is not None
        and (now - res_mod).days > freq_days * _ABANDONMENT_INTERVAL_MULTIPLIER
    ):
        age_days = (now - res_mod).days
        interval_label = (
            f"{age_days} days" if age_days < 365 else f"{age_days / 365:.1f} years"
        )
        blocks.append(
            "[APPARENT ABANDONMENT]\n"
            f"Dataset declares {frequency_text!r} updates but the "
            f"resource was last modified "
            f"{res_mod.strftime('%Y-%m-%d')} ({interval_label} ago). "
            f"Expected cadence: every ~{freq_days} days. The data is "
            "likely stale; verify with the publisher before relying "
            "on it for current questions.\n"
            "[/APPARENT ABANDONMENT]"
        )

    # 2. No-frequency note
    elif (
        res_mod is not None
        and freq_days is None
        and (now - res_mod).days > _NO_FREQUENCY_OLD_DAYS
    ):
        years = (now - res_mod).days / 365.0
        blocks.append(
            "[NO UPDATE CADENCE DECLARED]\n"
            f"Update cadence not declared; resource last modified "
            f"{res_mod.strftime('%Y-%m-%d')} ({years:.1f} years "
            "ago). Cannot tell if this is current or a one-time "
            "snapshot.\n"
            "[/NO UPDATE CADENCE DECLARED]"
        )

    # 3. Dual-timestamp DATA FRESHNESS card.
    #
    # Fires when:
    #   - the resource itself is older than a year, OR
    #   - the metadata timestamp is more than a year NEWER than the
    #     resource timestamp (description edits papering over data
    #     that hasn't moved), OR
    #   - the resource timestamp is missing but metadata_modified is
    #     itself stale (degraded signal -- still better than silence).
    res_old = res_mod is not None and (now - res_mod).days > _STALE_DATASET_DAYS
    # Directional on purpose: with abs(), a resource that is NEWER than its
    # metadata fired the "metadata says recent, data is stale" warning and
    # the "historical snapshot" advice, both of which are false in that case.
    divergent = (
        res_mod is not None
        and meta_mod is not None
        and (meta_mod - res_mod).days > _STALE_DATASET_DAYS
    )
    meta_only_old = (
        res_mod is None
        and meta_mod is not None
        and (now - meta_mod).days > _STALE_DATASET_DAYS
    )
    if res_old or divergent or meta_only_old:
        lines = ["[DATA FRESHNESS]"]
        if res_mod is not None:
            age = (now - res_mod).days
            lines.append(
                f"Data last updated: {res_mod.strftime('%Y-%m-%d')} "
                f"(resource, {age} days ago)."
            )
        if meta_mod is not None:
            age = (now - meta_mod).days
            lines.append(
                f"Metadata last touched: "
                f"{meta_mod.strftime('%Y-%m-%d')} "
                f"(description/tags, {age} days ago -- "
                "may not reflect data changes)."
            )
        if divergent:
            lines.append(
                "WARNING: these timestamps diverge by more than a "
                "year. The metadata says recent, the data underneath "
                "is stale. Use the resource timestamp when judging "
                "currency."
            )
        lines.append(
            "Treat this as a historical snapshot, not current state. "
            "If the user asked about 'now' / 'today' / 'currently', "
            "say so explicitly."
        )
        lines.append("[/DATA FRESHNESS]")
        blocks.append("\n".join(lines))

    return "\n\n".join(blocks)
+ """ + if not values: + return False + date_re = re.compile( + r"^\s*\d{4}-\d{2}-\d{2}" # ISO + r"|^\s*\d{1,2}[/\-]\d{1,2}[/\-]\d{2,4}" # US or DMY + r"|^\s*[A-Za-z]{3,9}\s+\d{1,2},?\s+\d{4}" # named month + ) + matches = sum(1 for v in values if isinstance(v, str) and date_re.match(v)) + return matches / max(len(values), 1) >= 0.8 + + @staticmethod + def _looks_like_number(values: List[Any]) -> bool: + """True when most sampled string values are numeric-looking. + + Skips dates (the date check should run first) -- a "20240615" + string would otherwise be misclassified as a number; the date + regex catches dashed forms before this is consulted. + """ + if not values: + return False + num_re = re.compile(r"^\s*-?\d+(?:\.\d+)?\s*$") + matches = sum(1 for v in values if isinstance(v, str) and num_re.match(v)) + return matches / max(len(values), 1) >= 0.8 + + @classmethod + def _format_stringly_typed_caveat( + cls, + records: List[Dict[str, Any]], + fields: Optional[List[Dict[str, Any]]], + ) -> str: + """Emit a TYPE NOTE banner when TEXT fields hold date- or number- + shaped values. Civic-AI #9. + + Pulls sampled values from the actual returned rows so we don't + false-alarm on TEXT columns that genuinely hold text. Best-effort; + if records or fields are missing, returns "". + """ + if not records or not fields: + return "" + text_types = {"text", "string", "varchar"} + candidates = [ + f + for f in fields + if (f.get("type") or "").lower() in text_types + and f.get("id") + and f.get("id") != "_id" + ] + if not candidates: + return "" + notes: List[str] = [] + for f in candidates: + fid = f["id"] + sampled = [r.get(fid) for r in records if r.get(fid) not in (None, "")] + # Need a minimum sample so a single value doesn't trip the + # check. Three matches the small-sample threshold elsewhere. + if len(sampled) < 3: + continue + if cls._looks_like_date(sampled): + notes.append( + f" - {fid!r} is stored as TEXT but values look like " + "dates. 
Comparisons like > '2024-01-01' will be " + "STRING comparison, not date comparison; ORDER BY " + "this column will sort alphabetically. Cast in SQL " + f'(e.g. ("{fid}")::timestamp) before comparing ' + "or ordering." + ) + elif cls._looks_like_number(sampled): + notes.append( + f" - {fid!r} is stored as TEXT but values look like " + "numbers. Comparisons (> < BETWEEN) will be string " + "comparison ('10' < '2'); ORDER BY this column will " + f'sort lexicographically. Cast (e.g. ("{fid}")::' + "numeric) before comparing or aggregating." + ) + if not notes: + return "" + return ( + "[STRINGLY-TYPED FIELDS]\n" + + "\n".join(notes) + + ("\n[/STRINGLY-TYPED FIELDS]") + ) + + @staticmethod + def _is_null_like(value: Any) -> bool: + """True when a value is one of CKAN's six common null + representations (civic-AI #10): actual null, "", "N/A", "Unknown", + "None", "NULL".""" + if value is None: + return True + if isinstance(value, str): + return value.strip().lower() in _NULL_LIKE_STRINGS + return False + + @classmethod + def _render_null_like(cls, value: Any) -> str: + """Render a null-like value distinctly so the model doesn't treat + the string 'Unknown' as a real category. + + Real nulls become ``, empty strings ``, and other + null-like literals become `<"">` -- the quoting makes + clear the literal *value* is sentinel, not real content. + """ + if value is None: + return "" + if isinstance(value, str): + text = value.strip() + if text == "": + return "" + return f'<"{value}">' + return str(value) + + @classmethod + def _normalize_value_for_display( + cls, + value: Any, + field_type: Optional[str], + ) -> str: + """Render a single cell for record preview. + + For timestamp/date columns we coerce to ISO 8601. Midnight-aligned + timestamps render as date-only so a date column doesn't grow a + meaningless `T00:00:00`. NULL-likes are rendered distinctly + (civic-AI #10) so the model can tell "Unknown" the category apart + from missing data. 
+ """ + if cls._is_null_like(value): + return cls._render_null_like(value) + if not field_type: + return str(value) + ftype = field_type.lower() + if ftype in ("timestamp", "timestamptz", "date"): + parsed = cls._parse_timestamp(value) + if parsed is None: + return str(value) + if ( + parsed.hour == 0 + and parsed.minute == 0 + and parsed.second == 0 + and parsed.microsecond == 0 + ): + return parsed.strftime("%Y-%m-%d") + return parsed.strftime("%Y-%m-%dT%H:%M:%S") + return str(value) + + @classmethod + def _format_null_like_frequency_caveat( + cls, + records: List[Dict[str, Any]], + fields: Optional[List[Dict[str, Any]]], + ) -> str: + """Civic-AI #11: when >20% of a column's returned values are + null-like, emit a DATA QUALITY note naming the field and its + missing-rate.""" + if not records or not fields: + return "" + usable_fields = [f for f in fields if f.get("id") and f.get("id") != "_id"] + offenders: List[Tuple[str, float]] = [] + n = len(records) + for f in usable_fields: + fid = f["id"] + missing = sum(1 for r in records if cls._is_null_like(r.get(fid))) + ratio = missing / n if n else 0.0 + if ratio >= _NULL_LIKE_FREQ_THRESHOLD: + offenders.append((fid, ratio)) + if not offenders: + return "" + # Sort by missing-rate descending so the worst offenders surface + # first when there are several. + offenders.sort(key=lambda x: x[1], reverse=True) + lines = ["[DATA QUALITY]"] + for fid, ratio in offenders: + pct = int(round(ratio * 100)) + lines.append( + f" - {fid!r} is empty / null / 'N/A' / 'Unknown' in " + f"{pct}% of returned records. Treat aggregations on this " + "field with care: high missing-rate." + ) + lines.append("[/DATA QUALITY]") + return "\n".join(lines) + + @staticmethod + def _parse_timestamp(value: Any) -> Optional[datetime]: + """Best-effort parse of upstream timestamp values. + + CKAN/Postgres returns ISO strings but Boston's portal also ships + epoch-ms integers in some resources. Handle both; return None if + neither shape matches. 
+ """ + if isinstance(value, datetime): + return value + if isinstance(value, (int, float)): + try: + # Heuristic: > 10^11 => ms, otherwise seconds. + ts = value / 1000.0 if value > 1e11 else float(value) + return datetime.fromtimestamp(ts, tz=timezone.utc) + except (OverflowError, OSError, ValueError): + return None + if isinstance(value, str): + text = value.strip() + if not text: + return None + try: + return datetime.fromisoformat(text.replace("Z", "+00:00")) + except ValueError: + return None + return None + + @staticmethod + def _field_types_by_id( + fields: Optional[List[Dict[str, Any]]], + ) -> Dict[str, str]: + """Build {field_id: type} lookup used by record-display normalization.""" + if not fields: + return {} + return {f.get("id"): (f.get("type") or "") for f in fields if f.get("id")} + + def _format_record_lines( + self, + record: Dict[str, Any], + field_types: Dict[str, str], + ) -> List[str]: + """Render one record's fields with date normalization applied. + + Skips CKAN's internal `_id` column the same way the legacy formatters + did, so output is unchanged for non-date columns. + """ + out: List[str] = [] + for key, value in record.items(): + if key == "_id": + continue + rendered = self._normalize_value_for_display(value, field_types.get(key)) + out.append(f" {key}: {rendered}") + return out + + @staticmethod + def _validate_field_names( + provided: List[str], + schema_fields: Optional[List[Dict[str, Any]]], + context: str, + ) -> Optional[str]: + """Pre-validate caller-supplied field names against the resource's + schema and return a 'did you mean?' error message for any typo. + + Best-effort (civic-AI #15): if we couldn't fetch a schema, returns + ``None`` and lets the upstream API do its own (cryptic) error. The + upstream's complaint isn't useful to an LLM, so when we *do* have + the schema we'd rather catch it here. 
+ """ + if not schema_fields: + return None + known = {f.get("id") for f in schema_fields if f.get("id")} + if not known: + return None + unknown = [name for name in provided if name and name not in known] + if not unknown: + return None + suggestions: List[str] = [] + known_list = list(known) + for name in unknown: + close = difflib.get_close_matches(name, known_list, n=1, cutoff=0.6) + if close: + suggestions.append(f"'{name}' (did you mean '{close[0]}'?)") + else: + suggestions.append(f"'{name}'") + return ( + f"Unknown field name(s) in {context}: " + f"{', '.join(suggestions)}. Valid columns: " + f"{', '.join(sorted(known_list))}." + ) + + async def _schema_fields_safe( + self, resource_id: str + ) -> Optional[List[Dict[str, Any]]]: + """Fetch the resource's field list, swallowing any error. + + Used by the pre-flight field validator (civic-AI #9). If this fails + we degrade silently -- false silences beat false alarms. + """ + try: + return await self.get_schema(resource_id) + except Exception: # noqa: BLE001 -- best-effort by design + return None + + @staticmethod + def _collect_field_refs( + where: Optional[Dict[str, Any]], + filters: Optional[Dict[str, Any]], + group_by: Optional[List[str]] = None, + metrics: Optional[Dict[str, str]] = None, + ) -> List[str]: + """Extract every column name the caller wants to reference in a + single call, so the field validator can check them all at once.""" + refs: List[str] = [] + if where: + refs.extend(k for k in where.keys() if isinstance(k, str)) + if filters: + refs.extend(k for k in filters.keys() if isinstance(k, str)) + if group_by: + refs.extend(k for k in group_by if isinstance(k, str)) + if metrics: + # Metric expressions look like 'count(*)' or 'avg(field)' -- pull + # the column name out of the parens and skip count(*) since + # there's nothing to validate. 
+ metric_re = re.compile( + r"^\s*[a-zA-Z_]+\s*\(\s*(?:distinct\s+)?([A-Za-z_][A-Za-z0-9_]*)\s*\)\s*$", + re.IGNORECASE, + ) + for expr in metrics.values(): + if not isinstance(expr, str): + continue + m = metric_re.match(expr) + if m: + refs.append(m.group(1)) + return refs + + @staticmethod + def _format_search_ambiguity_caveat( + datasets: List[Dict[str, Any]], + ) -> str: + """Warn when search_datasets returned multiple plausible matches + for a topic (civic-AI #14). + + Heuristic: fires when >= 2 datasets share a non-trivial title-token + with the top result (e.g. 'crime', 'crime incident report', 'crime + stats summary'). Without this warning, the model silently picks + the first hit and reports it as canonical. + """ + if not datasets or len(datasets) < 2: + return "" + # Build a token set from the top dataset's title (skipping short + # stopwords); count how many other datasets share at least one + # of those tokens. + stop = {"of", "and", "the", "in", "for", "to", "by", "on", "a", "an"} + + def tokens(text: str) -> set: + return { + t.lower() + for t in re.findall(r"[A-Za-z]{3,}", text or "") + if t.lower() not in stop + } + + top_tokens = tokens(datasets[0].get("title") or "") + if not top_tokens: + return "" + plausible: List[str] = [] + for ds in datasets[1:5]: # cap at 4 alternates so the banner stays brief + ds_tokens = tokens(ds.get("title") or "") + if top_tokens & ds_tokens: + plausible.append( + f" - {ds.get('title') or '(untitled)'} " + f"(dataset_id={ds.get('id', '?')})" + ) + if not plausible: + return "" + return ( + "[AMBIGUOUS SEARCH]\n" + f"Multiple datasets match this topic; the top hit " + f"({datasets[0].get('title') or '?'!r}) is one of " + f"{1 + len(plausible)} plausible candidates. Do NOT silently " + "pick the first one. Either show the user the alternates " + "below, or call ckan__search_and_query with each candidate " + "individually to compare. 
Plausible alternates:\n" + + "\n".join(plausible) + + "\n[/AMBIGUOUS SEARCH]" + ) + def _format_search_results( self, datasets: List[Dict[str, Any]], @@ -1441,7 +2398,13 @@ def _format_search_results( ) -> str: """Format search results for user display.""" if not datasets: - return f"No datasets found in {self.plugin_config.city_name}'s open data portal." + return ( + f"No datasets matched in {self.plugin_config.city_name}'s " + "open data portal.\n" + "NOTE: zero results does NOT mean the data doesn't exist. " + "Try a broader keyword, a synonym, or check the portal " + f"directly at {self.plugin_config.portal_url}." + ) suggested_resource_id: Optional[str] = None suggested_dataset_id: Optional[str] = None @@ -1453,14 +2416,23 @@ def _format_search_results( break lines: List[str] = [] + + # Civic-AI #14: when multiple plausible datasets matched, warn + # before the listing so the model doesn't silently pick the first. + ambiguity_caveat = self._format_search_ambiguity_caveat(datasets) + if ambiguity_caveat: + lines.append("## Caveats") + lines.append(ambiguity_caveat) + lines.append("") + if suggested_resource_id: lines.extend( [ "=== NEXT STEP (read this first) ===", f"suggested_resource_id: {suggested_resource_id}", "suggested_next_tool: ckan__query_data", - f"suggested_call: ckan__query_data(resource_id=\"{suggested_resource_id}\")", - "(this is the datastore-loaded resource — only such " + f'suggested_call: ckan__query_data(resource_id="{suggested_resource_id}")', + "(this is the datastore-loaded resource -- only such " "resources can be queried; others are file downloads.)", "(or use ckan__search_and_query for a one-call " "keyword-to-data flow.)", @@ -1525,7 +2497,7 @@ def _format_search_results( ) elif resources: lines.append( - f" resource_id: NONE QUERYABLE — this dataset has " + f" resource_id: NONE QUERYABLE -- this dataset has " f"{len(resources)} resource(s) but none are loaded into " "the datastore (datastore_active=false). 
Use " "ckan__get_dataset for download URLs." @@ -1565,7 +2537,7 @@ def _format_dataset(self, dataset: Dict[str, Any]) -> str: "=== NEXT STEP (read this first) ===", f"suggested_resource_id: {suggested_resource_id}", "suggested_next_tool: ckan__query_data", - f"suggested_call: ckan__query_data(resource_id=\"{suggested_resource_id}\")", + f'suggested_call: ckan__query_data(resource_id="{suggested_resource_id}")', "(this is the datastore-loaded resource; the others are " "file downloads only.)", "===================================", @@ -1579,12 +2551,24 @@ def _format_dataset(self, dataset: Dict[str, Any]) -> str: f"This dataset has {len(resources)} resource(s) but none " "are loaded into the datastore (datastore_active=false), " "so ckan__query_data will not work on them. They are " - "file downloads — see URLs below.", + "file downloads -- see URLs below.", "=================", "", ] ) + # Pick the queryable resource (or first if none) for freshness math. + freshness_resource = ( + self._first_queryable_resource(dataset) + or (dataset.get("resources") or [None])[0] + ) + freshness_caveat = self._format_freshness_caveat( + dataset=dataset, resource=freshness_resource + ) + if freshness_caveat: + lines.append(freshness_caveat) + lines.append("") + lines.extend( [ f"Dataset: {title}", @@ -1610,13 +2594,13 @@ def _format_dataset(self, dataset: Dict[str, Any]) -> str: lines.append(f" resource_id: {res_id}") if queryable_flag: lines.append( - f" Use ckan__query_data with resource_id=\"{res_id}\" to fetch rows." + f' Use ckan__query_data with resource_id="{res_id}" to fetch rows.' 
) else: if res_url: lines.append(f" download_url: {res_url}") lines.append( - " (not loaded into datastore — ckan__query_data " + " (not loaded into datastore -- ckan__query_data " "will return 404 for this resource_id)" ) else: @@ -1634,49 +2618,77 @@ def _format_query_results( limit: int = 100, total: Optional[int] = None, ) -> str: - """Format query results for user display.""" + """Format query results for user display. + + Layout (Copilot C2: caveats lead, records follow): + + ## Caveats (only if any fire) + TRUNCATED / SAMPLE / STRINGLY-TYPED / DATA QUALITY + ## Records (always, or empty-state message) + header + records + ## Schema (Filterable columns footer) + """ n_returned = len(records) - truncated_warning = self._format_truncation_block( - n_returned, limit, total - ) + truncated_warning = self._format_truncation_block(n_returned, limit, total) + sample_caveat = self._format_sample_size_caveat(total, unit="row") + stringly_caveat = self._format_stringly_typed_caveat(records, fields) + null_caveat = self._format_null_like_frequency_caveat(records, fields) + field_types = self._field_types_by_id(fields) + + caveats = [ + c + for c in (truncated_warning, sample_caveat, stringly_caveat, null_caveat) + if c + ] + parts: List[str] = [] + if caveats: + parts.append("## Caveats") + parts.extend(caveats) + parts.append("") + + parts.append("## Records") if not records: - text = "No records found matching the query." - parts = [truncated_warning, text] if truncated_warning else [text] + parts.append( + "No records matched the query.\n" + "NOTE: zero records does NOT mean zero data exists. Verify " + "that the column names in `where` / `filters` are correct " + "(see schema below), that the date range covers the right " + "period, and that the filter values match the dataset's " + "vocabulary (e.g. 'Closed' vs 'closed' vs 'CLOSED')." 
+ ) schema_footer = self._format_schema_footer(fields) if schema_footer: parts.append("") parts.append(schema_footer) return "\n".join(parts) - lines: List[str] = [] - if truncated_warning: - lines.append(truncated_warning) - lines.append("") - - # Header line: lead with the X-of-Y framing so the model can't - # mistake the rows-returned count for the answer to a counting - # question. - lines.append(self._format_count_header(n_returned, limit, total)) - lines.append("") + parts.append(self._format_count_header(n_returned, limit, total)) + # Repeat the "don't generalize" directive when the sample is + # capped (Copilot C3 + C10: redundancy is cheap, dropped caveats + # are expensive). + if total is not None and total > n_returned: + parts.append( + "(Reminder: the records below are a SAMPLE -- do NOT " + "generalize counts or percentages from them. The TOTAL " + f"is {total}, see the line above.)" + ) + parts.append("") - # Show first few records as examples for i, record in enumerate(records[:5], 1): - lines.append(f"Record {i}:") - for key, value in record.items(): - if key != "_id": # Skip internal ID - lines.append(f" {key}: {value}") - lines.append("") + parts.append(f"Record {i}:") + parts.extend(self._format_record_lines(record, field_types)) + parts.append("") if n_returned > 5: - lines.append(f"... and {n_returned - 5} more record(s) returned") + parts.append(f"... and {n_returned - 5} more record(s) returned") schema_footer = self._format_schema_footer(fields) if schema_footer: - lines.append("") - lines.append(schema_footer) + parts.append("") + parts.append(schema_footer) - return "\n".join(lines) + return "\n".join(parts) @staticmethod def _format_count_header( @@ -1700,7 +2712,7 @@ def _format_count_header( # total unknown return ( f"{n_returned} {unit} returned (limit={limit}, " - "true total unknown — see TRUNCATED warning above if any)." + "true total unknown -- see TRUNCATED warning above if any)." 
) def _format_truncation_block( @@ -1709,16 +2721,16 @@ def _format_truncation_block( limit: int, total: Optional[int], ) -> str: - """Emit a clear warning when the result set is — or might be — + """Emit a clear warning when the result set is -- or might be -- truncated by LIMIT. Returns ``""`` when no warning is needed (result fits within limit and total is known/equal to returned).""" - # Total known and matches returned → fits within limit, no warning. + # Total known and matches returned -> fits within limit, no warning. if total is not None and total <= n_returned: return "" - # Total known but exceeds returned → exact truncation, exact total. + # Total known but exceeds returned -> exact truncation, exact total. if total is not None and total > n_returned: return ( "=== TRUNCATED ===\n" @@ -1729,12 +2741,12 @@ def _format_truncation_block( "`limit` (max 10000) or use `ckan__execute_sql` with " "your own LIMIT/ORDER BY. For just the count, use " "ckan__aggregate_data with metrics=" - '{"count": "count(*)"} and a matching filter — ' + '{"count": "count(*)"} and a matching filter -- ' "it's cheaper than fetching rows.\n" "=================" ) - # Total unknown and we hit the limit exactly → likely truncated. + # Total unknown and we hit the limit exactly -> likely truncated. if total is None and n_returned >= limit: return ( "=== MAY BE TRUNCATED ===\n" @@ -1742,7 +2754,7 @@ def _format_truncation_block( f"({limit}) and the true total could not be determined. " "Treat this as a possibly-incomplete sample. 
For " "counting questions, do NOT report " - f"{n_returned} as the answer — use ckan__aggregate_data " + f"{n_returned} as the answer -- use ckan__aggregate_data " 'with metrics={"count": "count(*)"} and the same ' "filter, or re-run with a higher limit.\n" "========================" @@ -1750,9 +2762,7 @@ def _format_truncation_block( return "" - def _format_schema_footer( - self, fields: Optional[List[Dict[str, Any]]] - ) -> str: + def _format_schema_footer(self, fields: Optional[List[Dict[str, Any]]]) -> str: """Render a per-call 'Filterable columns' block listing every field the model can pass to ``where``, ``filters``, or reference in ``execute_sql``. @@ -1761,9 +2771,7 @@ def _format_schema_footer( pivot (e.g. 'now filter by close_date') is a one-shot.""" if not fields: return "" - usable = [ - f for f in fields if f.get("id") and f.get("id") != "_id" - ] + usable = [f for f in fields if f.get("id") and f.get("id") != "_id"] if not usable: return "" lines = [ @@ -1788,15 +2796,13 @@ def _format_schema(self, fields: List[Dict[str, Any]]) -> str: field_info = field.get("info", {}) description = field_info.get("label", "") if field_info else "" - lines.append(f" • {field_id} ({field_type})") + lines.append(f" * {field_id} ({field_type})") if description: lines.append(f" {description}") return "\n".join(lines) - def _format_search_and_query( - self, composite: Dict[str, Any], limit: int - ) -> str: + def _format_search_and_query(self, composite: Dict[str, Any], limit: int) -> str: """Format a search_and_query composite result for user display.""" dataset = composite.get("dataset", {}) or {} resource = composite.get("resource", {}) or {} @@ -1804,6 +2810,10 @@ def _format_search_and_query( fields = composite.get("fields", []) or [] total = composite.get("total") alternates = composite.get("alternate_datasets", []) or [] + auto_picked = bool(composite.get("auto_picked_resource")) + sibling_totals: Optional[Dict[str, Optional[int]]] = composite.get( + 
"sibling_totals" + ) dataset_id = dataset.get("id", "unknown") dataset_title = dataset.get("title", "Untitled") @@ -1814,13 +2824,65 @@ def _format_search_and_query( count_line = self._format_count_header(n_returned, limit, total) lines: List[str] = [] - truncated_warning = self._format_truncation_block( - n_returned, limit, total - ) - if truncated_warning: - lines.append(truncated_warning) + + # PARTIAL warning fires when the model auto-picked one of N + # queryable resources. Without this, GPT-4o tends to read the + # one-resource answer as canonical and report it as the dataset + # total. (Pre-enhancement behavior was actually better here: + # the model was forced into search_datasets->get_dataset->iterate + # query_data, which surfaced all resources naturally.) + sibling_queryables = self._queryable_resources(dataset) + siblings = [r for r in sibling_queryables if r.get("id") != resource_id] + if auto_picked and siblings: + n_total_resources = len(sibling_queryables) + partial_block = ( + "=== PARTIAL DATASET ANSWER ===\n" + f"This dataset has {n_total_resources} queryable resources " + f"(e.g. a rolling current view + per-year archives). " + f"This response covers ONE of them: " + f"'{resource_name}'.\n\n" + "If the user's question was about a TOTAL across all " + "resources (e.g. 'how many ever', 'in total', " + "'across all years'), this answer is INCOMPLETE. Options:\n" + " - Re-call ckan__search_and_query with " + "include_resource_totals=true to get a per-resource " + "row count breakdown in one call.\n" + " - Pick a specific archive with resource_name=<...> " + '(e.g. 
resource_name="2018").\n' + " - Use ckan__execute_sql with UNION ALL across the " + "resource UUIDs listed below.\n" + "If the user's question was about RECENT data, this " + "rolling/current resource is likely fine.\n" + "===============================" + ) + lines.append(partial_block) lines.append("") + truncated_warning = self._format_truncation_block(n_returned, limit, total) + sample_caveat = self._format_sample_size_caveat(total, unit="row") + stringly_caveat = self._format_stringly_typed_caveat(records, fields) + null_caveat = self._format_null_like_frequency_caveat(records, fields) + freshness_caveat = self._format_freshness_caveat( + dataset=dataset, resource=resource + ) + + composite_caveats = [ + c + for c in ( + freshness_caveat, + truncated_warning, + sample_caveat, + stringly_caveat, + null_caveat, + ) + if c + ] + if composite_caveats: + lines.append("## Caveats") + for caveat in composite_caveats: + lines.append(caveat) + lines.append("") + lines.extend( [ "=== search_and_query result ===", @@ -1834,6 +2896,41 @@ def _format_search_and_query( ] ) + # When include_resource_totals=true was requested, lead with the + # cross-resource breakdown -- this is the answer to "total" + # questions and should be the most prominent thing. + if sibling_totals is not None: + grand: Optional[int] = 0 + per_resource_lines: List[str] = [] + had_failure = False + # Order by package_show order (rolling view first, then + # per-year archives) so the model can read it sequentially. 
+ for r in sibling_queryables: + rid = r.get("id") or "" + rname = r.get("name") or "(unnamed)" + n = sibling_totals.get(rid) + if n is None: + per_resource_lines.append(f" - {rname}: n=null (count failed)") + had_failure = True + else: + per_resource_lines.append(f" - {rname}: {n}") + if grand is not None: + grand += n + grand_line = ( + f"GRAND TOTAL across {len(sibling_queryables)} resources: " + f"{grand}" + + ( + " (some resources returned null -- sum is partial)" + if had_failure + else "" + ) + ) + lines.append("=== Per-resource totals ===") + lines.append(grand_line) + lines.extend(per_resource_lines) + lines.append("===========================") + lines.append("") + if not records: lines.append( "No rows returned. Try broadening filters or pick a different " @@ -1842,17 +2939,15 @@ def _format_search_and_query( else: if total is not None and total > n_returned: preview_caption = ( - f"Showing first 5 of {n_returned} returned " - f"(true total: {total}):" + f"Showing first 5 of {n_returned} returned (true total: {total}):" ) else: preview_caption = f"Showing first 5 of {n_returned} returned:" lines.append(preview_caption) + field_types = self._field_types_by_id(fields) for i, record in enumerate(records[:5], 1): lines.append(f"Record {i}:") - for key, value in record.items(): - if key != "_id": - lines.append(f" {key}: {value}") + lines.extend(self._format_record_lines(record, field_types)) lines.append("") if n_returned > 5: lines.append(f"... and {n_returned - 5} more record(s) returned") @@ -1864,17 +2959,15 @@ def _format_search_and_query( # Sibling queryable resources within the SAME dataset. Boston's 311 # dataset has 22 (a rolling view + per-year archives back to 2011) - # — without this block the model can't see them. 
- sibling_queryables = self._queryable_resources(dataset) - chosen_resource_id = resource.get("id") - siblings = [ - r for r in sibling_queryables if r.get("id") != chosen_resource_id - ] - if siblings: + # -- without this block the model can't see them. Only render here + # if include_resource_totals=false; the per-resource-totals block + # above already lists everything with counts. + if siblings and sibling_totals is None: lines.append("") lines.append( "Other queryable resources in this dataset " - "(pass resource_name=... to pick one):" + "(pass resource_name=... to pick one, or " + "include_resource_totals=true for a full breakdown):" ) for r in siblings: r_name = r.get("name") or "(unnamed)" @@ -1885,9 +2978,7 @@ def _format_search_and_query( if len(alternates) > 1: lines.append("") - lines.append( - "Other matching datasets (pass dataset_index=N to switch):" - ) + lines.append("Other matching datasets (pass dataset_index=N to switch):") chosen_dataset_id = dataset.get("id") for i, alt in enumerate(alternates): if alt.get("id") == chosen_dataset_id: @@ -1899,14 +2990,10 @@ def _format_search_and_query( lines.append(f" dataset_id: {alt_id}") if alt_queryable: lines.append( - f" resource_id (queryable): " - f"{alt_queryable.get('id')}" + f" resource_id (queryable): {alt_queryable.get('id')}" ) else: - lines.append( - " (no datastore-loaded resource — " - "download-only)" - ) + lines.append(" (no datastore-loaded resource -- download-only)") return "\n".join(lines) @@ -1915,13 +3002,14 @@ def _format_sql_results( records: List[Dict[str, Any]], fields: List[Dict[str, Any]], effective_limit: Optional[int] = None, + is_passthrough: bool = False, ) -> str: """Format SQL query results for user display. Args: records: List of record dictionaries fields: List of field metadata dictionaries - effective_limit: The LIMIT clause that was actually executed — + effective_limit: The LIMIT clause that was actually executed -- either user-supplied or the enforced default. 
Used to detect truncation: if len(records) >= effective_limit, the result was almost certainly capped. @@ -1931,7 +3019,7 @@ def _format_sql_results( """ n_returned = len(records) - # Heuristic truncation detection — datastore_search_sql doesn't + # Heuristic truncation detection -- datastore_search_sql doesn't # return a "total"; the only signal is "did we hit our LIMIT?" truncation_block = "" if effective_limit is not None and n_returned >= effective_limit: @@ -1940,45 +3028,88 @@ def _format_sql_results( f"This SQL returned exactly the LIMIT ({effective_limit}) " "rows. The true total could not be determined from " "datastore_search_sql alone. For counting questions, do " - f"NOT report {n_returned} as the answer — instead run a " + f"NOT report {n_returned} as the answer -- instead run a " "separate SELECT COUNT(*) with the same WHERE clause, or " "use ckan__aggregate_data with metrics=" '{"count": "count(*)"}.\n' "========================" ) - if not records: - text = "No records found matching the SQL query." - return f"{truncation_block}\n\n{text}" if truncation_block else text + # SQL path: when len < limit, every returned row is the full + # result, so we know the true total and can fire the sample-size + # caveat. When len == limit, total is unknown and the truncation + # block above already warns the model not to infer counts. + sample_caveat = "" + if effective_limit is None or n_returned < effective_limit: + sample_caveat = self._format_sample_size_caveat(n_returned, unit="row") + stringly_caveat = self._format_stringly_typed_caveat(records, fields) + null_caveat = self._format_null_like_frequency_caveat(records, fields) + + # Civic-AI #13: execute_sql is a direct passthrough -- name it so + # the model knows the SQL it ran was something *it wrote*, not a + # canonical query. Suppressed for aggregate_data, where the SQL + # is built from validated structured input. 
+ sql_passthrough = ( + ( + "[SQL PASSTHROUGH]\n" + "This response came from a direct SQL query. Confirm the SQL " + "matched the user's actual question -- the model wrote it.\n" + "[/SQL PASSTHROUGH]" + ) + if is_passthrough + else "" + ) - lines: List[str] = [] - if truncation_block: - lines.append(truncation_block) - lines.append("") + caveats = [ + c + for c in ( + sql_passthrough, + truncation_block, + sample_caveat, + stringly_caveat, + null_caveat, + ) + if c + ] + + parts: List[str] = [] + if caveats: + parts.append("## Caveats") + parts.extend(caveats) + parts.append("") + + parts.append("## Records") + if not records: + parts.append( + "No records matched the SQL query.\n" + "NOTE: zero rows does NOT mean zero data. Check the WHERE " + "clause column names against get_schema, verify date " + "ranges, and confirm filter values match the dataset's " + "vocabulary (case-sensitive)." + ) + return "\n".join(parts) - # Header — total is unknown for raw SQL, so show "X rows returned". if effective_limit is not None: - lines.append( + parts.append( f"{n_returned} rows returned (limit={effective_limit}, " - "true total unknown — see warning above if any).\n" + "true total unknown -- see warning above if any)." 
) else: - lines.append(f"{n_returned} rows returned.\n") + parts.append(f"{n_returned} rows returned.") + parts.append("") - # Show field names if available if fields: field_names = [field.get("id", "unknown") for field in fields] - lines.append(f"Fields: {', '.join(field_names)}\n") + parts.append(f"Fields: {', '.join(field_names)}") + parts.append("") - # Show first few records as examples + field_types = self._field_types_by_id(fields) for i, record in enumerate(records[:10], 1): - lines.append(f"Record {i}:") - for key, value in record.items(): - if key != "_id": # Skip internal ID - lines.append(f" {key}: {value}") - lines.append("") + parts.append(f"Record {i}:") + parts.extend(self._format_record_lines(record, field_types)) + parts.append("") if n_returned > 10: - lines.append(f"... and {n_returned - 10} more record(s) returned") + parts.append(f"... and {n_returned - 10} more record(s) returned") - return "\n".join(lines) + return "\n".join(parts) diff --git a/terraform/aws/api_gateway.tf b/terraform/aws/api_gateway.tf index e35884f..195f664 100644 --- a/terraform/aws/api_gateway.tf +++ b/terraform/aws/api_gateway.tf @@ -144,6 +144,16 @@ resource "aws_api_gateway_deployment" "mcp_deployment" { ] } +# CloudWatch Log Group for API Gateway access logs +resource "aws_cloudwatch_log_group" "apigw_access_logs" { + name = "/aws/apigateway/${local.lambda_name}-access" + retention_in_days = 14 + + tags = { + Project = "mcp-server" + } +} + # API Gateway Stage resource "aws_api_gateway_stage" "prod" { deployment_id = aws_api_gateway_deployment.mcp_deployment.id @@ -151,6 +161,20 @@ resource "aws_api_gateway_stage" "prod" { stage_name = var.stage_name xray_tracing_enabled = true + + access_log_settings { + destination_arn = aws_cloudwatch_log_group.apigw_access_logs.arn + format = jsonencode({ + requestId = "$context.requestId" + ip = "$context.identity.sourceIp" + requestTime = "$context.requestTime" + httpMethod = "$context.httpMethod" + resourcePath = 
"$context.resourcePath" + status = "$context.status" + protocol = "$context.protocol" + responseLength = "$context.responseLength" + }) + } } # Method Settings: Throttling for all methods in stage (AWS format: */* not /*/*) diff --git a/terraform/aws/main.tf b/terraform/aws/main.tf index 2093d6a..640745a 100644 --- a/terraform/aws/main.tf +++ b/terraform/aws/main.tf @@ -105,4 +105,8 @@ resource "aws_lambda_function_url" "mcp_server_url" { resource "aws_cloudwatch_log_group" "lambda_logs" { name = "/aws/lambda/${local.lambda_name}" retention_in_days = 14 + + tags = { + Project = "mcp-server" + } } diff --git a/tests/test_ckan_plugin.py b/tests/test_ckan_plugin.py index 0e22285..86fc91e 100644 --- a/tests/test_ckan_plugin.py +++ b/tests/test_ckan_plugin.py @@ -495,9 +495,7 @@ async def test_execute_tool_execute_sql_succeeds(self, ckan_config): await plugin.initialize() result = await plugin.execute_tool( "execute_sql", - { - "sql": 'SELECT * FROM "11111111-2222-3333-4444-555555555555" LIMIT 1' - }, + {"sql": 'SELECT * FROM "11111111-2222-3333-4444-555555555555" LIMIT 1'}, ) assert result.success is True @@ -545,9 +543,7 @@ async def test_execute_tool_execute_sql_missing_param(self, ckan_config): assert "required" in result.error_message.lower() @pytest.mark.asyncio - async def test_execute_tool_search_datasets_surfaces_total_count( - self, ckan_config - ): + async def test_execute_tool_search_datasets_surfaces_total_count(self, ckan_config): """search_datasets reads CKAN's `count` and renders X-of-Y.""" plugin = CKANPlugin(ckan_config) @@ -609,8 +605,7 @@ async def test_execute_tool_execute_sql_truncated_warning(self, ckan_config): "execute_sql", { "sql": ( - 'SELECT * FROM "11111111-2222-3333-4444-555555555555" ' - "LIMIT 100" + 'SELECT * FROM "11111111-2222-3333-4444-555555555555" LIMIT 100' ) }, ) @@ -621,9 +616,7 @@ async def test_execute_tool_execute_sql_truncated_warning(self, ckan_config): assert "ckan__aggregate_data" in text or "COUNT(*)" in text 
@pytest.mark.asyncio - async def test_execute_tool_execute_sql_no_warning_under_limit( - self, ckan_config - ): + async def test_execute_tool_execute_sql_no_warning_under_limit(self, ckan_config): """execute_sql does not warn when fewer rows returned than LIMIT.""" plugin = CKANPlugin(ckan_config) @@ -650,8 +643,7 @@ async def test_execute_tool_execute_sql_no_warning_under_limit( "execute_sql", { "sql": ( - 'SELECT * FROM "11111111-2222-3333-4444-555555555555" ' - "LIMIT 100" + 'SELECT * FROM "11111111-2222-3333-4444-555555555555" LIMIT 100' ) }, ) @@ -851,9 +843,7 @@ async def test_execute_tool_search_and_query_skips_download_only_resources( mock_client_class.return_value = mock_client await plugin.initialize() - result = await plugin.execute_tool( - "search_and_query", {"query": "parks"} - ) + result = await plugin.execute_tool("search_and_query", {"query": "parks"}) assert result.success is True text = result.content[0]["text"] @@ -1068,9 +1058,7 @@ async def test_execute_tool_search_and_query_siblings_block_lists_archives( mock_client_class.return_value = mock_client await plugin.initialize() - result = await plugin.execute_tool( - "search_and_query", {"query": "311"} - ) + result = await plugin.execute_tool("search_and_query", {"query": "311"}) assert result.success is True text = result.content[0]["text"] @@ -1079,9 +1067,242 @@ async def test_execute_tool_search_and_query_siblings_block_lists_archives( assert "311 - 2024" in text # Only QUERYABLE siblings — the GeoJSON should not appear # in the siblings block - assert "GeoJSON" not in text.split( - "Other queryable resources in this dataset" - )[1] + assert ( + "GeoJSON" + not in text.split("Other queryable resources in this dataset")[1] + ) + + @pytest.mark.asyncio + async def test_search_and_query_emits_partial_warning_when_auto_picked( + self, ckan_config + ): + """When the model auto-picks a resource and queryable siblings + exist, the response must include a PARTIAL DATASET ANSWER block + — otherwise 
GPT-4o reads the one-resource count as the dataset + total. Regression test for: 'How many 311 requests in total?' + returning 9,790 (NEW SYSTEM) instead of walking 22 archives.""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + mock_response_init = Mock() + mock_response_init.json.return_value = {"success": True} + mock_response_init.raise_for_status = Mock() + mock_response_search = Mock() + mock_response_search.json.return_value = { + "result": { + "results": [ + { + "id": "ds-311", + "title": "311 Service Requests", + "resources": [ + { + "id": "new-uuid", + "name": "311 - NEW SYSTEM", + "format": "CSV", + "datastore_active": True, + }, + { + "id": "y2025", + "name": "311 - 2025", + "format": "CSV", + "datastore_active": True, + }, + { + "id": "y2024", + "name": "311 - 2024", + "format": "CSV", + "datastore_active": True, + }, + ], + } + ] + } + } + mock_response_search.raise_for_status = Mock() + mock_response_query = Mock() + mock_response_query.json.return_value = { + "result": { + "records": [{"_id": 1}], + "fields": [], + "total": 9790, + } + } + mock_response_query.raise_for_status = Mock() + mock_client.post = AsyncMock( + side_effect=[ + mock_response_init, + mock_response_search, + mock_response_query, + ] + ) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool("search_and_query", {"query": "311"}) + + assert result.success is True + text = result.content[0]["text"] + assert "PARTIAL DATASET ANSWER" in text + assert "311 - NEW SYSTEM" in text + assert "include_resource_totals=true" in text + + @pytest.mark.asyncio + async def test_search_and_query_no_partial_warning_when_resource_name( + self, ckan_config + ): + """When the model explicitly picks a resource via resource_name, + no PARTIAL warning — they got what they asked for.""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as 
mock_client_class: + mock_client = AsyncMock() + mock_response_init = Mock() + mock_response_init.json.return_value = {"success": True} + mock_response_init.raise_for_status = Mock() + mock_response_search = Mock() + mock_response_search.json.return_value = { + "result": { + "results": [ + { + "id": "ds-311", + "title": "311", + "resources": [ + { + "id": "new", + "name": "NEW SYSTEM", + "format": "CSV", + "datastore_active": True, + }, + { + "id": "y2018", + "name": "311 - 2018", + "format": "CSV", + "datastore_active": True, + }, + ], + } + ] + } + } + mock_response_search.raise_for_status = Mock() + mock_response_query = Mock() + mock_response_query.json.return_value = { + "result": {"records": [{"_id": 1}], "fields": [], "total": 5} + } + mock_response_query.raise_for_status = Mock() + mock_client.post = AsyncMock( + side_effect=[ + mock_response_init, + mock_response_search, + mock_response_query, + ] + ) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "search_and_query", + {"query": "311", "resource_name": "2018"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "PARTIAL DATASET ANSWER" not in text + + @pytest.mark.asyncio + async def test_search_and_query_include_resource_totals_runs_parallel_counts( + self, ckan_config + ): + """include_resource_totals=true must run COUNT(*) against EVERY + queryable resource and surface a grand-total + per-resource + breakdown, so 'total across all years' resolves in one call.""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + mock_response_init = Mock() + mock_response_init.json.return_value = {"success": True} + mock_response_init.raise_for_status = Mock() + mock_response_search = Mock() + mock_response_search.json.return_value = { + "result": { + "results": [ + { + "id": "ds-311", + "title": "311", + "resources": [ + { + "id": 
"11111111-2222-3333-4444-555555555555", + "name": "NEW SYSTEM", + "format": "CSV", + "datastore_active": True, + }, + { + "id": "22222222-3333-4444-5555-666666666666", + "name": "311 - 2025", + "format": "CSV", + "datastore_active": True, + }, + { + "id": "33333333-4444-5555-6666-777777777777", + "name": "311 - 2024", + "format": "CSV", + "datastore_active": True, + }, + ], + } + ] + } + } + mock_response_search.raise_for_status = Mock() + # Main query (NEW SYSTEM) returns sample rows + mock_response_query = Mock() + mock_response_query.json.return_value = { + "result": { + "records": [{"_id": i} for i in range(10)], + "fields": [{"id": "_id", "type": "int"}], + "total": 9790, + } + } + mock_response_query.raise_for_status = Mock() + + # Three COUNT(*) calls — return totals for each archive + def make_count_response(n): + m = Mock() + m.json.return_value = {"result": {"records": [{"n": n}]}} + m.raise_for_status = Mock() + return m + + mock_client.post = AsyncMock( + side_effect=[ + mock_response_init, + mock_response_search, + mock_response_query, + make_count_response(9790), + make_count_response(267187), + make_count_response(282836), + ] + ) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "search_and_query", + {"query": "311", "include_resource_totals": True}, + ) + + assert result.success is True + text = result.content[0]["text"] + # Per-resource breakdown rendered + assert "Per-resource totals" in text + assert "9790" in text + assert "267187" in text + assert "282836" in text + # Grand total = sum of all three + assert "GRAND TOTAL across 3 resources: 559813" in text + # Three COUNT(*) calls beyond init + search + main query + assert mock_client.post.call_count == 6 @pytest.mark.asyncio async def test_execute_tool_search_and_query_walks_to_next_dataset( @@ -1164,6 +1385,20 @@ async def test_execute_tool_query_data_with_where_uses_sql_endpoint( mock_response_init = Mock() 
mock_response_init.json.return_value = {"success": True} mock_response_init.raise_for_status = Mock() + # Pre-flight field-name validation fetches the schema before the + # SQL call; mock that response with the same fields the SQL call + # would expose. + mock_response_schema = Mock() + mock_response_schema.json.return_value = { + "result": { + "fields": [ + {"id": "case_id", "type": "text"}, + {"id": "close_date", "type": "timestamp"}, + {"id": "case_status", "type": "text"}, + ], + } + } + mock_response_schema.raise_for_status = Mock() mock_response_sql = Mock() mock_response_sql.json.return_value = { "result": { @@ -1179,7 +1414,11 @@ async def test_execute_tool_query_data_with_where_uses_sql_endpoint( } mock_response_sql.raise_for_status = Mock() mock_client.post = AsyncMock( - side_effect=[mock_response_init, mock_response_sql] + side_effect=[ + mock_response_init, + mock_response_schema, + mock_response_sql, + ] ) mock_client_class.return_value = mock_client @@ -1205,24 +1444,27 @@ async def test_execute_tool_query_data_with_where_uses_sql_endpoint( # Schema footer surfaces filterable columns assert "Filterable columns" in text assert "close_date" in text - # Verify the second POST hit datastore_search_sql with a SQL - # body containing the expected WHERE clause. - second_call = mock_client.post.call_args_list[1] - assert second_call[0][0] == "/api/3/action/datastore_search_sql" - sql = second_call[1]["json"]["sql"] - assert ( - 'FROM "11111111-2222-3333-4444-555555555555"' in sql - ) - assert '"close_date" >= \'2026-04-29\'' in sql - assert '"close_date" < \'2026-04-30\'' in sql - assert '"case_status" = \'Closed\'' in sql + # Verify the SQL POST (3rd call: init, schema preflight, SQL) + # hit datastore_search_sql with the expected WHERE clause. 
+ sql_call = mock_client.post.call_args_list[2] + assert sql_call[0][0] == "/api/3/action/datastore_search_sql" + sql = sql_call[1]["json"]["sql"] + assert 'FROM "11111111-2222-3333-4444-555555555555"' in sql + assert "\"close_date\" >= '2026-04-29'" in sql + assert "\"close_date\" < '2026-04-30'" in sql + assert "\"case_status\" = 'Closed'" in sql assert "LIMIT 5" in sql @pytest.mark.asyncio async def test_execute_tool_query_data_where_validation_error_surfaces( self, ckan_config ): - """A bad `where` operator returns a clean error — no API call.""" + """A bad `where` operator returns a clean error — no SQL call. + + Pre-flight field-name validation may also hit /datastore_search to + fetch the schema, so we only require that the SQL endpoint + (/datastore_search_sql) is never reached. + """ plugin = CKANPlugin(ckan_config) with patch("httpx.AsyncClient") as mock_client_class: @@ -1244,8 +1486,10 @@ async def test_execute_tool_query_data_where_validation_error_surfaces( assert result.success is False assert "Unknown operator" in (result.error_message or "") - # Only the init POST should have happened — no SQL call. - assert mock_client.post.call_count == 1 + # SQL endpoint must NOT have been called — the error fires + # before SQL is built. 
+ for call in mock_client.post.call_args_list: + assert call[0][0] != "/api/3/action/datastore_search_sql" @pytest.mark.asyncio async def test_execute_tool_query_data_schema_footer_in_normal_path( @@ -1288,9 +1532,7 @@ async def test_execute_tool_query_data_schema_footer_in_normal_path( assert "z (int)" in text @pytest.mark.asyncio - async def test_query_data_surfaces_total_from_datastore_search( - self, ckan_config - ): + async def test_query_data_surfaces_total_from_datastore_search(self, ckan_config): """When CKAN returns `total`, format prefers total_matching_rows over returned_rows.""" plugin = CKANPlugin(ckan_config) @@ -1331,9 +1573,7 @@ async def test_query_data_surfaces_total_from_datastore_search( assert "ckan__aggregate_data" in text @pytest.mark.asyncio - async def test_query_data_no_truncation_warning_when_under_limit( - self, ckan_config - ): + async def test_query_data_no_truncation_warning_when_under_limit(self, ckan_config): """When records returned < limit, no truncation warning shown.""" plugin = CKANPlugin(ckan_config) @@ -1384,13 +1624,22 @@ async def test_query_data_where_path_does_count_followup_when_truncated( mock_response_init = Mock() mock_response_init.json.return_value = {"success": True} mock_response_init.raise_for_status = Mock() + # Pre-flight schema fetch (civic-AI field-name validation) + mock_response_schema = Mock() + mock_response_schema.json.return_value = { + "result": { + "fields": [ + {"id": "case_id", "type": "text"}, + {"id": "closed_dt", "type": "timestamp"}, + ], + } + } + mock_response_schema.raise_for_status = Mock() # First SQL call: SELECT * returns exactly limit rows mock_response_select = Mock() mock_response_select.json.return_value = { "result": { - "records": [ - {"_id": i, "case_id": f"c{i}"} for i in range(100) - ], + "records": [{"_id": i, "case_id": f"c{i}"} for i in range(100)], "fields": [ {"id": "case_id", "type": "text"}, {"id": "closed_dt", "type": "timestamp"}, @@ -1407,6 +1656,7 @@ async def 
test_query_data_where_path_does_count_followup_when_truncated( mock_client.post = AsyncMock( side_effect=[ mock_response_init, + mock_response_schema, mock_response_select, mock_response_count, ] @@ -1433,17 +1683,16 @@ async def test_query_data_where_path_does_count_followup_when_truncated( assert "100 of 531" in text assert "TRUNCATED" in text assert "the answer is 531, NOT 100" in text - # Follow-up COUNT(*) actually issued - assert mock_client.post.call_count == 3 - count_call = mock_client.post.call_args_list[2] + # Follow-up COUNT(*) is the last call; init + schema + SQL + + # count = 4 total. + assert mock_client.post.call_count == 4 + count_call = mock_client.post.call_args_list[3] count_sql = count_call[1]["json"]["sql"] assert "COUNT(*)" in count_sql - assert '"closed_dt" >= \'2016-04-29\'' in count_sql + assert "\"closed_dt\" >= '2016-04-29'" in count_sql @pytest.mark.asyncio - async def test_query_data_where_path_no_count_when_under_limit( - self, ckan_config - ): + async def test_query_data_where_path_no_count_when_under_limit(self, ckan_config): """SQL path: if records returned < limit we already know the total — no extra COUNT(*) call should fire.""" plugin = CKANPlugin(ckan_config) @@ -1453,6 +1702,12 @@ async def test_query_data_where_path_no_count_when_under_limit( mock_response_init = Mock() mock_response_init.json.return_value = {"success": True} mock_response_init.raise_for_status = Mock() + # Pre-flight schema (empty fields is fine — validator returns None) + mock_response_schema = Mock() + mock_response_schema.json.return_value = { + "result": {"fields": [{"id": "x", "type": "int"}]} + } + mock_response_schema.raise_for_status = Mock() mock_response_select = Mock() mock_response_select.json.return_value = { "result": { @@ -1462,7 +1717,11 @@ async def test_query_data_where_path_no_count_when_under_limit( } mock_response_select.raise_for_status = Mock() mock_client.post = AsyncMock( - side_effect=[mock_response_init, mock_response_select] + 
side_effect=[ + mock_response_init, + mock_response_schema, + mock_response_select, + ] ) mock_client_class.return_value = mock_client @@ -1480,8 +1739,15 @@ async def test_query_data_where_path_no_count_when_under_limit( text = result.content[0]["text"] assert "85 rows returned" in text assert "TRUNCATED" not in text - # init + SELECT only, no COUNT(*) - assert mock_client.post.call_count == 2 + # init + schema + SELECT only, no COUNT(*) + assert mock_client.post.call_count == 3 + # And the SQL endpoint was called exactly once (just the SELECT). + sql_calls = [ + c + for c in mock_client.post.call_args_list + if c[0][0] == "/api/3/action/datastore_search_sql" + ] + assert len(sql_calls) == 1 @pytest.mark.asyncio async def test_query_data_where_path_count_failure_falls_back_to_warning( @@ -1496,6 +1762,11 @@ async def test_query_data_where_path_count_failure_falls_back_to_warning( mock_response_init = Mock() mock_response_init.json.return_value = {"success": True} mock_response_init.raise_for_status = Mock() + mock_response_schema = Mock() + mock_response_schema.json.return_value = { + "result": {"fields": [{"id": "x", "type": "int"}]} + } + mock_response_schema.raise_for_status = Mock() mock_response_select = Mock() mock_response_select.json.return_value = { "result": { @@ -1514,6 +1785,7 @@ async def test_query_data_where_path_count_failure_falls_back_to_warning( mock_client.post = AsyncMock( side_effect=[ mock_response_init, + mock_response_schema, mock_response_select, mock_response_count_fail, ] @@ -1629,8 +1901,7 @@ async def test_execute_sql_returns_error_when_ckan_body_has_success_false( "success": False, "error": { "message": ( - 'relation "11111111-2222-3333-4444-555555555555" ' - "does not exist" + 'relation "11111111-2222-3333-4444-555555555555" does not exist' ) }, } @@ -1645,8 +1916,7 @@ async def test_execute_sql_returns_error_when_ckan_body_has_success_false( "execute_sql", { "sql": ( - 'SELECT * FROM ' - '"11111111-2222-3333-4444-555555555555" 
LIMIT 1' + 'SELECT * FROM "11111111-2222-3333-4444-555555555555" LIMIT 1' ) }, ) @@ -1670,6 +1940,11 @@ async def test_aggregate_data_returns_error_when_ckan_body_has_success_false( mock_response_init = Mock() mock_response_init.json.return_value = {"success": True} mock_response_init.raise_for_status = Mock() + # Pre-flight schema fetch (civic-AI field-name validation in + # aggregate_data validates group_by/metrics/filters names). + mock_response_schema = Mock() + mock_response_schema.json.return_value = {"result": {"fields": []}} + mock_response_schema.raise_for_status = Mock() mock_response_sql = Mock() mock_response_sql.json.return_value = { "success": False, @@ -1677,7 +1952,11 @@ async def test_aggregate_data_returns_error_when_ckan_body_has_success_false( } mock_response_sql.raise_for_status = Mock() mock_client.post = AsyncMock( - side_effect=[mock_response_init, mock_response_sql] + side_effect=[ + mock_response_init, + mock_response_schema, + mock_response_sql, + ] ) mock_client_class.return_value = mock_client @@ -1860,3 +2139,1613 @@ async def test_retry_on_transient_error(self, ckan_config): except Exception: # If retry fails, exception is raised pass + + +class TestAbandonmentDetector: + """Civic-AI #5/#6: APPARENT ABANDONMENT + NO UPDATE CADENCE DECLARED.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + def _build_mocks(self, dataset_dict): + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + pkg = Mock() + pkg.json.return_value = {"result": dataset_dict} + pkg.raise_for_status = Mock() + return init, pkg + + @pytest.mark.asyncio + async def test_abandonment_fires_when_weekly_dataset_months_stale( + self, ckan_config + ): + """Dataset declares weekly updates but resource last_modified is + over a year old -- well past 4x the 7-day cadence.""" + plugin = CKANPlugin(ckan_config) + 
with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init, pkg = self._build_mocks( + { + "id": "stale-weekly", + "name": "stale-weekly", + "title": "Stale Weekly Dataset", + "metadata_modified": "2026-05-01T00:00:00", + "frequency": "weekly", + "organization": {"title": "Test"}, + "resources": [ + { + "id": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "name": "data", + "datastore_active": True, + "last_modified": "2023-01-01T00:00:00", + } + ], + } + ) + mock_client.post = AsyncMock(side_effect=[init, pkg]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "get_dataset", {"dataset_id": "stale-weekly"} + ) + + assert result.success is True + text = result.content[0]["text"] + assert "APPARENT ABANDONMENT" in text + assert "'weekly'" in text + # Dual timestamps must both appear + assert "Data last updated: 2023-01-01" in text + assert "Metadata last touched: 2026-05-01" in text + + @pytest.mark.asyncio + async def test_abandonment_silent_when_within_cadence(self, ckan_config): + """Weekly dataset modified within the last week -- abandonment + must stay silent. 
False alarms erode trust faster than silences.""" + plugin = CKANPlugin(ckan_config) + from datetime import datetime, timezone, timedelta + + recent = (datetime.now(timezone.utc) - timedelta(days=3)).strftime( + "%Y-%m-%dT%H:%M:%S" + ) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init, pkg = self._build_mocks( + { + "id": "live-weekly", + "name": "live-weekly", + "title": "Live Weekly Dataset", + "metadata_modified": recent, + "frequency": "weekly", + "organization": {"title": "Test"}, + "resources": [ + { + "id": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "name": "data", + "datastore_active": True, + "last_modified": recent, + } + ], + } + ) + mock_client.post = AsyncMock(side_effect=[init, pkg]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "get_dataset", {"dataset_id": "live-weekly"} + ) + + assert result.success is True + text = result.content[0]["text"] + assert "APPARENT ABANDONMENT" not in text + assert "DATA FRESHNESS" not in text # recent enough + + @pytest.mark.asyncio + async def test_no_frequency_note_fires_on_old_undeclared_dataset(self, ckan_config): + """No frequency declared + resource > 2yr old -> the softer + 'cannot tell if current' note.""" + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init, pkg = self._build_mocks( + { + "id": "no-freq", + "name": "no-freq", + "title": "Undeclared Dataset", + # frequency intentionally omitted + "organization": {"title": "Test"}, + "resources": [ + { + "id": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "name": "data", + "datastore_active": True, + "last_modified": "2020-01-01T00:00:00", + } + ], + } + ) + mock_client.post = AsyncMock(side_effect=[init, pkg]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool("get_dataset", {"dataset_id": "no-freq"}) + + assert 
result.success is True + text = result.content[0]["text"] + assert "NO UPDATE CADENCE DECLARED" in text + # And the abandonment banner must NOT fire (no cadence to + # measure against) + assert "APPARENT ABANDONMENT" not in text + + +class TestStringlyTypedDetection: + """Civic-AI #9: TEXT columns holding dates/numbers.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_text_column_with_iso_dates_fires_type_note(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [ + {"_id": 1, "open_dt": "2024-06-15"}, + {"_id": 2, "open_dt": "2024-06-16"}, + {"_id": 3, "open_dt": "2024-06-17"}, + {"_id": 4, "open_dt": "2024-06-18"}, + ], + "fields": [{"id": "open_dt", "type": "text"}], + "total": 4, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "STRINGLY-TYPED FIELDS" in text + assert "'open_dt' is stored as TEXT but values look like dates" in text + + @pytest.mark.asyncio + async def test_text_column_with_real_text_stays_silent(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [ + {"_id": 1, 
"name": "Alice"}, + {"_id": 2, "name": "Bob"}, + {"_id": 3, "name": "Carol"}, + {"_id": 4, "name": "Dan"}, + ], + "fields": [{"id": "name", "type": "text"}], + "total": 4, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "STRINGLY-TYPED FIELDS" not in text + + @pytest.mark.asyncio + async def test_text_column_with_numbers_fires_type_note(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [ + {"_id": 1, "amount": "10"}, + {"_id": 2, "amount": "42"}, + {"_id": 3, "amount": "100"}, + {"_id": 4, "amount": "3"}, + ], + "fields": [{"id": "amount", "type": "text"}], + "total": 4, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "STRINGLY-TYPED FIELDS" in text + assert "look like numbers" in text + + +class TestNullLikeNormalization: + """Civic-AI #10/#11: null-like rendering + DATA QUALITY frequency.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_null_like_strings_rendered_distinctly(self, ckan_config): + """'Unknown' / 
'N/A' / '' / None all render distinctly so the + model can't treat them as ordinary categories.""" + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [ + {"_id": 1, "status": "Unknown"}, + {"_id": 2, "status": "N/A"}, + {"_id": 3, "status": ""}, + {"_id": 4, "status": None}, + {"_id": 5, "status": "Open"}, + ], + "fields": [{"id": "status", "type": "text"}], + "total": 5, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert 'status: <"Unknown">' in text + assert 'status: <"N/A">' in text + assert "status: " in text + assert "status: " in text + assert "status: Open" in text # real value rendered as-is + + @pytest.mark.asyncio + async def test_high_missing_rate_fires_data_quality_caveat(self, ckan_config): + """When > 20% of a column's values are null-like, DATA QUALITY + caveat fires naming the column and the missing-rate.""" + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [ + {"_id": 1, "assigned_to": "Unknown"}, + {"_id": 2, "assigned_to": "Unknown"}, + {"_id": 3, "assigned_to": "N/A"}, + {"_id": 4, "assigned_to": "Alice"}, + {"_id": 5, "assigned_to": "Bob"}, + ], + "fields": [{"id": "assigned_to", "type": "text"}], + "total": 5, + } + } + query.raise_for_status = Mock() + 
mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "DATA QUALITY" in text + assert "'assigned_to'" in text + assert "60%" in text # 3 of 5 + + @pytest.mark.asyncio + async def test_low_missing_rate_silent(self, ckan_config): + """One missing value out of many should not fire DATA QUALITY.""" + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [ + {"_id": i, "assigned_to": f"user_{i}"} for i in range(20) + ] + + [{"_id": 21, "assigned_to": "Unknown"}], + "fields": [{"id": "assigned_to", "type": "text"}], + "total": 21, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "DATA QUALITY" not in text + + +class TestSqlPassthroughCaveat: + """Civic-AI #13: SQL PASSTHROUGH warning on execute_sql only.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_execute_sql_carries_passthrough_warning(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + 
init.raise_for_status = Mock() + sql_resp = Mock() + sql_resp.json.return_value = { + "result": { + "records": [{"_id": 1, "n": 42}], + "fields": [{"id": "n", "type": "int4"}], + } + } + sql_resp.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, sql_resp]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "execute_sql", + { + "sql": ( + 'SELECT * FROM "11111111-2222-3333-4444-555555555555" LIMIT 1' + ) + }, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "SQL PASSTHROUGH" in text + assert "the model wrote it" in text + + @pytest.mark.asyncio + async def test_aggregate_data_does_not_carry_passthrough_warning(self, ckan_config): + """aggregate_data builds SQL from validated parts -- not a + passthrough, so the warning must stay silent.""" + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + schema = Mock() + schema.json.return_value = { + "result": {"fields": [{"id": "neighborhood", "type": "text"}]} + } + schema.raise_for_status = Mock() + agg_resp = Mock() + agg_resp.json.return_value = { + "result": { + "records": [{"neighborhood": "Allston", "n": 100}], + "fields": [ + {"id": "neighborhood", "type": "text"}, + {"id": "n", "type": "int4"}, + ], + } + } + agg_resp.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, schema, agg_resp]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "aggregate_data", + { + "resource_id": "11111111-2222-3333-4444-555555555555", + "group_by": ["neighborhood"], + "metrics": {"n": "count(*)"}, + }, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "SQL PASSTHROUGH" not in text + + +class TestSearchAmbiguityCaveat: + """Civic-AI 
#14: warn when multiple plausible matches surface.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_ambiguity_fires_with_overlapping_titles(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + search = Mock() + search.json.return_value = { + "result": { + "count": 3, + "results": [ + { + "id": "1", + "title": "Crime Incident Reports", + "resources": [], + }, + { + "id": "2", + "title": "Crime Stats Summary", + "resources": [], + }, + { + "id": "3", + "title": "Crime Mapping Data", + "resources": [], + }, + ], + } + } + search.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, search]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "search_datasets", {"query": "crime", "limit": 5} + ) + + assert result.success is True + text = result.content[0]["text"] + assert "AMBIGUOUS SEARCH" in text + assert "Crime Stats Summary" in text + + @pytest.mark.asyncio + async def test_ambiguity_silent_with_unrelated_topics(self, ckan_config): + """When the top result and others share no title tokens, no + ambiguity warning.""" + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + search = Mock() + search.json.return_value = { + "result": { + "count": 2, + "results": [ + { + "id": "1", + "title": "Building Permits", + "resources": [], + }, + { + "id": "2", + "title": "Tree Census", + "resources": [], + }, + ], + } + } + search.raise_for_status = Mock() + mock_client.post = 
AsyncMock(side_effect=[init, search]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "search_datasets", {"query": "city data", "limit": 5} + ) + + assert result.success is True + text = result.content[0]["text"] + assert "AMBIGUOUS SEARCH" not in text + + +class TestAsciiOnlyOutput: + """Copilot C1: every formatter output must be ASCII-only.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_search_and_query_output_is_pure_ascii(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + search = Mock() + search.json.return_value = { + "result": { + "count": 1, + "results": [ + { + "id": "x", + "name": "x", + "title": "X", + "metadata_modified": "2026-05-01T00:00:00", + "resources": [ + { + "id": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "name": "y", + "datastore_active": True, + "last_modified": "2023-01-01T00:00:00", + } + ], + } + ], + } + } + search.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [{"_id": 1, "v": "ok"}], + "fields": [{"id": "v", "type": "text"}], + "total": 1, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, search, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "search_and_query", {"query": "x", "limit": 5} + ) + + assert result.success is True + text = result.content[0]["text"] + # Every character must be ASCII -- Copilot has dropped + # non-ASCII glyphs into '?' / boxes in production. 
+ assert text.isascii(), "non-ASCII characters in output: " + repr( + [c for c in text if not c.isascii()][:10] + ) + + +class TestZeroRecordsAbsenceOfEvidence: + """Copilot C11: empty responses must spell out that absence is not + evidence of absence.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_empty_query_response_includes_no_evidence_note(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [], + "fields": [{"id": "name", "type": "text"}], + "total": 0, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "zero records does NOT mean zero data" in text + + +class TestProseRemindersForCriticalCaveats: + """Copilot A3: critical caveats must also appear as bottom-of-response + prose reminders so GPT-4o doesn't drop the structured marker.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_single_record_emits_prose_reminder(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + 
query.json.return_value = { + "result": { + "records": [{"_id": 1, "v": "only-one"}], + "fields": [{"id": "v", "type": "text"}], + "total": 1, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + text = result.content[0]["text"] + assert "SINGLE-RECORD CLAIM" in text + # Prose reminder must also appear (Copilot A3) + assert "(Reminder: only ONE record matched" in text + # And the prose reminder appears AFTER the structured banner + assert text.index("SINGLE-RECORD CLAIM") < text.index( + "(Reminder: only ONE record" + ) + + @pytest.mark.asyncio + async def test_abandonment_emits_prose_reminder(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + pkg = Mock() + pkg.json.return_value = { + "result": { + "id": "stale", + "name": "stale", + "title": "Stale", + "metadata_modified": "2026-05-01T00:00:00", + "frequency": "weekly", + "organization": {"title": "T"}, + "resources": [ + { + "id": "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + "datastore_active": True, + "last_modified": "2023-01-01T00:00:00", + } + ], + } + } + pkg.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, pkg]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool("get_dataset", {"dataset_id": "stale"}) + + text = result.content[0]["text"] + assert "APPARENT ABANDONMENT" in text + assert "(Reminder: this dataset shows APPARENT ABANDONMENT" in text + + @pytest.mark.asyncio + async def test_sql_passthrough_emits_prose_reminder(self, ckan_config): + plugin = CKANPlugin(ckan_config) + with 
patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + sql_resp = Mock() + sql_resp.json.return_value = { + "result": { + "records": [{"_id": 1, "n": 5}], + "fields": [{"id": "n", "type": "int4"}], + } + } + sql_resp.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, sql_resp]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "execute_sql", + { + "sql": ( + 'SELECT * FROM "11111111-2222-3333-4444-555555555555" LIMIT 1' + ) + }, + ) + + text = result.content[0]["text"] + assert "SQL PASSTHROUGH" in text + assert "(Reminder: the SQL above was written by the model" in text + + @pytest.mark.asyncio + async def test_no_prose_reminders_when_no_critical_caveats(self, ckan_config): + """Stays silent when nothing critical fired.""" + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [{"_id": i, "v": f"r{i}"} for i in range(50)], + "fields": [{"id": "v", "type": "text"}], + "total": 50, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + text = result.content[0]["text"] + assert "(Reminder:" not in text + + +class TestErrorMessageShape: + """Copilot C3/C4 + D4: error messages stay ASCII, don't carry + magic-string templates, and push the model back toward discovery.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": 
"https://data.example.com", + "city_name": "TestCity", + } + + def test_no_fake_uuid_example_in_any_tool_description(self, ckan_config): + """The fake UUID pattern was once present in query_data's + resource_id description; GPT-4o can parrot it back as an + invented ID. Make sure it's gone from every description.""" + plugin = CKANPlugin(ckan_config) + for tool in plugin.get_tools(): + blob = repr(tool.input_schema) + tool.description + assert "11111111-2222-3333-4444-555555555555" not in blob, ( + f"Tool {tool.name!r} still contains the worked-example " + "UUID; GPT-4o may parrot this back as a real ID." + ) + + def test_typo_error_routes_to_discovery_not_template(self, ckan_config): + """The 'did you mean' error must suggest a REAL field from the + actual schema (not a synthetic template) and point at + discovery tools.""" + import asyncio + + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + schema = Mock() + schema.json.return_value = { + "result": { + "fields": [ + {"id": "case_status", "type": "text"}, + {"id": "case_id", "type": "text"}, + ] + } + } + schema.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, schema]) + mock_client_class.return_value = mock_client + + async def run(): + await plugin.initialize() + return await plugin.execute_tool( + "query_data", + { + "resource_id": "11111111-2222-3333-4444-555555555555", + "filters": {"case_staus": "Closed"}, + }, + ) + + result = asyncio.run(run()) + assert result.success is False + msg = result.error_message or "" + # Must contain the real schema column, not a synthetic template + assert "case_status" in msg + assert "did you mean" in msg + # And the suggestion is a REAL column (matches the mocked schema) + assert "Valid columns: case_id, case_status" in msg or ( + "Valid columns: case_status, case_id" in msg + ) + + def 
test_error_messages_are_ascii(self, ckan_config): + """D4: error paths must stay ASCII too -- a non-ASCII error in + Copilot can render as '?'/boxes and obscure actionable text.""" + import asyncio + + plugin = CKANPlugin(ckan_config) + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + mock_client.post = AsyncMock(return_value=init) + mock_client_class.return_value = mock_client + + async def run(): + await plugin.initialize() + return await plugin.execute_tool( + "query_data", + { + "resource_id": "11111111-2222-3333-4444-555555555555", + "where": {"col": {"regex": "."}}, + }, + ) + + result = asyncio.run(run()) + assert result.success is False + msg = result.error_message or "" + assert msg.isascii(), "Error message contains non-ASCII: " + repr( + [c for c in msg if not c.isascii()][:10] + ) + + +class TestToolDescriptionBudget: + """Copilot B1: every tool description fits in ~150 tokens / 600 chars + so GPT-4o actually attends to it.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + def test_all_descriptions_under_budget(self, ckan_config): + plugin = CKANPlugin(ckan_config) + for tool in plugin.get_tools(): + assert len(tool.description) <= 600, ( + f"{tool.name!r} description is " + f"{len(tool.description)} chars; trim to <=600." 
+ ) + + +class TestLoweredDefaultLimits: + """Copilot C8: smaller default limits (20 / 10) than CKAN's 100.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + def test_tool_defaults_are_copilot_friendly(self, ckan_config): + plugin = CKANPlugin(ckan_config) + tools = {t.name: t for t in plugin.get_tools()} + # search_datasets default limit + assert ( + tools["search_datasets"].input_schema["properties"]["limit"]["default"] + <= 20 + ) + # query_data default limit + assert tools["query_data"].input_schema["properties"]["limit"]["default"] <= 25 + # search_and_query default limit + assert ( + tools["search_and_query"].input_schema["properties"]["limit"]["default"] + <= 25 + ) + + +# --------------------------------------------------------------------------- +# Civic-AI tool-design caveats +# +# These tests follow the civicaitools.org pattern: every devil's-advocate +# check needs a "fires when expected" test AND a "silent when not +# applicable" test. False alarms erode trust faster than false silences. 
+# --------------------------------------------------------------------------- + + +class TestProvenanceHeaderAndRetrievedFooter: + """Civic-AI #1, #2, #3: Source line + echoed Query + Retrieved + timestamp must appear on every successful tool response.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_search_datasets_response_has_source_query_and_retrieved( + self, ckan_config + ): + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + search = Mock() + search.json.return_value = {"result": {"results": [], "count": 0}} + search.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, search]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "search_datasets", {"query": "311", "limit": 5} + ) + + assert result.success is True + text = result.content[0]["text"] + # Section header survives Copilot markdown stripping (C4) + assert "## Source" in text + # Human-readable Source line names the portal + query + assert "Source: https://data.example.com search for '311'" in text + # API line points at the exact CKAN action endpoint + assert ( + "API: POST https://data.example.com/api/3/action/package_search" in text + ) + # Echoed query repeats the params we sent + assert "Query [package_search]: q='311', rows=5" in text + # Retrieved footer is ISO-8601 with Z suffix + assert "_Retrieved: " in text + assert text.rstrip().endswith("Z_") + + @pytest.mark.asyncio + async def test_query_data_provenance_shows_sql_action_when_where_used( + self, ckan_config + ): + """When `where` routes through datastore_search_sql, the Source + line must reflect that — not the 
equality-only endpoint.""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + schema = Mock() + schema.json.return_value = { + "result": {"fields": [{"id": "x", "type": "int"}]} + } + schema.raise_for_status = Mock() + sql = Mock() + sql.json.return_value = { + "result": { + "records": [{"_id": 1, "x": 42}], + "fields": [{"id": "x", "type": "int"}], + } + } + sql.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, schema, sql]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + { + "resource_id": "11111111-2222-3333-4444-555555555555", + "where": {"x": {"gt": 1}}, + "limit": 5, + }, + ) + + assert result.success is True + text = result.content[0]["text"] + # API line names the action that actually fired + api_line = next( + line for line in text.splitlines() if line.startswith("API:") + ) + assert "datastore_search_sql" in api_line + # Equality-only action must NOT show up in the API line + assert "/api/3/action/datastore_search;" not in api_line + assert not api_line.endswith("/api/3/action/datastore_search") + + @pytest.mark.asyncio + async def test_long_sql_in_echoed_query_is_truncated(self, ckan_config): + """The Query line must not blow up when SQL is long.""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + sql_resp = Mock() + sql_resp.json.return_value = {"result": {"records": [], "fields": []}} + sql_resp.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, sql_resp]) + mock_client_class.return_value = mock_client + + long_sql = ( + 'SELECT * FROM "11111111-2222-3333-4444-555555555555" ' + "WHERE " + " 
AND ".join(f'"c{i}" = {i}' for i in range(80)) + " LIMIT 1" + ) + await plugin.initialize() + result = await plugin.execute_tool("execute_sql", {"sql": long_sql}) + + assert result.success is True + text = result.content[0]["text"] + # Should still have a Query line, and the value should have + # been truncated with an ellipsis (not the full 3000+ char SQL) + query_line = next( + line for line in text.splitlines() if line.startswith("Query [") + ) + assert "..." in query_line + assert len(query_line) < 400 + + +class TestSampleSizeCaveats: + """Civic-AI #5, #6: SINGLE-RECORD and SMALL SAMPLE banners must fire + on tiny result sets and stay silent on large ones.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_single_record_caveat_fires_when_total_is_one(self, ckan_config): + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [{"_id": 1, "name": "only one"}], + "fields": [{"id": "name", "type": "text"}], + "total": 1, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "SINGLE-RECORD CLAIM" in text + assert "N=1" in text + assert "SMALL SAMPLE" not in text # mutually exclusive + + @pytest.mark.asyncio + async def test_small_sample_caveat_fires_for_total_between_2_and_10( + self, ckan_config + ): + plugin = CKANPlugin(ckan_config) + + with 
patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [{"_id": i} for i in range(5)], + "fields": [], + "total": 5, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "SMALL SAMPLE" in text + assert "Only 5 row" in text + assert "SINGLE-RECORD" not in text + + @pytest.mark.asyncio + async def test_no_sample_caveat_when_total_is_large(self, ckan_config): + """Stays silent for totals above the small-sample threshold.""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [{"_id": i} for i in range(50)], + "fields": [], + "total": 50, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "SMALL SAMPLE" not in text + assert "SINGLE-RECORD" not in text + + +class TestDataFreshnessCaveat: + """Civic-AI #10: stale dataset warning.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + 
@pytest.mark.asyncio + async def test_freshness_caveat_fires_on_old_dataset(self, ckan_config): + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + pkg = Mock() + pkg.json.return_value = { + "result": { + "id": "old-dataset", + "name": "old-dataset", + "title": "Old Dataset", + "metadata_modified": "2018-01-15T12:00:00", + "organization": {"title": "Test"}, + "resources": [], + } + } + pkg.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, pkg]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "get_dataset", {"dataset_id": "old-dataset"} + ) + + assert result.success is True + text = result.content[0]["text"] + assert "DATA FRESHNESS" in text + assert "2018-01-15" in text + + @pytest.mark.asyncio + async def test_freshness_caveat_silent_on_recent_dataset(self, ckan_config): + """Datasets edited within the last year must not trigger the + freshness banner.""" + from datetime import datetime, timezone + + plugin = CKANPlugin(ckan_config) + + recent = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + pkg = Mock() + pkg.json.return_value = { + "result": { + "id": "fresh-dataset", + "name": "fresh-dataset", + "title": "Fresh Dataset", + "metadata_modified": recent, + "organization": {"title": "Test"}, + "resources": [], + } + } + pkg.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, pkg]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "get_dataset", {"dataset_id": "fresh-dataset"} + ) + + assert result.success is True + text = 
result.content[0]["text"] + assert "DATA FRESHNESS" not in text + + @pytest.mark.asyncio + async def test_freshness_caveat_silent_on_missing_metadata_modified( + self, ckan_config + ): + """If the field isn't present, degrade silently — false silences + beat false alarms (civic-AI #15).""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + pkg = Mock() + pkg.json.return_value = { + "result": { + "id": "x", + "name": "x", + "title": "X", + "organization": {"title": "Test"}, + "resources": [], + } + } + pkg.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, pkg]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool("get_dataset", {"dataset_id": "x"}) + + assert result.success is True + text = result.content[0]["text"] + assert "DATA FRESHNESS" not in text + + +class TestFieldNameValidation: + """Civic-AI #9: difflib-based 'did you mean?' 
on typos.""" + + @pytest.fixture + def ckan_config(self): + return { + "base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_typo_in_filters_returns_did_you_mean_hint(self, ckan_config): + """A misspelled column in `filters` is caught pre-flight with a + suggestion instead of leaking through to an upstream 409.""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + schema = Mock() + schema.json.return_value = { + "result": { + "fields": [ + {"id": "case_status", "type": "text"}, + {"id": "close_date", "type": "timestamp"}, + ] + } + } + schema.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, schema]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + { + "resource_id": "11111111-2222-3333-4444-555555555555", + "filters": {"case_staus": "Closed"}, # typo: missing 't' + }, + ) + + assert result.success is False + assert "Unknown field" in (result.error_message or "") + assert "case_staus" in result.error_message + assert "did you mean 'case_status'" in result.error_message + + @pytest.mark.asyncio + async def test_valid_field_names_pass_through_silently(self, ckan_config): + """When every field name is valid, the validator is silent and the + query proceeds.""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + schema = Mock() + schema.json.return_value = { + "result": {"fields": [{"id": "case_status", "type": "text"}]} + } + schema.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": 
[{"_id": 1, "case_status": "Closed"}], + "fields": [{"id": "case_status", "type": "text"}], + "total": 1, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, schema, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + { + "resource_id": "11111111-2222-3333-4444-555555555555", + "filters": {"case_status": "Closed"}, + }, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "Unknown field" not in text + + @pytest.mark.asyncio + async def test_schema_fetch_failure_does_not_block_query(self, ckan_config): + """If we can't fetch the schema, the validator degrades silently + and the upstream call still happens (civic-AI #15).""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + # Schema fetch fails server-side + schema_fail = Mock() + schema_fail.json.return_value = { + "success": False, + "error": {"message": "schema unavailable"}, + } + schema_fail.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [{"_id": 1, "case_status": "Closed"}], + "fields": [{"id": "case_status", "type": "text"}], + "total": 1, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, schema_fail, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + { + "resource_id": "11111111-2222-3333-4444-555555555555", + "filters": {"case_status": "Closed"}, + }, + ) + + # Despite schema failing, the actual query still ran + assert result.success is True + + +class TestDateFieldNormalization: + """Civic-AI #7: timestamp/date columns render as ISO 8601.""" + + @pytest.fixture + def ckan_config(self): + return { + 
"base_url": "https://data.example.com", + "portal_url": "https://data.example.com", + "city_name": "TestCity", + } + + @pytest.mark.asyncio + async def test_midnight_timestamp_renders_as_date_only(self, ckan_config): + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [{"_id": 1, "close_date": "2024-06-15T00:00:00"}], + "fields": [ + {"id": "close_date", "type": "timestamp"}, + ], + "total": 1, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "close_date: 2024-06-15" in text + # Midnight time-of-day must NOT be displayed + assert "T00:00:00" not in text + + @pytest.mark.asyncio + async def test_non_midnight_timestamp_keeps_time_of_day(self, ckan_config): + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [{"_id": 1, "ts": "2024-06-15T14:30:00"}], + "fields": [{"id": "ts", "type": "timestamp"}], + "total": 1, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "ts: 
2024-06-15T14:30:00" in text + + @pytest.mark.asyncio + async def test_non_date_columns_pass_through_unchanged(self, ckan_config): + """Text/int columns are never normalized.""" + plugin = CKANPlugin(ckan_config) + + with patch("httpx.AsyncClient") as mock_client_class: + mock_client = AsyncMock() + init = Mock() + init.json.return_value = {"success": True} + init.raise_for_status = Mock() + query = Mock() + query.json.return_value = { + "result": { + "records": [{"_id": 1, "name": "X", "count": 42}], + "fields": [ + {"id": "name", "type": "text"}, + {"id": "count", "type": "int"}, + ], + "total": 1, + } + } + query.raise_for_status = Mock() + mock_client.post = AsyncMock(side_effect=[init, query]) + mock_client_class.return_value = mock_client + + await plugin.initialize() + result = await plugin.execute_tool( + "query_data", + {"resource_id": "11111111-2222-3333-4444-555555555555"}, + ) + + assert result.success is True + text = result.content[0]["text"] + assert "name: X" in text + assert "count: 42" in text diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index f1b10d7..47b96cf 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -379,8 +379,8 @@ class TestUnknownMethods: """Test handling of unknown methods.""" @pytest.mark.asyncio - async def test_unknown_method_raises_error(self): - """Test that unknown method raises ValueError.""" + async def test_unknown_method_returns_method_not_found(self): + """Test that an unknown method returns JSON-RPC -32601 (Method not found).""" plugin_manager = MagicMock(spec=PluginManager) server = MCPServer(plugin_manager) @@ -395,7 +395,8 @@ async def test_unknown_method_raises_error(self): assert response is not None assert "error" in response - assert response["error"]["code"] == -32603 + assert response["error"]["code"] == -32601 + assert response["error"]["message"] == "Method not found" assert "Unknown method" in response["error"]["data"]