Date: 2026-04-17 Status: Implemented Author: Qing Ye + Claude
Data/analytics engineers face two problems with AI agents querying their data:
- Resource runaway — agents burn unbounded compute, loop endlessly on retries, exceed cost ceilings
- Semantic inconsistency — agents compute metrics differently across runs, query wrong tables, ignore established definitions
No single existing tool addresses both. Semantic layers (dbt metrics, Cube) handle consistency but not resource governance. Agent frameworks (LangChain, Claude Agent SDK) provide execution but not data-specific governance.
Inspiration: Robert Yi's LinkedIn post on "agentic contract layers" for analytics — arguing that agents need a central authority governing how data logic is consumed.
| Aspect | v1 spec | v2 spec |
|---|---|---|
| Form factor | Python library tightly coupled to agent-contracts | Reusable library with optional ai-agent-contracts dependency |
| Primary target runtime | Generic (LiteLLM, LangChain) | Claude Agent SDK (but framework-agnostic) |
| `ai-agent-contracts` | Required dependency | Optional; upgrades enforcement when installed |
| Dependency management | pip | uv |
| Database interaction | Validation only | Full tool set: validate, execute, describe, preview |
| Tool surface | Validator callback | 9 agent tools (factory + middleware) |

| Decision | Choice | Rationale |
|---|---|---|
| Target user | Data/analytics engineer | Feels the pain most, already thinks in contracts (dbt, schema tests) |
| Primary runtime | Claude Agent SDK | Concrete target, growing ecosystem, but tools are plain functions usable anywhere |
| `ai-agent-contracts` | Optional dependency | Lowers barrier to entry; library works standalone with lightweight enforcement |
| Database support | Adapter protocol | Clean interface, any database can be plugged in |
| Semantic governance | Reference-based | Point to external source of truth (dbt, Cube), don't replicate it |
| Developer experience | YAML-first | Data engineers live in YAML; zero Python knowledge required to define a contract |
| Enforcement | Configurable per-rule | block / warn / log per rule |
| Tool delivery | Factory + middleware | Quick start via factory, composable via middleware |
| Dependency management | uv | Modern, fast, lockfile-based |

data_contract.yml (data engineer writes this)
         │
         ▼
┌─────────────────┐
│  DataContract   │  Parsed YAML (Pydantic model)
│  .semantic      │
│  .resources     │
│  .temporal      │
│  .rules         │
└────────┬────────┘
         │
   ┌─────┴──────┐
   │            │
   ▼            ▼
Standalone   Bridge (optional)
  Mode       ┌──────────────────┐
   │         │ai-agent-contracts│
   │         │ Contract 7-tuple │
   │         └────────┬─────────┘
   │                  │
   ▼                  ▼
┌───────────────────────┐
│ create_tools()        │  9 agent tools
│ contract_middleware() │  BYO tool wrapper
│ ContractSession       │  Enforcement tracking
└───────────────────────┘
           │
           ▼
Claude Agent SDK agent
(or any Python agent framework)

# data_contract.yml
version: "1.0"
name: revenue-analysis

# Where the semantic definitions live (external source of truth)
semantic:
  source:
    type: dbt                     # dbt | cube | yaml | custom
    path: "./dbt/manifest.json"   # resolved relative to contract file

  # What the agent is allowed to access
  allowed_tables:
    - schema: analytics
      description: "Curated analytics tables; prefer for reporting"
      preferred: true             # agent should prefer this schema
      tables: [orders, customers, subscriptions]
    - schema: raw
      tables: []                  # empty = nothing from this schema

  # What the agent must NOT do
  forbidden_operations: [DELETE, DROP, TRUNCATE, UPDATE, INSERT]

  # Business domains: provide context for domain-specific questions
  domains:
    - name: revenue
      summary: "Revenue and financial metrics from completed orders"
      description: >
        Revenue metrics track recognized revenue from completed orders.
        Revenue is recognized at fulfillment, not at booking.
      metrics: [total_revenue, gross_margin]
    - name: engagement
      summary: "Customer activity and retention patterns"
      description: >
        Customer engagement measures active usage patterns
        and retention over time.
      metrics: [active_customers, churn_rate]

  # Governance rules (per-rule enforcement)
  # Each rule has a query_check (pre-execution) or result_check (post-execution)
  # Rules with neither are advisory (shown in prompt only)
  rules:
    - name: tenant_isolation
      description: "All queries must include a WHERE tenant_id = filter"
      enforcement: block          # block | warn | log
      query_check:
        required_filter: tenant_id
    - name: use_approved_metrics
      description: "Revenue calculations must use the semantic layer definition"
      enforcement: warn           # advisory: no check block
    - name: no_select_star
      description: "Queries must specify explicit columns, no SELECT *"
      enforcement: block
      query_check:
        no_select_star: true

# Resource governance
resources:
  cost_limit_usd: 5.00
  max_query_time_seconds: 30
  max_retries: 3
  max_rows_scanned: 1_000_000
  token_budget: 50_000

# Time governance
temporal:
  max_duration_seconds: 300

# What counts as success
success_criteria:
  - name: query_uses_semantic_definitions
    weight: 0.4
  - name: results_are_reproducible
    weight: 0.3
  - name: output_includes_methodology
    weight: 0.3

The core layer handles contract loading, Pydantic models, and lightweight self-contained enforcement. Dependencies: pydantic, pyyaml only.
from agentic_data_contracts import DataContract
dc = DataContract.from_yaml("data_contract.yml")
# Generate contract section for the system prompt
contract_prompt = dc.to_system_prompt()
# Returns a section listing allowed tables, forbidden operations, active rules, semantic guidance
# Users compose their own system prompt and append the contract section:
system_prompt = f"""You are an analytics assistant for Acme Corp.
Always be concise and include methodology notes.
{contract_prompt}
"""

YAML-level business assertions (domain.description, metric_impact.evidence) rot silently when the business changes. Both models carry an optional last_reviewed: date field, and DataContract.find_stale() flags any artefact whose timestamp is missing or older than a threshold (default 90 days).
dc = DataContract.from_yaml("data_contract.yml")
source = dc.load_semantic_source()
findings = dc.find_stale(source, threshold_days=90)
for f in findings:
    print(f.kind, f.name, f.age_days)

Missing timestamps report as stale (age_days=None); otherwise adoption would be optional, which defeats the forcing function. During rollout, filter by f.age_days is not None to grandfather in un-reviewed entries. The detector is a pure function suitable for direct use in a pytest assertion or CI check.
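The staleness rule described above can be sketched without the library. This is a minimal stand-in, not the implementation of `DataContract.find_stale()`: the `StaleFinding` class, the `find_stale_entries` function, and the tuple input format are all assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass(frozen=True)
class StaleFinding:
    kind: str                 # e.g. "domain" or "metric_impact"
    name: str
    age_days: Optional[int]   # None means no last_reviewed date was recorded


def find_stale_entries(entries, today, threshold_days=90):
    """Return a finding for every entry that is unreviewed or older than the threshold.

    `entries` is an iterable of (kind, name, last_reviewed) tuples, where
    last_reviewed is a datetime.date or None (hypothetical input shape).
    """
    findings = []
    for kind, name, last_reviewed in entries:
        if last_reviewed is None:
            # A missing timestamp counts as stale.
            findings.append(StaleFinding(kind, name, None))
            continue
        age = (today - last_reviewed).days
        if age > threshold_days:
            findings.append(StaleFinding(kind, name, age))
    return findings


entries = [
    ("domain", "revenue", date(2026, 4, 1)),
    ("domain", "engagement", None),
    ("metric_impact", "churn_rate->total_revenue", date(2025, 12, 1)),
]
findings = find_stale_entries(entries, today=date(2026, 4, 17))

# Rollout mode: grandfather in entries that were never reviewed.
dated_only = [f for f in findings if f.age_days is not None]
```

Passing `today` explicitly keeps the function pure, which is what makes it easy to assert on in a test or CI check.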
Per-table access control is built on a thin resolver abstraction that normalises caller_principal into the identity string used for allowlist comparisons.
from agentic_data_contracts import Principal, resolve_principal

# Type alias, shown for reference; matches the keyword-only parameter on Validator and create_tools:
#   Principal = str | Callable[[], str | None] | None

# Normalises to the current string (calls the callable if needed)
principal: Principal = "alice@co.com"
current: str | None = resolve_principal(principal)

How it works:
- `str`: returned as-is; suitable for single-user sessions (Chainlit, one session per authenticated user).
- `Callable[[], str | None]`: called per-query, not cached; the callable typically reads a `contextvars.ContextVar` set by the message handler for each incoming request. This allows one long-lived `Validator` instance to serve a Webex room bot where different users send messages concurrently.
- `None`: resolver returns `None`; all `*_principals` restrictions are fail-closed (caller treated as unauthenticated and denied).

Two-tier empty-string handling: resolve_principal passes through an empty string without normalisation. The access-policy layer (principal_in_scope, called from both DataContract.allowed_table_names_for and the per-rule scope check inside Validator) treats an empty string as unauthenticated — same as None — so callers should canonicalize identities before passing them in. Splitting the resolver from policy is intentional: the resolver stays neutral, and principal_in_scope is the single source of truth for the allow/block-list semantics.
allowed_principals and blocked_principals on AllowedTable are mutually exclusive (validated at YAML load time). Principals are opaque strings compared by exact equality — no normalisation is performed inside the library. The rule of thumb: any *_principals field on a table requires identification — symmetric for allowlist and blocklist. An unidentified caller (resolver returns None or "") is always denied for any table that declares either field.
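The split between a neutral resolver and a fail-closed policy check can be sketched as follows. This re-implements the behaviour described above for illustration only; the function bodies and the `principal_in_scope` signature are assumptions, not the library's code.

```python
import contextvars
from typing import Callable, Optional, Union

Principal = Union[str, Callable[[], Optional[str]], None]


def resolve_principal(principal: Principal) -> Optional[str]:
    """Neutral resolver: call the callable if there is one, otherwise pass through."""
    if callable(principal):
        return principal()
    return principal  # a str (including "") or None, unchanged


def principal_in_scope(caller, allowed=None, blocked=None) -> bool:
    """Policy layer: may `caller` use a table or rule with these lists?"""
    if allowed is None and blocked is None:
        return True               # nothing declared, no identification required
    if not caller:                # None and "" both count as unauthenticated
        return False              # fail closed
    if allowed is not None:
        return caller in allowed  # exact string equality, no normalisation
    return caller not in blocked


# Multi-user bot: identity is read per query from a ContextVar.
current_sender = contextvars.ContextVar("current_sender", default=None)
current_sender.set("alice@co.com")
who = resolve_principal(current_sender.get)
```

Because comparison is exact, `"Alice@co.com"` and `"alice@co.com"` are different principals in this sketch, which is why callers are expected to canonicalize identities first.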
Per-rule principal scoping uses the exact same model. SemanticRule accepts the same allowed_principals / blocked_principals pair (also mutually exclusive). Inside Validator._build_checkers, each rule's principal scope is captured once at construction time as an (allowed, blocked) snapshot and stored on a frozen _QueryRuleEntry / _ResultRuleEntry. At validate-time the caller is resolved once and rules whose scope excludes them are skipped, following the same fail-closed contract as table scoping. This generalises to every rule kind (blocked_columns, required_filter, no_select_star, max_joins, result_check) without touching any individual checker class. Note that pending_result_check_names() deliberately returns the full declared list (a superset of what runs for any given caller); the only consumer is run_query telemetry, and resolving a callable principal there would create a TOCTOU surface.
When ai-agent-contracts is NOT installed, ContractSession provides self-contained enforcement:
- Retry count: incremented on each failed query attempt, checked against `max_retries`
- Token usage: tracked via callback, checked against `token_budget`
- Wall-clock duration: lazy start on first `check_limits()` call (not at construction), checked against `max_duration_seconds`. Can be reset via `reset_timer()` for frameworks that manage their own idle timeouts.
- Cost estimate: if EXPLAIN adapter returns cost info, checked against `cost_limit_usd`

These are simple counters/timers with guard checks before each tool call. No formal state machine.
When ai-agent-contracts IS installed, enforcement is delegated to the formal framework via the bridge layer (see below).
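A minimal sketch of the standalone counters and the lazily started timer, assuming a simplified class. `SessionSketch`, its method names other than `check_limits()` and `reset_timer()`, and the choice to return violation strings rather than raise are all hypothetical.

```python
import time


class SessionSketch:
    """Simple counters plus a wall-clock timer that starts on first check."""

    def __init__(self, max_retries, token_budget, max_duration_seconds,
                 clock=time.monotonic):
        self.max_retries = max_retries
        self.token_budget = token_budget
        self.max_duration_seconds = max_duration_seconds
        self.retries = 0
        self.tokens_used = 0
        self._clock = clock
        self._started_at = None   # lazy: set on the first check_limits() call

    def record_failure(self):
        self.retries += 1

    def record_tokens(self, count):
        self.tokens_used += count

    def reset_timer(self):
        self._started_at = None

    def check_limits(self):
        """Return a list of violated limits; an empty list means the call may proceed."""
        now = self._clock()
        if self._started_at is None:
            self._started_at = now
        violations = []
        if self.retries > self.max_retries:
            violations.append(f"retries {self.retries} > max_retries {self.max_retries}")
        if self.tokens_used > self.token_budget:
            violations.append(f"tokens {self.tokens_used} > token_budget {self.token_budget}")
        if now - self._started_at > self.max_duration_seconds:
            violations.append("duration exceeded max_duration_seconds")
        return violations


# A fake clock makes the lazy start visible without sleeping.
fake_now = [0.0]
session = SessionSketch(3, 50_000, 300, clock=lambda: fake_now[0])
fake_now[0] = 1000.0             # time passes before the first tool call
first = session.check_limits()   # the timer starts here, so nothing has elapsed
fake_now[0] = 1301.0
late = session.check_limits()    # 301 seconds after the first check
```

Starting the timer on the first check rather than at construction means a session object created at application start-up does not burn its duration budget while idle.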
Three-phase validation architecture. Dependencies: sqlglot.
class Checker(Protocol):
    def check_ast(self, ast: Expression, *args) -> CheckResult: ...

SQL is parsed once into a sqlglot AST. The Validator passes the AST to all applicable checkers, respecting table and per-rule principal scoping (rules carrying allowed_principals / blocked_principals are skipped when the resolved caller is out of scope).
Structural checkers (from top-level config):
| Checker | What it validates |
|---|---|
| `TableAllowlistChecker` | All referenced tables are in allowed_tables, filtered per caller_principal if supplied |
| `OperationBlocklistChecker` | No forbidden SQL operations (DELETE, DROP, etc.) |

Rule-based query checkers (from query_check blocks):

| Check | Checker | What it validates |
|---|---|---|
| `required_filter` | `RequiredFilterChecker` | Required WHERE column present in a non-tautological predicate |
| `no_select_star` | `NoSelectStarChecker` | No SELECT * statements |
| `blocked_columns` | `BlockedColumnsChecker` | Forbidden columns not in SELECT |
| `require_limit` | `RequireLimitChecker` | LIMIT clause present |
| `max_joins` | `MaxJoinsChecker` | JOIN count within limit |

CheckResult contains: passed: bool, severity: block | warn | log, message: str.
The validator runs all applicable checkers and aggregates results — any block result stops execution, warn results are prepended to the run_query response as a WARNINGS: preamble, log results are prepended as a LOG: preamble (also exposed via inspect_query under warnings and log_messages). log-level rules are omitted from the system prompt so the agent can't adapt behavior to avoid triggering them.
Rules that cannot be statically checked (e.g., "use semantic layer definition for revenue") become advisory rules — they appear in the system prompt but don't enforce anything. They can also be used as SuccessCriterion for post-hoc evaluation.
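The aggregation of checker results into a block / warn / log outcome can be sketched like this. `CheckResult` mirrors the three fields listed above; `Aggregated`, `aggregate`, `render_response`, and the exact preamble formatting are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class CheckResult:
    passed: bool
    severity: str   # "block" | "warn" | "log"
    message: str


@dataclass
class Aggregated:
    blocked: bool
    reasons: List[str]
    warnings: List[str]
    log_messages: List[str]


def aggregate(results):
    """Collect failed checks by severity; any failed block rule blocks the query."""
    failed = [r for r in results if not r.passed]
    reasons = [r.message for r in failed if r.severity == "block"]
    return Aggregated(
        blocked=bool(reasons),
        reasons=reasons,
        warnings=[r.message for r in failed if r.severity == "warn"],
        log_messages=[r.message for r in failed if r.severity == "log"],
    )


def render_response(agg, body):
    """Prepend WARNINGS / LOG preambles to the tool response (format assumed)."""
    if agg.blocked:
        return "BLOCKED: " + "; ".join(agg.reasons)
    parts = []
    if agg.warnings:
        parts.append("WARNINGS: " + "; ".join(agg.warnings))
    if agg.log_messages:
        parts.append("LOG: " + "; ".join(agg.log_messages))
    parts.append(body)
    return "\n".join(parts)


soft = aggregate([
    CheckResult(True, "block", "tenant filter present"),
    CheckResult(False, "warn", "consider the semantic revenue definition"),
    CheckResult(False, "log", "query touched 3 tables"),
])
hard = aggregate([CheckResult(False, "block", "SELECT * is not allowed")])
```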
When a SemanticSource is passed to the Validator, the RelationshipChecker validates JOINs against declared relationships after Phase 1 completes (and only if the query is not already blocked).
| Check | What it validates |
|---|---|
| `RelationshipChecker` (join-key) | JOIN columns match declared from/to references |
| `RelationshipChecker` (required-filter) | required_filter column present in WHERE with a non-tautological predicate |
| `RelationshipChecker` (fan-out) | No aggregation across one_to_many joins |

All relationship checks produce warnings only — they never block queries. Undeclared joins (table pairs with no relationship definition) are silently ignored.
The checker does not implement the Checker protocol. It exposes check_joins(ast) -> list[str] which returns multiple independent warnings rather than a single pass/fail CheckResult.
class ExplainAdapter(Protocol):
    def explain(self, sql: str) -> ExplainResult: ...

# ExplainResult:
#   estimated_cost_usd: float | None
#   estimated_rows: int | None
#   schema_valid: bool
#   errors: list[str]

| Database | Method | Returns |
|---|---|---|
| BigQuery | `jobs.query(dry_run=True)` | Bytes processed → cost |
| Snowflake | `EXPLAIN` | Estimated rows/partitions |
| Postgres | `EXPLAIN` (no ANALYZE) | Row estimates |
| DuckDB | `EXPLAIN` | Row estimates |

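As a rough illustration of how an EXPLAIN estimate might be compared with the `resources` limits, the sketch below converts scanned bytes to a cost and collects limit violations. The `ExplainResult` dataclass copies the fields listed above; `cost_from_bytes`, `explain_violations`, and the per-TiB price are assumptions, so substitute your own rate.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Assumption: an on-demand price per TiB scanned. Replace with your contracted rate.
ASSUMED_USD_PER_TIB = 6.25


@dataclass
class ExplainResult:
    estimated_cost_usd: Optional[float] = None
    estimated_rows: Optional[int] = None
    schema_valid: bool = True
    errors: List[str] = field(default_factory=list)


def cost_from_bytes(bytes_processed, usd_per_tib=ASSUMED_USD_PER_TIB):
    """Convert a dry-run byte estimate into an approximate dollar cost."""
    return bytes_processed / 2**40 * usd_per_tib


def explain_violations(result, cost_limit_usd, max_rows_scanned):
    """Return reasons to block; estimates that are None are simply not checked."""
    reasons = []
    if not result.schema_valid:
        reasons.extend(result.errors or ["schema validation failed"])
    if result.estimated_cost_usd is not None and result.estimated_cost_usd > cost_limit_usd:
        reasons.append(
            f"estimated cost ${result.estimated_cost_usd:.2f} exceeds ${cost_limit_usd:.2f}"
        )
    if result.estimated_rows is not None and result.estimated_rows > max_rows_scanned:
        reasons.append(
            f"estimated rows {result.estimated_rows} exceed {max_rows_scanned}"
        )
    return reasons


big_scan = ExplainResult(estimated_cost_usd=cost_from_bytes(2 * 2**40))
row_only = ExplainResult(estimated_rows=250_000)
```

Treating a `None` estimate as "not checked" lets adapters that only report rows, or only report cost, share the same limit logic.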
After a query executes successfully, run_query calls validator.validate_results() to check the actual output against result_check rules.
Built-in result checks:
| Check | What it validates |
|---|---|
| `min_value` / `max_value` | Numeric column values within bounds |
| `not_null` | Column contains no null values |
| `min_rows` / `max_rows` | Result set row count within bounds |

If a result check with enforcement: block fails, the query data is discarded — the agent sees only the violation message (with actual violating values for debugging). If enforcement: warn, the data is returned with warnings prepended.
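The result checks and the discard-on-block behaviour can be sketched over plain lists of dicts. The check layout (a dict with a `column` key next to the check names) and both function names are assumptions, since the YAML shape of `result_check` is not shown here.

```python
def run_result_check(rows, check):
    """Return violation messages for one result_check applied to `rows`."""
    violations = []
    row_count = len(rows)
    if "min_rows" in check and row_count < check["min_rows"]:
        violations.append(f"row count {row_count} < min_rows {check['min_rows']}")
    if "max_rows" in check and row_count > check["max_rows"]:
        violations.append(f"row count {row_count} > max_rows {check['max_rows']}")

    column = check.get("column")
    if column is not None:
        values = [row.get(column) for row in rows]
        if check.get("not_null") and any(v is None for v in values):
            violations.append(f"{column} contains null values")
        present = [v for v in values if v is not None]
        if "min_value" in check:
            bad = [v for v in present if v < check["min_value"]]
            if bad:
                # Include the offending values to help debugging.
                violations.append(f"{column} below {check['min_value']}: {bad}")
        if "max_value" in check:
            bad = [v for v in present if v > check["max_value"]]
            if bad:
                violations.append(f"{column} above {check['max_value']}: {bad}")
    return violations


def apply_enforcement(rows, violations, enforcement):
    """With enforcement 'block', failed checks discard the data entirely."""
    if violations and enforcement == "block":
        return None, violations
    return rows, violations


rows = [
    {"region": "EU", "revenue": 120.0},
    {"region": "US", "revenue": -5.0},
    {"region": "APAC", "revenue": None},
]
check = {"column": "revenue", "min_value": 0, "not_null": True, "max_rows": 10}
violations = run_result_check(rows, check)
blocked_data, _ = apply_enforcement(rows, violations, "block")
warned_data, _ = apply_enforcement(rows, violations, "warn")
```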
SQL string
  → sqlglot.parse(sql, dialect=contract.dialect) (parse once)
  → Phase 1: structural checkers + rule-based query_check checkers (table-scoped)
      → any block? → return ValidationResult(blocked=True, reasons=[...])
  → Relationship checks (if semantic_source provided, warnings only)
  → Phase 2 available? → explain adapter
      → cost/rows exceed limits? → return ValidationResult(blocked=True, reasons=[...])
      → record estimated cost in session
  → execute query
  → Phase 3: result_check rules against actual output (table-scoped)
      → any block? → discard data, return violation
      → any warn? → prepend WARNINGS preamble to response
      → any log? → prepend LOG preamble to response
  → return results

Two modes: tool factory for quick starts, middleware for BYO tools.
- `describe_table(schema, table)`: Column details, merging the database adapter's catalog view with authored descriptions from the semantic source (semantic wins; adapter fills gaps)
- `preview_table(schema, table, limit?)`: Sample rows
- `list_metrics(domain?, tier?, indicator_kind?)`: Browse metrics with filters
- `lookup_metric(metric_name)`: Full metric definition with SQL and impact edges
- `lookup_domain(name)`: Full domain description with metrics and tables
- `lookup_relationships(table, target_table?)`: Direct joins and multi-hop paths
- `trace_metric_impacts(metric_name, direction, max_depth?)`: BFS over the impact graph
- `inspect_query(sql)`: Static + EXPLAIN check, no execution
- `run_query(sql)`: Validate and execute; response includes remaining session budget

list_metrics → lookup_metric → lookup_relationships → describe_table
→ write SQL → inspect_query
→ (if valid) run_query
from agentic_data_contracts import DataContract, create_tools
from agentic_data_contracts.adapters.duckdb import DuckDBAdapter
dc = DataContract.from_yaml("contract.yml")
adapter = DuckDBAdapter("analytics.duckdb")
tools = create_tools(dc, adapter=adapter)
# Returns all 9 tools as @tool-decorated async functions
# compatible with claude_agent_sdk.create_sdk_mcp_server()
# Per-caller access control (optional)
tools = create_tools(dc, adapter=adapter, caller_principal="alice@co.com")
# Or with a callable for multi-user bots (identity read per-query from a ContextVar):
tools = create_tools(dc, adapter=adapter, caller_principal=lambda: current_sender.get())

create_tools accepts caller_principal: Principal = None and forwards it into the Validator. Two of the nine tools are principal-aware: describe_table and preview_table check allowed_table_names_for(principal) before serving a response and return a "Table X is restricted (caller: 'Y')." message for inaccessible tables. The remaining seven tools are unchanged; inspect_query and run_query inherit principal gating through the underlying Validator.
Tools are returned as Claude Agent SDK @tool-decorated async functions. Each tool accepts args: dict and returns {"content": [{"type": "text", "text": ...}]}. The caller bundles them into an MCP server:
from claude_agent_sdk import create_sdk_mcp_server, ClaudeAgentOptions
server = create_sdk_mcp_server(name="data-contracts", version="1.0.0", tools=tools)
user_prompt = "You are an analytics assistant for Acme Corp."
system_prompt = f"{user_prompt}\n\n{dc.to_system_prompt()}"
options = ClaudeAgentOptions(
    model="claude-sonnet-4-6",
    system_prompt=system_prompt,
    mcp_servers={"dc": server},
    allowed_tools=[f"mcp__dc__{t.name}" for t in tools],
)

from agentic_data_contracts import contract_middleware

@contract_middleware(contract, adapter=adapter)
async def my_custom_query_tool(args: dict) -> dict:
    """Existing query tool with custom logic."""
    result = await my_database.execute(args["sql"])
    return {"content": [{"type": "text", "text": str(result)}]}

# Middleware: intercept sql → validate → block/warn → call wrapped → track session
# Returns a @tool-decorated async function compatible with create_sdk_mcp_server()

| Tool | Without adapter |
|---|---|
| `describe_table`, `preview_table`, `list_metrics`, `lookup_metric`, `lookup_domain`, `lookup_relationships`, `trace_metric_impacts` | Fully functional (contract + semantic source) |
| `run_query` | Fully functional when database adapter is configured |
| `inspect_query` | Phase 1 always runs; EXPLAIN fields populated when adapter is configured |

Reads external semantic definitions so the agent knows how metrics are defined.
class SemanticSource(Protocol):
    def get_metrics(self) -> list[MetricDefinition]: ...
    def get_metric(self, name: str) -> MetricDefinition | None: ...
    def get_table_schema(self, schema: str, table: str) -> TableSchema | None: ...
    def search_metrics(self, query: str) -> list[MetricDefinition]: ...
    def get_relationships(self) -> list[Relationship]: ...
    def get_relationships_for_table(self, table: str) -> list[Relationship]: ...
    def get_metric_impacts(self) -> list[MetricImpact]: ...

Fuzzy metric search: When lookup_metric receives a query that doesn't exactly match a metric name, it falls back to search_metrics() which uses thefuzz (token_set_ratio scorer, cutoff 50) to find the best matches by name + description. A shared fuzzy_search_metrics() helper in base.py provides this logic for all source implementations.
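The exact-then-fuzzy lookup flow can be sketched with the standard library. Note the swap: this uses `difflib.SequenceMatcher` as a stand-in scorer, not thefuzz's `token_set_ratio`, so scores will differ from the library's. The `lookup` function, its `limit` parameter, and the metric descriptions are hypothetical.

```python
from difflib import SequenceMatcher


def similarity(query, text):
    """0-100 similarity score; a stdlib stand-in for a fuzzy-matching scorer."""
    return round(100 * SequenceMatcher(None, query.lower(), text.lower()).ratio())


def lookup(metrics, query, cutoff=50, limit=3):
    """Exact name match first; otherwise rank by name and name + description."""
    if query in metrics:
        return [query]
    scored = []
    for name, description in metrics.items():
        score = max(similarity(query, name), similarity(query, f"{name} {description}"))
        if score >= cutoff:
            scored.append((score, name))
    scored.sort(reverse=True)
    return [name for _, name in scored[:limit]]


metrics = {
    "total_revenue": "Recognized revenue from completed orders in the period",
    "churn_rate": "Share of customers lost in the period",
}
exact = lookup(metrics, "churn_rate")
fuzzy = lookup(metrics, "total revenu")
```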
Metric-impact graph (v0.10.0+): get_metric_impacts() returns directed edges between metrics annotated with direction, confidence, and evidence. The build_metric_impact_index() / walk_metric_impacts() helpers in base.py mirror the build_relationship_index / find_join_path pattern — dual-keyed index (each edge under both endpoints), cycle-safe BFS traversal, direction disambiguated at walk time. YamlSource parses a top-level metric_impacts: block; DbtSource and CubeSource return [] (neither system has a native causal-graph concept — impacts live in the contract YAML regardless of where the metric itself comes from).
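The dual-keyed index and cycle-safe BFS described above can be sketched as follows. This is an illustration, not the code in base.py: the tuple edge format, the `effect` labels, the `walk_direction` values ("downstream" / "upstream"), and both function names are assumptions.

```python
from collections import defaultdict, deque

# (from_metric, to_metric, effect) edges; values are illustrative only.
edges = [
    ("active_customers", "total_revenue", "positive"),
    ("churn_rate", "active_customers", "negative"),
    ("total_revenue", "gross_margin", "positive"),
    ("gross_margin", "total_revenue", "positive"),   # deliberate cycle
]


def build_impact_index(edges):
    """Store each edge under both endpoints so either side can find it."""
    index = defaultdict(list)
    for edge in edges:
        from_metric, to_metric, _ = edge
        index[from_metric].append(edge)
        if to_metric != from_metric:
            index[to_metric].append(edge)
    return index


def walk_impacts(index, start, walk_direction="downstream", max_depth=3):
    """Breadth-first walk; direction is decided per edge at walk time."""
    seen = {start}
    queue = deque([(start, 0)])
    visited = []
    while queue:
        node, depth = queue.popleft()
        if depth == max_depth:
            continue
        for from_metric, to_metric, _ in index.get(node, []):
            if walk_direction == "downstream" and from_metric == node:
                neighbour = to_metric
            elif walk_direction == "upstream" and to_metric == node:
                neighbour = from_metric
            else:
                continue
            if neighbour not in seen:      # the seen-set makes cycles harmless
                seen.add(neighbour)
                visited.append((neighbour, depth + 1))
                queue.append((neighbour, depth + 1))
    return visited


index = build_impact_index(edges)
downstream = walk_impacts(index, "churn_rate", "downstream")
upstream = walk_impacts(index, "total_revenue", "upstream")
```

Keying every edge under both endpoints is what lets one index serve both walk directions without building a reversed copy of the graph.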
dbt relationship parsing (v0.17.0+): DbtSource.get_relationships() projects dbt's built-in relationships schema tests into Relationship instances. Each test node (resource_type == "test", test_metadata.name == "relationships") carries kwargs.column_name and kwargs.field; the owner model is resolved via attached_node (manifest v12+) and the referenced model via depends_on.nodes. The test's meta: block supplies preferred, required_filter, and relationship_type (defaulting to many_to_one). Tests that can't be resolved (missing attached_node, unmodelled dependencies, non-relationships test names) are skipped silently rather than raising — manifests are heterogeneous and some tests live on seeds or sources we don't model.
Cube relationship parsing (v0.18.0+): CubeSource.get_relationships() parses each cube's joins: block. The parser builds a cube_name -> sql_table map, regexes the single-equality form {X}.col1 = {Y}.col2 from each join's sql: field, and normalises so the from side is always the column on the cube declaring the join (independent of which side {CUBE} appears on in the SQL). Cube's relationship enum (belongsTo / hasOne / hasMany plus many_to_one / one_to_one / one_to_many aliases) maps to canonical Relationship.type strings; meta.relationship_type overrides. meta.preferred and meta.required_filter work the same way as YamlSource and DbtSource. Composite-key joins (multiple AND-chained equalities) and joins whose target cube can't be resolved by name are skipped — declare those in contract YAML via YamlSource instead.
Built-in sources:
| Source | Reads | Extracts |
|---|---|---|
| `DbtSource` | `manifest.json` | Metrics (+ meta.tier / meta.indicator_kind / meta.domains), models, columns |
| `CubeSource` | Cube meta API or schema files | Metrics (+ meta.tier / meta.indicator_kind / meta.domains), dimensions |
| `YamlSource` | Inline YAML definitions | Metric / table / relationship / metric_impacts definitions for teams not using dbt/Cube |

MetricDefinition: name, description, sql_expression, source_model, filters, domains, tier, indicator_kind.
MetricImpact: from_metric, to_metric, direction, confidence, evidence, description.
Relationship: from_, to, type, description, required_filter, preferred. The preferred flag (default False) marks the canonical join when alternatives exist between the same table pair. build_relationship_index stable-sorts each adjacency list with preferred edges first, so find_join_path (BFS) and get_relationships_for_table both surface the canonical edge automatically. The flat list returned by get_relationships() deliberately keeps declaration order; that list feeds the prompt renderer, which renders preferred="true" as a per-edge attribute instead of via reordering.
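A small sketch of the preferred-first index and BFS join path, under stated assumptions: the `Rel` dataclass is a cut-down stand-in for `Relationship`, references are assumed to look like `schema.table.column`, and the function bodies are illustrative rather than the library's.

```python
from collections import defaultdict, deque
from dataclasses import dataclass


@dataclass(frozen=True)
class Rel:
    from_: str               # assumed format: "schema.table.column"
    to: str
    preferred: bool = False


def table_of(ref):
    """Drop the trailing column name from a reference."""
    return ref.rsplit(".", 1)[0]


def build_relationship_index(rels):
    index = defaultdict(list)
    for rel in rels:
        index[table_of(rel.from_)].append(rel)
        index[table_of(rel.to)].append(rel)
    for adjacency in index.values():
        # list.sort is stable: preferred edges move first, ties keep declaration order.
        adjacency.sort(key=lambda rel: not rel.preferred)
    return index


def find_join_path(index, start, goal):
    """BFS over tables; returns the list of relationships to join through, or None."""
    queue = deque([(start, [])])
    seen = {start}
    while queue:
        table, path = queue.popleft()
        if table == goal:
            return path
        for rel in index.get(table, []):
            a, b = table_of(rel.from_), table_of(rel.to)
            neighbour = b if a == table else a
            if neighbour not in seen:
                seen.add(neighbour)
                queue.append((neighbour, path + [rel]))
    return None


rels = [
    Rel("analytics.orders.billing_customer_id", "analytics.customers.id"),
    Rel("analytics.orders.customer_id", "analytics.customers.id", preferred=True),
    Rel("analytics.subscriptions.customer_id", "analytics.customers.id"),
]
index = build_relationship_index(rels)
path = find_join_path(index, "analytics.orders", "analytics.subscriptions")
```

Because BFS takes the first edge that reaches an unseen table, sorting preferred edges to the front is enough to make the canonical join win without any extra tie-breaking logic.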
TableSchema: columns: list[Column] with name, type, description.
class DatabaseAdapter(Protocol):
    def execute(self, sql: str) -> QueryResult: ...
    def explain(self, sql: str) -> ExplainResult: ...
    def describe_table(self, schema: str, table: str) -> TableSchema: ...

    @property
    def dialect(self) -> str: ...  # "bigquery", "snowflake", "postgres", "duckdb"

class SqlNormalizer(Protocol):
    def normalize_sql(self, sql: str) -> str: ...

Adapters for databases with proprietary SQL extensions (Denodo VQL, Teradata, ClickHouse) can implement SqlNormalizer alongside DatabaseAdapter. The Validator calls normalize_sql() before sqlglot.parse_one() to rewrite non-standard syntax into a form sqlglot can parse. The original SQL is preserved for execute() and explain().
Detection is automatic: create_tools() and contract_middleware() check isinstance(adapter, SqlNormalizer) and wire it into the Validator if present. Standard-dialect adapters are unaffected.
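An `isinstance` check against a `Protocol` only works when the protocol is decorated with `typing.runtime_checkable`, so the detection step presumably relies on that. The sketch below shows the mechanism with hypothetical adapters; `PlainAdapter`, `VendorAdapter`, `sql_for_parser`, and the made-up vendor suffix are illustrations, not part of the library.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SqlNormalizer(Protocol):
    def normalize_sql(self, sql: str) -> str: ...


class PlainAdapter:
    """Standard-dialect adapter: no normalize_sql, so it is left alone."""
    dialect = "duckdb"


class VendorAdapter:
    """Adapter for a dialect with a proprietary suffix (made up for this example)."""
    dialect = "postgres"

    def normalize_sql(self, sql: str) -> str:
        return sql.replace(" CONTEXT('cache'='off')", "")


def sql_for_parser(adapter, sql):
    """Return the text to hand to the parser; the original still goes to execute()."""
    if isinstance(adapter, SqlNormalizer):   # structural check: is the method present?
        return adapter.normalize_sql(sql)
    return sql


raw = "SELECT id FROM t CONTEXT('cache'='off')"
parsed_input = sql_for_parser(VendorAdapter(), raw)
untouched = sql_for_parser(PlainAdapter(), "SELECT 1")
```

The runtime check only verifies that a `normalize_sql` attribute exists, not its signature, so adapters still need tests of their own.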
describe_table maps to native commands:
| Database | Command | What you get |
|---|---|---|
| BigQuery | `INFORMATION_SCHEMA.COLUMNS` or `get_table()` | Column names, types, descriptions, partitioning |
| Snowflake | `DESCRIBE TABLE` | Column names, types, nullable, default, comments |
| Postgres | `information_schema.columns` | Column names, types, nullable, defaults, comments |
| DuckDB | `DESCRIBE` or `information_schema.columns` | Column names, types |

Table schemas are cached for the lifetime of a ContractSession to avoid repeated round-trips.
Built-in adapters are optional extras:
[project.optional-dependencies]
bigquery = ["google-cloud-bigquery"]
snowflake = ["snowflake-connector-python"]
postgres = ["psycopg2-binary"]
duckdb = ["duckdb"]

When ai-agent-contracts is installed, the bridge upgrades from lightweight enforcement to the formal system.
from agentic_data_contracts.bridge import compile_to_contract
contract_obj = compile_to_contract(data_contract)
# Returns: Contract(I, O, S, R, T, Phi, Psi)

| DataContract field | Compiles to |
|---|---|
| `semantic.rules` (block) | `TerminationCondition` |
| `semantic.rules` (warn) | `SuccessCriterion` (low weight) |
| `semantic.rules` (log) | `Contract.metadata` |
| `resources.*` | `ResourceConstraints` |
| `temporal.*` | `TemporalConstraints` |
| `success_criteria` | `list[SuccessCriterion]` with weights |
| `semantic.source` + `allowed_tables` | `Capabilities.instructions` |

| Concern | Without ai-agent-contracts | With ai-agent-contracts |
|---|---|---|
| Retry/token/duration tracking | `ContractSession` counters | `ResourceConstraints` formal enforcement |
| Block rule violation | `ContractViolation` exception | `TerminationCondition` triggers agent stop |
| Warn rule violation | Warning in tool result | SuccessCriterion penalty |
| Success evaluation | Manual / log-based | Formal SuccessCriterion with weights, supports LLM judge |
| Integration with LangChain, LiteLLM | Not available | Full Contract works with all existing integrations |

try:
    from agent_contracts import Contract
    AGENT_CONTRACTS_AVAILABLE = True
except ImportError:
    AGENT_CONTRACTS_AVAILABLE = False

If ai-agent-contracts is installed, ContractSession automatically uses formal enforcement. Tools behave the same from the agent's perspective.
agentic-data-contracts/
├── src/agentic_data_contracts/
│ ├── __init__.py # Public API: DataContract, create_tools, contract_middleware
│ ├── core/
│ │ ├── __init__.py
│ │ ├── schema.py # Pydantic models for YAML validation
│ │ ├── contract.py # DataContract class (load, to_system_prompt)
│ │ └── session.py # ContractSession (lightweight enforcement)
│ ├── validation/
│ │ ├── __init__.py
│ │ ├── validator.py # Orchestrates checkers, aggregates results
│ │ ├── checkers.py # Built-in checkers (7 query checkers + ResultCheckRunner)
│ │ └── explain.py # EXPLAIN adapter orchestration
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── factory.py # create_tools() — returns 9 tools
│ │ └── middleware.py # contract_middleware decorator
│ ├── semantic/
│ │ ├── __init__.py
│ │ ├── base.py # SemanticSource protocol
│ │ ├── dbt.py # DbtSource
│ │ ├── cube.py # CubeSource
│ │ └── yaml_source.py # YamlSource
│ ├── adapters/
│ │ ├── __init__.py
│ │ ├── _normalizer.py # SqlNormalizer protocol (avoids circular import)
│ │ ├── base.py # DatabaseAdapter protocol + SqlNormalizer re-export
│ │ ├── bigquery.py # BigQuery adapter
│ │ ├── snowflake.py # Snowflake adapter
│ │ ├── postgres.py # Postgres adapter
│ │ └── duckdb.py # DuckDB adapter
│ └── bridge/
│ ├── __init__.py
│ └── compiler.py # DataContract → ai-agent-contracts Contract
├── tests/
│ ├── test_core/
│ ├── test_validation/
│ ├── test_tools/
│ ├── test_semantic/
│ ├── test_adapters/
│ ├── test_bridge/
│ └── fixtures/
│ ├── valid_contract.yml
│ ├── minimal_contract.yml
│ └── sample_dbt_manifest.json
├── examples/
│ └── revenue_agent/
│ ├── contract.yml
│ └── agent.py # Claude Agent SDK example
├── pyproject.toml
└── README.md
[project]
dependencies = [
"sqlglot>=23.0",
"pydantic>=2.0",
"pyyaml>=6.0",
]
[project.optional-dependencies]
agent-sdk = ["claude-agent-sdk"]
agent-contracts = ["ai-agent-contracts>=0.1.0"]
bigquery = ["google-cloud-bigquery"]
snowflake = ["snowflake-connector-python"]
postgres = ["psycopg2-binary"]
duckdb = ["duckdb"]
all = [
"agentic-data-contracts[agent-sdk,agent-contracts,bigquery,snowflake,postgres,duckdb]",
]

Six test suites matching the layers:
| Suite | What it tests | Extra dependencies |
|---|---|---|
| `test_core/` | YAML loading, Pydantic validation, ContractSession counters | None |
| `test_validation/` | All built-in checkers, validator orchestration, multi-dialect SQL | None (sqlglot) |
| `test_tools/` | Tool factory, middleware, graceful degradation | None |
| `test_semantic/` | DbtSource parses manifest, YamlSource loads inline defs | None |
| `test_adapters/` | Adapter protocol compliance, DuckDB integration tests | DuckDB |
| `test_bridge/` | Compilation mapping, formal enforcement | ai-agent-contracts |

DuckDB for integration tests — zero setup, runs in CI without credentials.
# examples/revenue_agent/agent.py
import asyncio
from agentic_data_contracts import DataContract, create_tools
from agentic_data_contracts.adapters.duckdb import DuckDBAdapter
from claude_agent_sdk import (
query, ClaudeAgentOptions, create_sdk_mcp_server,
AssistantMessage, TextBlock,
)
dc = DataContract.from_yaml("contract.yml")
adapter = DuckDBAdapter("sample_data.duckdb")
# Create contract-aware tools and bundle into MCP server
sdk_tools = create_tools(dc, adapter=adapter)
server = create_sdk_mcp_server(
name="data-contracts", version="1.0.0", tools=sdk_tools
)
# User's own system prompt + contract rules appended
user_prompt = """You are a revenue analytics assistant for Acme Corp.
Always be concise and include methodology notes in your answers."""
options = ClaudeAgentOptions(
model="claude-sonnet-4-6",
system_prompt=f"{user_prompt}\n\n{dc.to_system_prompt()}",
mcp_servers={"dc": server},
allowed_tools=[f"mcp__dc__{t.name}" for t in sdk_tools],
)
async def main():
    async for message in query(
        prompt="What was total revenue by region in Q1 2025?",
        options=options,
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if isinstance(block, TextBlock):
                    print(block.text)

asyncio.run(main())

Runtime behavior:
Agent: "SELECT * FROM analytics.orders"
  -> BLOCKED (no_select_star)
Agent: "SELECT order_id, amount FROM analytics.orders"
  -> BLOCKED (tenant_isolation: missing WHERE tenant_id = ?)
Agent: "SELECT order_id, amount FROM analytics.orders WHERE tenant_id = 'acme'"
  -> PASSED + WARN (consider using semantic revenue definition)
Agent: "SELECT order_id, amount FROM raw.payments WHERE tenant_id = 'acme'"
  -> BLOCKED (raw.payments not in allowed_tables)

The example ships with a DuckDB setup script so users can run immediately:
uv run python examples/revenue_agent/agent.py "What was Q1 revenue by region?"

- CLI tool: `agentic-data-contracts validate contract.yml`
- Claude Code MCP server wrapping the tool set
- dbt plugin: auto-generate contracts from `manifest.json`
- Compliance dashboard / audit reporting
- Contract versioning and migration
- Principal-aware system prompt rendering: `to_system_prompt()` currently lists all declared tables regardless of caller. An agent serving Bob may be told about tables Bob can't query. Query-time gating remains authoritative (denied queries never reach the database), but UX could be improved by filtering the rendered prompt to only include tables accessible to the current principal. File an issue if your deployment needs this.