diff --git a/.github/workflows/test-mcp-examples.yaml b/.github/workflows/test-mcp-examples.yaml new file mode 100644 index 00000000..f0912e98 --- /dev/null +++ b/.github/workflows/test-mcp-examples.yaml @@ -0,0 +1,77 @@ +name: Test MCP Examples + +on: + push: + branches: [main] + paths: + - 'modules/ai-agents/examples/**/*.yaml' + - 'modules/ai-agents/examples/test-mcp-examples.sh' + pull_request: + branches: [main] + paths: + - 'modules/ai-agents/examples/**/*.yaml' + - 'modules/ai-agents/examples/test-mcp-examples.sh' + +jobs: + lint-and-test: + name: Lint and Test All Examples + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install dependencies + run: npm install + + - name: Install yq + run: | + sudo wget -qO /usr/local/bin/yq https://github.com/mikefarah/yq/releases/latest/download/yq_linux_amd64 + sudo chmod +x /usr/local/bin/yq + + - name: Install tools + run: | + npx doc-tools install-test-dependencies + rpk connect install + + - name: Make test script executable + run: chmod +x modules/ai-agents/examples/test-mcp-examples.sh + + # Test all examples + - name: Run automated test script + run: | + cd modules/ai-agents/examples + ./test-mcp-examples.sh + + test-matrix: + name: Test Examples by Component Type + runs-on: ubuntu-latest + strategy: + matrix: + component: [processors, inputs, outputs, caches, o11y] + fail-fast: false + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install dependencies + run: npm install + + - name: Install yq + run: | + sudo wget -qO /usr/local/bin/yq https://github.com/mikefarah/yq/releases/latest/download/yq_linux_amd64 + sudo chmod +x /usr/local/bin/yq + + - name: Install tools + run: | + npx doc-tools install-test-dependencies + rpk connect install + + - name: Make test script executable + run: chmod +x modules/ai-agents/examples/test-mcp-examples.sh + + - name: Test ${{ matrix.component }} examples + run: | + cd modules/ai-agents/examples + 
./test-mcp-examples.sh ${{ matrix.component }} diff --git a/.github/workflows/update-docs.yml b/.github/workflows/update-docs.yml index 82a5f4d1..26138a22 100644 --- a/.github/workflows/update-docs.yml +++ b/.github/workflows/update-docs.yml @@ -18,7 +18,7 @@ jobs: cancel-in-progress: true env: - NODE_VERSION: '18' + NODE_VERSION: '22' DOCS_OVERRIDES: docs-data/overrides.json steps: @@ -51,13 +51,13 @@ jobs: - name: Install tools run: | - npx doc-tools install-test-dependencies + npx --no-install doc-tools install-test-dependencies rpk connect install - name: Generate RPCN Connector docs id: generate run: | - npx doc-tools generate rpcn-connector-docs \ + npx --no-install doc-tools generate rpcn-connector-docs \ --fetch-connectors \ --overrides $DOCS_OVERRIDES \ --draft-missing \ diff --git a/.github/workflows/update-extensions.yml b/.github/workflows/update-extensions.yml index e4f5c6c3..1544f113 100644 --- a/.github/workflows/update-extensions.yml +++ b/.github/workflows/update-extensions.yml @@ -32,7 +32,7 @@ jobs: - uses: actions/setup-node@v4 with: - node-version: '18' + node-version: '22' - run: npm install diff --git a/local-antora-playbook.yml b/local-antora-playbook.yml index add7a43e..d3c53483 100644 --- a/local-antora-playbook.yml +++ b/local-antora-playbook.yml @@ -32,6 +32,7 @@ asciidoc: - '@redpanda-data/docs-extensions-and-macros/macros/config-ref' - '@redpanda-data/docs-extensions-and-macros/macros/helm-ref' - '@redpanda-data/docs-extensions-and-macros/asciidoc-extensions/add-line-numbers-highlights' + - '@redpanda-data/docs-extensions-and-macros/macros/badge' antora: extensions: - require: '@redpanda-data/docs-extensions-and-macros/extensions/generate-rp-connect-categories' diff --git a/modules/ai-agents/examples/resources/inputs/event-workflow.yaml b/modules/ai-agents/examples/resources/inputs/event-workflow.yaml index bc550ab0..0f252b82 100644 --- a/modules/ai-agents/examples/resources/inputs/event-workflow.yaml +++ 
b/modules/ai-agents/examples/resources/inputs/event-workflow.yaml @@ -9,8 +9,3 @@ meta: mcp: enabled: true description: "Consume order events to trigger workflows" - properties: - - name: event_type - type: string - description: "Type of event to process (order_created, order_cancelled, etc.)" - required: false diff --git a/modules/ai-agents/examples/resources/inputs/generate-input.yaml b/modules/ai-agents/examples/resources/inputs/generate-input.yaml new file mode 100644 index 00000000..fc7a66e1 --- /dev/null +++ b/modules/ai-agents/examples/resources/inputs/generate-input.yaml @@ -0,0 +1,9 @@ +generate: + interval: 1s + count: 0 + mapping: | + root.id = uuid_v4() + root.timestamp = now() + root.user_id = random_int(min: 1000, max: 9999) + root.event_type = ["login", "purchase", "logout"].index(random_int(max: 2)) + root.amount = if this.event_type == "purchase" { random_int(min: 10, max: 500) } diff --git a/modules/ai-agents/examples/resources/outputs/redpanda-output-with-processors.yaml b/modules/ai-agents/examples/resources/outputs/redpanda-output-with-processors.yaml new file mode 100644 index 00000000..049e00b7 --- /dev/null +++ b/modules/ai-agents/examples/resources/outputs/redpanda-output-with-processors.yaml @@ -0,0 +1,13 @@ +redpanda: + seed_brokers: [ "${REDPANDA_BROKERS}" ] + topic: "llm-responses" + +processors: + - openai_chat_completion: + api_key: "${OPENAI_API_KEY}" + model: "gpt-4" + prompt: ${! 
json("question") } + - mapping: | + root.question = this.question + root.answer = this.content + root.timestamp = now().ts_format("2006-01-02T15:04:05Z07:00") diff --git a/modules/ai-agents/examples/resources/outputs/redpanda-publish.yaml b/modules/ai-agents/examples/resources/outputs/redpanda-publish.yaml index 95d3bdd8..412ec019 100644 --- a/modules/ai-agents/examples/resources/outputs/redpanda-publish.yaml +++ b/modules/ai-agents/examples/resources/outputs/redpanda-publish.yaml @@ -21,6 +21,6 @@ meta: description: "Customer ID for partitioning" required: true - name: order_data - type: object - description: "Order details (items, total, etc.)" + type: string + description: "Order details as JSON string (items, total, etc.)" required: true diff --git a/modules/ai-agents/examples/test-mcp-examples.sh b/modules/ai-agents/examples/test-mcp-examples.sh new file mode 100755 index 00000000..36db6c02 --- /dev/null +++ b/modules/ai-agents/examples/test-mcp-examples.sh @@ -0,0 +1,157 @@ +#!/usr/bin/env bash +# +# Automated testing script for Redpanda Connect MCP examples +# +# Usage: +# ./test-mcp-examples.sh + +set -euo pipefail + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +# Counters +TOTAL=0 +SKIPPED=0 +MCP_FAILS=0 + +echo "πŸ§ͺ Redpanda Connect MCP Examples Test Suite" +echo "============================================" +echo "" + +# Run MCP server lint on the directory +echo "Running rpk connect mcp-server lint..." +LINT_OUTPUT=$(rpk connect mcp-server lint --skip-env-var-check --verbose 2>&1) || { + echo -e "${RED}❌ Linting failed${NC}" + echo "" + echo "$LINT_OUTPUT" + exit 1 +} +echo -e "${GREEN}βœ… Linting passed${NC}" +echo "" + +# Function to validate MCP metadata +validate_mcp_metadata() { + local file=$1 + + echo -n " Validating MCP metadata... " + + # Determine which YAML parser to use + local use_yq=true + if ! command -v yq &> /dev/null; then + use_yq=false + if ! 
command -v python3 &> /dev/null; then + echo -e "${RED}FAILED${NC} (neither yq nor python3 available)" + MCP_FAILS=$((MCP_FAILS + 1)) + return 1 + fi + fi + + # Check if .meta.mcp exists + local mcp_exists + if $use_yq; then + mcp_exists=$(yq eval '.meta.mcp' "$file" 2>/dev/null) + else + mcp_exists=$(python3 -c " +import yaml +try: + with open('$file') as f: + doc = yaml.safe_load(f) + meta = doc.get('meta', {}) if doc else {} + mcp = meta.get('mcp') + print('null' if mcp is None else 'exists') +except: + print('null') +" 2>/dev/null) + fi + + if [[ "$mcp_exists" == "null" || -z "$mcp_exists" ]]; then + echo -e "${YELLOW}SKIPPED${NC} (no MCP metadata)" + SKIPPED=$((SKIPPED + 1)) + return 0 + fi + + # Read .meta.mcp.enabled + local enabled + if $use_yq; then + enabled=$(yq eval '.meta.mcp.enabled' "$file" 2>/dev/null) + else + enabled=$(python3 -c " +import yaml +try: + with open('$file') as f: + doc = yaml.safe_load(f) + enabled = doc.get('meta', {}).get('mcp', {}).get('enabled') + print('null' if enabled is None else str(enabled).lower()) +except: + print('null') +" 2>/dev/null) + fi + + if [[ "$enabled" != "true" ]]; then + echo -e "${YELLOW}WARNING${NC} (mcp.enabled not set to true)" + return 0 + fi + + # Read .meta.mcp.description + local description + if $use_yq; then + description=$(yq eval '.meta.mcp.description' "$file" 2>/dev/null) + else + description=$(python3 -c " +import yaml +try: + with open('$file') as f: + doc = yaml.safe_load(f) + desc = doc.get('meta', {}).get('mcp', {}).get('description') + print('null' if desc is None or desc == '' else str(desc)) +except: + print('null') +" 2>/dev/null) + fi + + if [[ "$description" == "null" || -z "$description" ]]; then + echo -e "${RED}FAILED${NC} (missing description)" + MCP_FAILS=$((MCP_FAILS + 1)) + return 1 + fi + + echo -e "${GREEN}PASSED${NC}" + return 0 +} + +# Validate MCP metadata for each file +for file in resources/*/*.yaml; do + if [[ -f "$file" ]]; then + TOTAL=$((TOTAL + 1)) + echo "" + echo 
-e "${BLUE}πŸ“„ Validating: $file${NC}" + validate_mcp_metadata "$file" + fi +done + +# Summary +echo "" +echo "============================================" +echo "πŸ“Š Test Summary" +echo "============================================" +echo "Total configs tested: $TOTAL" +if [[ $MCP_FAILS -gt 0 ]]; then + echo -e "MCP validation failures: ${RED}$MCP_FAILS${NC}" +fi +if [[ $SKIPPED -gt 0 ]]; then + echo -e "Skipped: ${YELLOW}$SKIPPED${NC}" +fi +echo "" + +if [[ $MCP_FAILS -gt 0 ]]; then + echo -e "${RED}❌ Some tests failed${NC}" + exit 1 +else + echo -e "${GREEN}βœ… All tests passed!${NC}" + exit 0 +fi diff --git a/modules/ai-agents/examples/testing.adoc b/modules/ai-agents/examples/testing.adoc new file mode 100644 index 00000000..dfda7747 --- /dev/null +++ b/modules/ai-agents/examples/testing.adoc @@ -0,0 +1,356 @@ += Test MCP Examples +:description: Automated testing strategies for Redpanda Connect MCP server examples. + +This document describes the automated testing strategies for Redpanda Connect MCP server examples. + +All MCP examples are automatically tested to ensure: + +. YAML syntax and structure are correct +. MCP metadata is complete and valid +. Component schemas match Redpanda Connect specifications +. 
Environment variables are handled gracefully (both `${secrets.X}` and `${X}` syntax) + +== Testing approaches + +=== Configuration linting + +Validate MCP tool configurations using `rpk connect lint`: + +[,bash] +---- +# Lint a single MCP tool +rpk connect lint resources/processors/weather-api.yaml + +# Lint all processor examples +rpk connect lint resources/processors/*.yaml + +# Lint with environment variable checking skipped (recommended for MCP) +rpk connect lint --skip-env-var-check resources/processors/*.yaml +---- + +This checks for common issues such as: + +* YAML syntax errors +* Unknown component types +* Invalid field names +* Type mismatches +* Missing required fields + +=== MCP metadata validation + +The test script validates MCP-specific metadata for all tool examples: + +[,bash] +---- +# Run all tests (includes linting + MCP validation) +./test-mcp-examples.sh + +# Test specific component types +./test-mcp-examples.sh processors +./test-mcp-examples.sh inputs +./test-mcp-examples.sh outputs +---- + +MCP metadata validation checks: + +* Presence of `meta.mcp` section +* `enabled: true` is set +* `description` field exists and is non-empty +* `properties` are properly structured (if present) + +=== Unit testing limitations + +[IMPORTANT] +==== +MCP tool examples are standalone component definitions (e.g., `label:`, `processors:`, `meta:`), not full pipelines with `input:`, `pipeline:`, `output:` sections. This means they cannot use inline `tests:` sections like cookbook examples do. + +The `rpk connect test` command requires full pipeline structure with paths like `/pipeline/processors/0`, which don't exist in MCP tool definitions. 
+==== + +For testing MCP tools: + +* **Linting is the primary validation** - ensures syntax and schema correctness +* **MCP metadata validation** - verifies tool has proper description and properties +* **Manual testing** - use `rpk connect mcp-server` to start a server and test tools with an MCP client + +== MCP tool structure + +MCP tools are structured as standalone components: + +[,yaml] +---- +label: fetch-weather +processors: + - label: prepare_parameters + mutation: | + meta city_name = this.city_name + + - label: fetch_weather + http: + url: 'https://wttr.in/${! @city_name }?format=j1' + verb: GET + + - label: format_response + mutation: | + root = { + "city": @city_name, + "temperature": this.current_condition.0.temp_C.number() + } + +meta: + mcp: + enabled: true + description: "Fetch current weather information for a specified city" + properties: + - name: city_name + type: string + description: "Name of the city to get weather information for" + required: true +---- + +This differs from full pipeline configurations used in cookbooks: + +[,yaml] +---- +# Cookbook style (full pipeline) +input: + generate: { ... } + +pipeline: + processors: [ ... ] + +output: + stdout: {} + +tests: [ ... ] # Can use inline tests +---- + +== Test script usage + +The `test-mcp-examples.sh` script provides automated validation: + +[,bash] +---- +# Test all examples +./test-mcp-examples.sh + +# Test specific component types +./test-mcp-examples.sh processors +./test-mcp-examples.sh inputs +./test-mcp-examples.sh outputs +./test-mcp-examples.sh caches +./test-mcp-examples.sh o11y + +# Test specific files +./test-mcp-examples.sh resources/processors/weather-*.yaml +---- + +The script provides color-coded output: + +[,console] +---- +πŸ§ͺ Redpanda Connect MCP Examples Test Suite +============================================ + +πŸ“„ Testing: resources/processors/weather-api.yaml (processor) + Linting weather-api.yaml... PASSED + Validating MCP metadata... 
PASSED + +============================================ +πŸ“Š Test Summary +============================================ +Total configs tested: 7 +Passed: 7 +Failed: 0 + +βœ… All tests passed! +---- + +== Manual end-to-end testing + +For comprehensive validation, test MCP tools with an actual MCP server: + +[,bash] +---- +# Navigate to examples directory +cd modules/ai-agents/examples + +# Start MCP server with your tools +rpk connect mcp-server --address localhost:4195 + +# In another terminal, connect with an MCP client +# Example: Claude Code with mcp-remote +claude mcp add local -- npx mcp-remote http://localhost:4195/sse + +# Test your tool through the MCP client +# The tool should appear in the tools list and be invocable +---- + +This validates: + +* Tool loads correctly in MCP server +* MCP metadata is properly exposed +* Tool executes with provided parameters +* Responses are formatted correctly + +== GitHub Actions CI/CD + +Automated tests run on every push and pull request using GitHub Actions. + +The workflow includes two jobs: + +. **lint-and-test** - Tests all examples at once +. **test-matrix** - Tests each component type in parallel for faster feedback + +See `.github/workflows/test-mcp-examples.yaml` for the complete workflow. 
+ +== Best practices + +=== Use descriptive tool names + +[,yaml] +---- +# Good +label: fetch-customer-orders + +# Bad +label: tool1 +---- + +=== Write clear MCP descriptions + +[,yaml] +---- +# Good +meta: + mcp: + description: "Fetch a customer's order history and calculate spending metrics over the last 30 days" + +# Bad +meta: + mcp: + description: "Get orders" +---- + +=== Document all properties + +[,yaml] +---- +# Good +properties: + - name: customer_id + type: string + description: "Unique identifier for the customer" + required: true + - name: days + type: number + description: "Number of days to look back (default: 30)" + required: false + +# Bad +properties: + - name: id + type: string + required: true +---- + +=== Use environment variables for secrets + +[,yaml] +---- +# For Cloud (Secrets Store) +ifdef::env-cloud[] +dsn: "${secrets.POSTGRES_DSN}" +endif::[] + +# For self-managed Connect (environment variables) +ifndef::env-cloud[] +dsn: "${POSTGRES_DSN}" +endif::[] +---- + +=== Tag your examples + +[,yaml] +---- +meta: + tags: [ example, weather, api ] # Helps organize and filter + mcp: + enabled: true +---- + +== Adding new examples + +When adding new MCP tool examples: + +. **Choose the appropriate directory:** ++ +[cols="1,2"] +|=== +|Directory |Purpose + +|`resources/processors/` +|Most MCP tools (data transformations, API calls) + +|`resources/inputs/` +|Streaming data sources + +|`resources/outputs/` +|Data sinks for batch operations + +|`resources/caches/` +|Caching components + +|`o11y/` +|Observability configurations (metrics, tracing) +|=== + +. **Include complete MCP metadata:** ++ +[,yaml] +---- +meta: + mcp: + enabled: true + description: "Clear, task-oriented description" + properties: + - name: param_name + type: string + description: "Parameter purpose and constraints" + required: true +---- + +. **Lint your example:** ++ +[,bash] +---- +rpk connect lint --skip-env-var-check resources/processors/my-tool.yaml +---- + +. 
**Run automated tests:** ++ +[,bash] +---- +cd modules/ai-agents/examples +./test-mcp-examples.sh processors +---- + +. **Test manually (recommended):** ++ +[,bash] +---- +rpk connect mcp-server --address localhost:4195 +# Connect with MCP client and verify tool works end-to-end +---- + +. **Commit your example:** ++ +[,bash] +---- +git add modules/ai-agents/examples/resources/processors/my-tool.yaml +git commit -m "Add my-tool MCP example" +---- + +== See also + +* https://docs.redpanda.com/redpanda-connect/configuration/unit_testing[Unit Testing Guide^] diff --git a/modules/ai-agents/pages/mcp-server/developer-guide.adoc b/modules/ai-agents/pages/mcp-server/developer-guide.adoc index 69d3b03d..744923e1 100644 --- a/modules/ai-agents/pages/mcp-server/developer-guide.adoc +++ b/modules/ai-agents/pages/mcp-server/developer-guide.adoc @@ -82,39 +82,8 @@ See the next sections for details on customizing these files for your use case. [[contract]] == Design the tool contract and MCP metadata -Each MCP tool must declare its interface using `meta.mcp` metadata. This metadata allows AI clients to discover and invoke the tool correctly. - -Define a clear, stable interface for each tool. Keep the description task-oriented and keep parameters to a minimum. - -[source,yaml] ----- -meta: - mcp: - enabled: true <1> - description: "Fetches a compact summary from an external API using two optional parameters." <2> - properties: <3> - - name: parameter1 - type: string - description: "Primary filter; defaults to provider standard when omitted." - required: false - - name: parameter2 - type: number - description: "Limit of results (1-100)." - required: false ----- - -<1> Set `meta.mcp.enabled: true` to expose the tool using MCP. -<2> Add a concise description that explains what the tool does. The description is passed as a tool option, making it available to clients and documentation. This should be understandable by an AI model. 
-<3> List the input parameters (properties) for the tool. -+ -Property guidance: -+ -* Use `string`, `number`, or `boolean` types. -* Validate ranges and enums using xref:guides:bloblang/about.adoc[Bloblang]. -* Mark only mandatory fields as required. -* Document defaults in the `description` and enforce them in the configuration. - -After defining your tool contract, implement the configuration to handle input validation, defaults, and the main processing steps. +// Tool contract guidance - single-sourced from partial +include::ai-agents:partial$mcp/tool-contract-guidance.adoc[] [[pipeline-patterns]] == Implement the configuration @@ -136,110 +105,14 @@ Here's a complete example that demonstrates best practices: include::ai-agents:example$resources/processors/weather-service.yaml[] ---- -=== YAML configuration rules - -Each YAML file should contain exactly one component type. The component type is inferred from the directory structure: - -[cols="1,1", options="header"] -|=== -| Directory | Component Type - -| `resources/inputs/` -| Input component - -| `resources/outputs/` -| Output component - -| `resources/processors/` -| Processor component - -| `resources/caches/` -| Cache component -|=== - -=== Property restrictions by component type - -Different component types have different property capabilities when exposed as MCP tools: - -[cols="1,2,2"] -|=== -| Component Type | Property Support | Details - -| `input` -| Only supports `count` property -| AI clients can specify how many messages to read, but you cannot define custom properties. - -| `cache` -| No custom properties -| Properties are hardcoded to `key` and `value` for cache operations. - -| `output` -| Custom properties supported -| AI sees properties as an array for batch operations: `[{prop1, prop2}, {prop1, prop2}]`. - -| `processor` -| Custom properties supported -| You can define any properties needed for data processing operations. 
-|=== - -.Correct example when inside resources/inputs/ -[source,yaml] ----- -label: event-reader -redpanda: - seed_brokers: [ "${REDPANDA_BROKERS}" ] - topics: [ "events" ] - consumer_group: "mcp-reader" - -meta: - mcp: - enabled: true - description: "Consume events from Redpanda" ----- - -.Correct example when inside resources/processors/ -[source,yaml] ----- -label: fetch-example-data -processors: - - label: safe_operation - try: - - http: - url: "https://api.example.com/data" - timeout: "10s" - - mutation: | - root = this.merge({"processed": true}) +// YAML configuration rules - single-sourced from partial +include::ai-agents:partial$mcp/yaml-config-rules-connect.adoc[] - - label: handle_errors - catch: - - mutation: | - root = { - "error": "Operation failed", - "details": error() - } ----- +// Property restrictions table - single-sourced from partial +include::ai-agents:partial$mcp/property-restrictions-table.adoc[] -.Incorrect (do not include the input wrapper) -[source,yaml] ----- -label: incorrect-example -input: - redpanda: - seed_brokers: [ "${REDPANDA_BROKERS}" ] - topics: [ "events" ] ----- - -.Incorrect (multiple component types in one file) -[source,yaml] ----- -label: incorrect-example -input: - redpanda: { ... } -processors: - - mutation: { ... } -output: - redpanda: { ... } ----- +// Configuration examples - single-sourced from partial +include::ai-agents:partial$mcp/config-examples.adoc[] .Incorrect (try/catch as single processor) [source,yaml] diff --git a/modules/ai-agents/pages/mcp-server/overview.adoc b/modules/ai-agents/pages/mcp-server/overview.adoc index e14e6771..51a1f9bd 100644 --- a/modules/ai-agents/pages/mcp-server/overview.adoc +++ b/modules/ai-agents/pages/mcp-server/overview.adoc @@ -8,44 +8,8 @@ Redpanda Connect's MCP (Model Context Protocol) server lets you expose your data * Accelerate automation and insight by letting AI trigger real business actions. * Control exposure so that only the tools you tag are visible to AI. 
-== Use cases - -[cols="1,3a"] -|=== -|Category |Example prompts - -|Operational monitoring -| -- "Check partition lag for customer-events topic." - -- "Show me the top 10 producers by message volume today." - -- "Get schema registry health status." - -|Data enrichment and analysis -| -- "Fetch user profile data and recent orders for customer ID 12345." - -- "Get real-time stock prices for symbols in my portfolio topic." - -- "Analyze sentiment of latest product reviews." - -|Team productivity -| -- "Deploy my microservice to the staging environment." - -- "Generate load test data for the payments service." - -- "Create a summary dashboard of this week's incident reports." - -|Business intelligence -| -- "What are the trending products in the last 24 hours?" - -- "Show revenue impact of the latest feature deployment." - -- "Get customer satisfaction scores from support tickets." -|=== +// Use cases table - single-sourced from partial +include::ai-agents:partial$mcp/use-cases-table.adoc[] == How it works @@ -54,50 +18,11 @@ Redpanda Connect's MCP (Model Context Protocol) server lets you expose your data . AI clients can list, inspect, and invoke tools to trigger real business actions or analytics with validated, structured input. . All tool metadata is optimized for LLMs, so AI can understand what each tool does and how to use it. -== Example: Customer analytics tool - -Here's how you might expose a customer analytics configuration as an MCP tool: - -[source,yaml] ----- -label: analyze_customer_orders -processors: - - label: fetch_customer_data - sql_select: - driver: "postgres" - dsn: "${secrets.POSTGRES_DSN}" - table: "orders" - where: "customer_id = ? 
AND created_at >= NOW() - INTERVAL '30 days'" - args_mapping: 'root = [this.customer_id]' - - - label: calculate_metrics - mutation: | - root = { - "customer_id": this.0.customer_id, - "total_orders": this.length(), - "total_spent": this.map_each(o -> o.total).sum(), - "avg_order_value": this.map_each(o -> o.total).mean(), - "last_order_date": this.map_each(o -> o.created_at).max() - } - -meta: - mcp: - enabled: true - description: "Analyze a customer's order history and spending patterns over the last 30 days" - properties: - - name: customer_id - type: string - description: "Customer ID to analyze" - required: true ----- - -This tool can be invoked by an AI assistant with prompts like "analyze the order history for customer ID 12345" without needing to know the underlying SQL or processing logic. - -== MCP specification support - -Redpanda Connect MCP server implements the open MCP protocol for tool exposure. Only the tool concept from the MCP server specification is supported. Features such as MCP resources and prompts are not yet available. +// Customer analytics example - single-sourced from partial +include::ai-agents:partial$mcp/customer-analytics-example.adoc[] -For full details, see the link:https://modelcontextprotocol.io/specification/2025-06-18/server[official MCP server specification^]. +// MCP specification support - single-sourced from partial +include::ai-agents:partial$mcp/specification-support.adoc[] == Get started diff --git a/modules/ai-agents/pages/mcp-server/pipeline-patterns.adoc b/modules/ai-agents/pages/mcp-server/pipeline-patterns.adoc index ca1e0f90..7e0d8e77 100644 --- a/modules/ai-agents/pages/mcp-server/pipeline-patterns.adoc +++ b/modules/ai-agents/pages/mcp-server/pipeline-patterns.adoc @@ -7,10 +7,46 @@ Each pattern is a reusable example for a common MCP tool scenario. Patterns are For a high-level overview of MCP servers, see xref:ai-agents:mcp-server/overview.adoc[]. 
+== Pattern selection guide + +Choose the right pattern for your use case: + +* **Generate test data or synthetic events**: <> +* **Call external REST APIs**: <> +* **Query databases**: <> +* **Read from Redpanda topics**: <> +* **Transform or validate data**: <> +* **Publish to Redpanda topics**: <> +* **Process streaming data**: <> +* **Build event-driven workflows**: <> +* **Integrate with AI/LLM services**: <> +* **Cache frequently accessed data**: <> + +== Data generators + +Use xref:components:inputs/about.adoc[`inputs`] to create tools that read data from internal or external systems or generate sample data for testing and development. + +**When to use:** Development and testing environments where you need synthetic data, load testing scenarios, or demonstrating data flows without real data sources. + +**Example use cases:** Mock user events, test order data, synthetic sensor readings, demo data for presentations. + +This example generates a realistic user event message: + +[source,yaml] +---- +include::ai-agents:example$resources/inputs/generate-input.yaml[] +---- + +See also: xref:components:inputs/generate.adoc[`generate` input component] + == External API calls Use xref:components:processors/about.adoc[`processors`] to fetch data from external APIs, databases, or services and return formatted results. This is one of the most common patterns for MCP tools. +**When to use:** Integrating with third-party services, fetching real-time data, calling internal microservices, or enriching event data with external information. + +**Example use cases:** Fetch user profile from CRM, get product pricing from inventory API, validate addresses with geocoding service, retrieve weather data. + [source,yaml] ---- include::ai-agents:example$resources/processors/weather-api.yaml[] @@ -22,6 +58,10 @@ See also: xref:components:processors/http.adoc[`http` processor], xref:component Query external databases and return structured results. 
This pattern is essential for tools that need to access business data. +**When to use:** Retrieving customer records, querying analytics data, looking up configuration values, or joining streaming data with dimensional data from data warehouses. + +**Example use cases:** Fetch customer details from PostgreSQL, query sales data from BigQuery, retrieve product catalog from MongoDB, look up reference data. + NOTE: This example requires setting the `DATABASE_URL` environment variable with your PostgreSQL connection string. For example, `export DATABASE_URL="postgres://user:password@localhost:5432/dbname"`. [source,yaml] @@ -37,273 +77,169 @@ Build tools that interact with Redpanda topics to publish data, consume events, NOTE: The examples in this section require setting the `REDPANDA_BROKERS` environment variable with your Redpanda broker addresses. For example, `export REDPANDA_BROKERS="localhost:19092"`. -=== Publishing to Redpanda topics +=== Publish to Redpanda topics + +Create tools that write data to Redpanda topics using the `redpanda` output. + +**When to use:** Publishing events to Redpanda for consumption by other services, creating event sourcing patterns, building audit trails, or triggering downstream workflows. -Create tools that write data to Redpanda topics using the `redpanda` output: +**Example use cases:** Publish order confirmations, emit audit events, trigger notifications, create event-driven workflows. [source,yaml] ---- include::ai-agents:example$resources/outputs/redpanda-publish.yaml[] ---- -=== Consuming from Redpanda topics +==== Outputs with processors -Build tools that read data from topics and return processed results: +Output tools can include processors to transform data before publishing. This pattern is useful when you need to process data and save the result to a destination in a single tool. 
+ +**When to use:** Processing user input with an LLM and saving the response, transforming data before publishing to a topic, enriching events before writing to external systems. + +**Example use cases:** Send a prompt to an LLM, then save the answer to a topic in Redpanda. [source,yaml] ---- -include::ai-agents:example$resources/inputs/redpanda-consume.yaml[] +include::ai-agents:example$resources/outputs/redpanda-output-with-processors.yaml[] ---- -== Stream processing with Redpanda Connect +You can use an output component type with processors, but you cannot use a processor component type with outputs. The `processors` field is available in all output components. -Create tools that process streaming data and return aggregated results: +=== Consume from Redpanda topics -[source,yaml] ----- -include::ai-agents:example$resources/inputs/stream-analytics.yaml[] ----- +Build tools that read data from topics and return processed results. -== Event-driven workflows +**When to use:** Processing events from Redpanda topics, building event-driven AI agents, consuming audit logs, or subscribing to data change streams. -Build tools that trigger workflows based on Redpanda events: +**Example use cases:** Monitor order events, process user activity streams, consume IoT sensor data, react to system notifications. [source,yaml] ---- -include::ai-agents:example$resources/inputs/event-workflow.yaml[] +include::ai-agents:example$resources/inputs/redpanda-consume.yaml[] ---- -See also: xref:components:inputs/redpanda.adoc[`redpanda` input] - -== Production workflows and observability +== Data transformation -Build enterprise-grade tools with error handling, validation, multi-step workflows, and monitoring. +Transform, validate, and enrich data as it flows through your MCP tools. Use Bloblang mapping language for powerful data manipulation. 
-=== Parameter validation and type coercion +**When to use:** Converting data formats, validating schemas, filtering events, enriching messages with computed fields, or normalizing data structures. -Always validate and coerce input parameters to ensure your tools are robust: +**Example use cases:** Parse JSON payloads, validate required fields, add timestamps, convert units, mask sensitive data, aggregate nested objects. [source,yaml] ---- -processors: - - label: validate_params - mutation: | - # Validate required parameters - root = if !this.exists("user_id") { - throw("user_id parameter is required") - } else { this } - - # Type coercion with validation - meta user_id = this.user_id.string() - meta limit = this.limit.number().catch(10) - meta start_date = this.start_date.parse_timestamp("2006-01-02").catch(now() - duration("24h")) ----- - -=== Dynamic configuration +mapping: | + # Parse and validate incoming data + root.user_id = this.user_id.or(throw("user_id is required")) + root.timestamp = now().ts_format("2006-01-02T15:04:05Z07:00") -Build tools that adapt their behavior based on input parameters: + # Transform and enrich + root.email_domain = this.email.split("@").index(1) + root.is_premium = this.subscription_tier == "premium" -[source,yaml] ----- -processors: - - label: dynamic_config - mutation: | - # Choose data source based on environment - meta env = this.environment | "production" - meta table_name = match @env { - "dev" => "dev_orders", - "staging" => "staging_orders", - "production" => "prod_orders", - _ => "dev_orders" - } - - # Adjust query complexity based on urgency - meta columns = if this.detailed.bool().catch(false) { - ["order_id", "customer_id", "total", "items", "shipping_address"] - } else { - ["order_id", "customer_id", "total"] - } + # Filter sensitive data + root.profile = this.profile.without("ssn", "credit_card") ---- -=== Error handling and fallbacks +See also: xref:components:processors/mapping.adoc[`mapping` processor], 
xref:guides:bloblang/about.adoc[Bloblang guide] -Implement error handling to make your tools reliable: +== Stream processing with Redpanda Connect -[source,yaml] ----- -processors: - - label: primary_fetch - try: - - http: - url: "https://api.primary.com/data" - timeout: "10s" - catch: - - log: - message: "Primary API failed, trying fallback" - - label: fallback_fetch - http: - url: "https://api.fallback.com/data" - timeout: "15s" - - mutation: | - root.metadata.source = "fallback" - root.metadata.warning = "Primary source unavailable" ----- +Create tools that process streaming data and return aggregated results. -=== Conditional processing +**When to use:** Real-time analytics, windowed aggregations, computing metrics over time, or building streaming dashboards. -Build tools that branch based on input or data characteristics: +**Example use cases:** Calculate rolling averages, count events per time window, detect anomalies in streams, aggregate metrics. [source,yaml] ---- -processors: - - label: conditional_processing - switch: - - check: this.data_type == "json" - processors: - - json: - operator: "parse" - - mutation: 'root.parsed_data = this' - - check: this.data_type == "csv" - processors: - - csv: - parse: true - - mutation: 'root.parsed_data = this' - - processors: - - mutation: 'root.error = "Unsupported data type"' +include::ai-agents:example$resources/inputs/stream-analytics.yaml[] ---- -[[secrets]] -=== Secrets and credentials +== Event-driven workflows -Securely handle multiple credentials and API keys using environment variables. +Build tools that trigger workflows based on Redpanda events. -Here is an example of using an API key from environment variables. +**When to use:** Orchestrating multi-step processes, responding to business events, implementing saga patterns, or coordinating microservices. -. 
Set an environment variable with your API key: -+ -[source,bash] ----- -export EXTERNAL_API_KEY="your-api-key-here" ----- +**Example use cases:** Order fulfillment workflows, approval processes, notification cascades, data pipeline orchestration. -. Reference the environment variable in your configuration: -+ [source,yaml] ---- -processors: - - label: call_external_api - http: - url: "https://api.example.com/data" - verb: GET - headers: - Authorization: "Bearer ${EXTERNAL_API_KEY}" # <1> - Accept: "application/json" +include::ai-agents:example$resources/inputs/event-workflow.yaml[] ---- -+ -<1> The environment variable is injected at runtime. Never store the actual API key in your YAML. The actual secret value never appears in your configuration files or logs. -=== Monitoring, debugging, and observability +See also: xref:components:inputs/redpanda.adoc[`redpanda` input] -Use structured logging, request tracing, and performance metrics to gain insights into tool execution. +== AI/LLM integration -[source,yaml] ----- -include::ai-agents:example$resources/processors/observable-tool.yaml[] ----- +Integrate AI and LLM services into your MCP tools for intelligent data processing, embeddings generation, and natural language understanding. -Observability features: +**When to use:** Generating embeddings for semantic search, calling LLM APIs for text generation, building RAG (Retrieval Augmented Generation) pipelines, or analyzing sentiment. 
-* *Correlation IDs*: Use `uuid_v7()` to generate unique request identifiers for tracing -* *Execution timing*: Track how long your tools take to execute using nanosecond precision -* *Structured logging*: Include consistent fields like `request_id`, `duration_ms`, `tool_name` -* *Request/response metadata*: Log input parameters and response characteristics -* *Success tracking*: Monitor whether operations complete successfully +**Example use cases:** Generate embeddings for documents, classify customer feedback, summarize long text, extract entities, answer questions with context. -You can test this pattern by invoking the tool with valid and invalid parameters, and observe the structured logs for tracing execution flow. For example, with a user ID of 1, you might see logs like: +=== OpenAI integration -[source,json] ----- -{ - "metadata": { - "execution_time_ms": 0.158977, - "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95", - "success": true, - "timestamp": "2025-09-16T08:37:18.589Z", - "tool": "observable_tool" - }, - "trace": { - "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95", - "timestamp": "2025-09-16T08:37:18.589Z", - "tool": "observable_tool", - "version": "1.0.0" - }, - "user_id": "1" -} +[source,yaml] ---- +openai_chat_completion: + api_key: "${OPENAI_API_KEY}" + model: "gpt-4" + prompt: | + Analyze this customer feedback and provide: + 1. Sentiment (positive/negative/neutral) + 2. Key themes + 3. Actionable insights -See also: xref:components:processors/log.adoc[`log` processor], xref:components:processors/try.adoc[`try` processor], xref:guides:bloblang/functions.adoc[Bloblang functions] (for timing and ID generation) - -=== Multi-step data enrichment - -Build tools that combine data from multiple sources. + Feedback: ${! json("feedback_text") } + max_tokens: 500 +---- -This workflow fetches customer data from a SQL database, enriches it with recent order history, and computes summary metrics. 
+See also: xref:components:processors/openai_chat_completion.adoc[`openai_chat_completion`], xref:components:processors/openai_embeddings.adoc[`openai_embeddings`] -NOTE: This example requires setting the `POSTGRES_DSN` environment variable with your PostgreSQL connection string. For example, `export POSTGRES_DSN="postgres://user:password@localhost:5432/dbname"`. +=== Embeddings generation [source,yaml] ---- -include::ai-agents:example$resources/processors/customer-enrichment.yaml[] +openai_embeddings: + api_key: "${OPENAI_API_KEY}" + model: "text-embedding-3-small" + text: ${! json("content") } ---- -See also: xref:components:processors/sql_select.adoc[`sql_select` processor], xref:guides:bloblang/about.adoc[Bloblang functions] (for data manipulation and aggregations) +See also: xref:components:processors/cohere_embeddings.adoc[`cohere_embeddings`], xref:components:processors/gcp_vertex_ai_embeddings.adoc[`gcp_vertex_ai_embeddings`] + +== Caching systems + +Use caching to store frequently accessed data, reduce latency, and minimize external API calls. You can implement caching using either Redpanda topics or in-memory stores. -=== Workflow orchestration +**When to use:** Reducing repeated API calls, storing lookup tables, caching database query results, or maintaining session state across tool invocations. -Coordinate complex workflows with multiple steps and conditional logic. +**Example use cases:** Cache user profiles, store API rate limit counters, maintain configuration values, cache product catalogs. -This workflow simulates a complete order processing configuration with mock data for inventory and processing tiers. This allows you to test the full logic without needing real external systems. 
+**In-memory cache** for low-latency access to small datasets: [source,yaml] ---- -include::ai-agents:example$resources/processors/order-workflow.yaml[] +memory: + default_ttl: 300s + compaction_interval: 60s ---- -For the input `{"order_id": "ORD001", "product_id": "widget-001", "quantity": 5, "total": 250, "customer_tier": "vip"}`, the workflow produces: +See also: xref:components:caches/memory.adoc[`memory` cache], Redpanda-backed cache using xref:components:outputs/redpanda.adoc[`redpanda` output] -[source,json] ----- -{ - "assigned_rep": "vip-team@company.com", - "available_quantity": 100, - "customer_tier": "vip", - "estimated_fulfillment": "TBD - calculated based on processing tier", - "inventory_check": "passed", - "order_id": "ORD001", - "order_status": "processed", - "perks": [ - "expedited_shipping", - "white_glove_service" - ], - "priority_score": 90, - "processed_at": "2025-09-16T09:05:29.138Z", - "processing_tier": "vip", - "processing_time_estimate": "1-2 hours", - "processing_time_hours": 2, - "product_id": "widget-001", - "product_name": "Standard Widget", - "quantity": 5, - "total": 250 -} ----- +// Production workflows section - single-sourced from partials +// Part 1: Parameter validation, dynamic config, error handling, conditional processing +include::ai-agents:partial$mcp/production-workflows-before-secrets.adoc[] + +// Secrets section - deployment-specific for Connect (uses environment variables) +include::ai-agents:partial$mcp/secrets-connect.adoc[] -Notice how the workflow: +// Part 2: Monitoring, multi-step enrichment, workflow orchestration +include::ai-agents:partial$mcp/production-workflows-after-secrets.adoc[] -. Preserves original input: `order_id`, `product_id`, `quantity`, `total`, and `customer_tier` pass through unchanged. -. Adds inventory data: `available_quantity`, `product_name`, and `inventory_check` status from the mock lookup. -. 
Routes by customer tier: Since `customer_tier` is `vip`, it gets VIP processing with special `perks` and priority. -. Enriches with processing metadata: `assigned_rep`, `priority_score`, `processing_tier`, and time estimates. -. Finalizes with timestamps: `order_status`, `processed_at`, and calculated `processing_time_hours`. +NOTE: The multi-step enrichment example requires setting the `POSTGRES_DSN` environment variable with your PostgreSQL connection string. For example, `export POSTGRES_DSN="postgres://user:password@localhost:5432/dbname"`. == Suggested reading diff --git a/modules/ai-agents/pages/mcp-server/quickstart.adoc b/modules/ai-agents/pages/mcp-server/quickstart.adoc index 43639527..ee64ced9 100644 --- a/modules/ai-agents/pages/mcp-server/quickstart.adoc +++ b/modules/ai-agents/pages/mcp-server/quickstart.adoc @@ -1,17 +1,36 @@ = Redpanda Connect MCP Server Quickstart -:description: Expose your Redpanda Connect configurations to Claude Code as AI-consumable HTTP endpoints. +:description: Learn how to expose your Redpanda Connect configurations as AI-consumable tools using the Model Context Protocol (MCP). -This quickstart shows you how to build a Redpanda Connect configuration that searches Bluesky posts using a customizable query. You'll expose this configuration to Claude Code as a tool through an *MCP server*, then ask Claude to perform the search using natural language. +This quickstart builds a local MCP server that exposes Redpanda Connect configurations as tools that AI assistants can use. It creates a tool that searches Bluesky posts, then asks Claude Code to perform the search using natural language: > Find recent posts on Bluesky mentioning Redpanda. -The MCP server makes your local tools *AI-consumable*. 
It exposes them with metadata that Claude can understand and use to: +== What you'll learn -- Discover available tools -- Understand required inputs and output format -- Make intelligent, structured HTTP requests +By completing this quickstart, you'll understand: -This enables natural-language automation for internal workflows, using your own business logic. +* How MCP enables AI assistants to discover and use your tools. +* How to structure Redpanda Connect configurations as MCP tools. +* How to run a local MCP server and connect it to Claude Code. +* How AI agents orchestrate your tools through natural language. + +== How MCP works + +The Model Context Protocol (MCP) makes your tools AI-consumable by exposing them with structured metadata: + +* **MCP Server**: A local HTTP service that hosts your Redpanda Connect configurations as discoverable tools +* **Tools**: Individual Redpanda Connect configurations (processors, inputs, outputs, caches) exposed through the server +* **Metadata**: Structured descriptions of what each tool does, what inputs it needs, and what outputs it returns +* **Client**: An AI assistant (like Claude Code) that discovers your tools and calls them based on user requests + +When you ask Claude to "search Bluesky for Redpanda news," Claude: + +. Discovers your `search-bluesky-posts` tool through the MCP server +. Understands it needs a `query` parameter +. Calls your tool with the appropriate query +. Returns the results in natural language + +This enables natural-language automation for internal workflows using your own business logic. == Prerequisites @@ -153,7 +172,19 @@ Only tools with the specified `--tag` are exposed. This helps you: == Connect Claude Code to your MCP server -To connect Claude Code to your MCP server, you need to expose a live event stream that Claude can consume. This is done using the link:https://www.npmjs.com/package/mcp-remote[`mcp-remote` utility^], which bridges your local service to Claude's MCP interface. 
`mcp-remote` is a lightweight bridge that turns any streaming HTTP endpoint into a source of MCP-compatible messages. +Now that your MCP server is running and exposing tools, you can connect Claude Code so it can discover and use them. + +Claude Code communicates with MCP servers using the standard input/output (stdio) transport protocol. Because your Redpanda Connect MCP server exposes tools over HTTP with Server-Sent Events (SSE), you need a bridge to convert between these protocols. The link:https://www.npmjs.com/package/mcp-remote[`mcp-remote` utility^] provides this bridge. + +When you connect Claude Code: + +- `mcp-remote` subscribes to your server's SSE endpoint (`http://localhost:4195/sse`). +- It translates HTTP/SSE messages into stdio format that Claude understands. +- Claude automatically discovers your tools and their metadata. +- You can ask Claude in natural language to use your tools. +- Claude calls the appropriate tool based on your request. + +To set up this connection: . Open a new terminal window. @@ -190,9 +221,9 @@ Tools for local (1 tools) . Press *Esc* until you return to the main prompt. -== Write a prompt that uses the tool +== Use the tool with Claude -To use the `search-bluesky-posts` tool in Claude, write a prompt that includes a natural language request. +Now you can see MCP in action. When you ask Claude to search Bluesky, watch what happens behind the scenes. . Enter the following prompt to start a conversation with Claude Code: + @@ -228,14 +259,18 @@ claude Search Bluesky for the latest news about Redpanda Data β”‚ 2. Yes, and don't ask again for local:search-bluesky-posts commands in /Users/jakecahill/Documents/my-agent β”‚ β”‚ 3. No, and tell Claude what to do differently (esc) ---- - -Claude will: - -. Fill in the `query` property -. Send an HTTP request to your local MCP server -. Return the result in conversation - -If you change the YAML configuration of your tools, make sure to restart the MCP server to pick up the changes. 
++ +This is what happens: ++ +-- +* Claude analyzes your natural language request. +* Claude identifies the `search-bluesky-posts` tool as the right one to use. +* Claude extracts "Redpanda Data" as the query parameter. +* Claude calls your local MCP server via `mcp-remote`. +* Your Redpanda Connect configuration executes the Bluesky API request. +* Results flow back through the MCP server to Claude. +* Claude synthesizes the results into a conversational response. +-- Here's an example of what the result might look like: @@ -261,12 +296,23 @@ Here's an example of what the result might look like: momentum in hiring and customer wins. ---- +[NOTE] +==== +If you change the YAML configuration of your tools, restart the MCP server to pick up the changes. +==== + == Stop or disconnect the MCP server To disconnect or stop the MCP server, press kbd:[Ctrl+C] in the terminal where the server is running. This will gracefully shut down the MCP server process and disconnect any connected clients. You can also close the terminal window or kill the process using standard OS commands (such as `kill ` on Linux/macOS). +== Summary + +You extended Claude's capabilities with a custom tool built from Redpanda Connect components. Claude can now search Bluesky through natural language commands without manual API calls or scripting. + +This same pattern works for any workflow you can build with Redpanda Connect: consuming from Kafka topics, transforming data, calling APIs, querying databases, or orchestrating complex data pipelines. Each configuration becomes an AI-consumable tool. + == Troubleshoot This section covers issues you might encounter when setting up and using the MCP server. @@ -351,10 +397,14 @@ claude mcp add local -- npx mcp-remote http://localhost:4196/sse == Next steps -Try adding more tools under the same `example` tag to expand Claude Code's capabilities. See xref:ai-agents:mcp-server/developer-guide.adoc[]. 
- -View the xref:components:about.adoc[full catalog of connectors] you can use to build more tools. +Now that you understand how MCP works with Redpanda Connect, you can create custom tools for your specific workflows: -NOTE: You can connect any MCP client to your MCP server. For a list of example clients, see the link:https://modelcontextprotocol.io/clients[MCP documentation^]. +* **Data pipeline orchestration**: Build tools that consume from Kafka topics, transform data, and write to destinations. +* **API integrations**: Create tools that call internal APIs, third-party services, or webhooks. +* **Database operations**: Add tools for querying, inserting, or updating data using natural language. +* **Monitoring and observability**: Expose tools that check system health, query metrics, or analyze logs. +* **Multi-step workflows**: Combine multiple tools with different tags to handle complex operations. +* **Advanced patterns**: See xref:ai-agents:mcp-server/developer-guide.adoc[] to learn advanced patterns for building MCP tools. +* **Component reference**: View the xref:components:about.adoc[full catalog of Redpanda Connect components] you can use to build tools. 
diff --git a/modules/ai-agents/partials/mcp/config-examples.adoc b/modules/ai-agents/partials/mcp/config-examples.adoc new file mode 100644 index 00000000..14817fed --- /dev/null +++ b/modules/ai-agents/partials/mcp/config-examples.adoc @@ -0,0 +1,76 @@ +// Single-sourced partial for MCP configuration examples +// Shows correct and incorrect ways to structure MCP tool configurations +// +// Include from Connect: include::ai-agents:partial$mcp/config-examples.adoc[] +// Include from Cloud: include::redpanda-connect:ai-agents:partial$mcp/config-examples.adoc[] + +=== Configuration examples + +.Correct example (input component) +[source,yaml] +---- +label: event-reader +redpanda: +ifdef::env-cloud[] + seed_brokers: [ "${secrets.REDPANDA_BROKERS}" ] +endif::[] +ifndef::env-cloud[] + seed_brokers: [ "${REDPANDA_BROKERS}" ] +endif::[] + topics: [ "events" ] + consumer_group: "mcp-reader" + +meta: + mcp: + enabled: true + description: "Consume events from Redpanda" +---- + +.Correct example (processors with error handling) +[source,yaml] +---- +label: fetch-example-data +processors: + - label: safe_operation + try: + - http: + url: "https://api.example.com/data" + timeout: "10s" + - mutation: | + root = this.merge({"processed": true}) + + - label: handle_errors + catch: + - mutation: | + root = { + "error": "Operation failed", + "details": error() + } +---- + +.Incorrect (do not include the input wrapper) +[source,yaml] +---- +label: incorrect-example +input: + redpanda: +ifdef::env-cloud[] + seed_brokers: [ "${secrets.REDPANDA_BROKERS}" ] +endif::[] +ifndef::env-cloud[] + seed_brokers: [ "${REDPANDA_BROKERS}" ] +endif::[] + topics: [ "events" ] +---- + +.Incorrect (multiple component types in one file) +[source,yaml] +---- +label: incorrect-example +input: + redpanda: { ... } +processors: + - mutation: { ... } +output: + redpanda: { ... 
} +---- diff --git a/modules/ai-agents/partials/mcp/customer-analytics-example.adoc b/modules/ai-agents/partials/mcp/customer-analytics-example.adoc new file mode 100644 index 00000000..12f1e8ef --- /dev/null +++ b/modules/ai-agents/partials/mcp/customer-analytics-example.adoc @@ -0,0 +1,50 @@ +// Shared customer analytics tool example +// Used by both self-managed Redpanda Connect and Remote MCP servers in Cloud +// +// Include syntax: +// - From rp-connect-docs: include::ai-agents:partial$mcp/customer-analytics-example.adoc[] +// - From cloud-docs: include::redpanda-connect:ai-agents:partial$mcp/customer-analytics-example.adoc[] + +== Example: Customer analytics tool + +Here's how you might expose a customer analytics pipeline as an MCP tool: + +[source,yaml] +---- +label: analyze_customer_orders +processors: + - label: fetch_customer_data + sql_select: + driver: "postgres" +ifdef::env-cloud[] + dsn: "${secrets.POSTGRES_DSN}" +endif::[] +ifndef::env-cloud[] + dsn: "${POSTGRES_DSN}" +endif::[] + table: "orders" + where: "customer_id = ? AND created_at >= NOW() - INTERVAL '30 days'" + args_mapping: 'root = [this.customer_id]' + + - label: calculate_metrics + mutation: | + root = { + "customer_id": this.0.customer_id, + "total_orders": this.length(), + "total_spent": this.map_each(o -> o.total).sum(), + "avg_order_value": this.map_each(o -> o.total).sum() / this.length(), + "last_order_date": this.map_each(o -> o.created_at).max() + } + +meta: + mcp: + enabled: true + description: "Analyze a customer's order history and spending patterns over the last 30 days" + properties: + - name: customer_id + type: string + description: "Customer ID to analyze" + required: true +---- + +This tool can be invoked by an AI assistant with prompts like "analyze the order history for customer ID 12345" without needing to know the underlying SQL or processing logic. 
diff --git a/modules/ai-agents/partials/mcp/production-workflows-after-secrets.adoc b/modules/ai-agents/partials/mcp/production-workflows-after-secrets.adoc new file mode 100644 index 00000000..8318d05c --- /dev/null +++ b/modules/ai-agents/partials/mcp/production-workflows-after-secrets.adoc @@ -0,0 +1,108 @@ +// Shared content for production workflows - part 2 (after secrets section) +// Used by both self-managed Redpanda Connect and Remote MCP servers in Cloud +// Source: rp-connect-docs/modules/ai-agents/pages/mcp-server/pipeline-patterns.adoc +// +// Include syntax: +// - From rp-connect-docs: include::ai-agents:partial$mcp/production-workflows-after-secrets.adoc[] +// - From cloud-docs: include::redpanda-connect:ai-agents:partial$mcp/production-workflows-after-secrets.adoc[] + +=== Monitoring, debugging, and observability + +Use structured logging, request tracing, and performance metrics to gain insights into tool execution. + +[source,yaml] +---- +include::ai-agents:example$resources/processors/observable-tool.yaml[] +---- + +Observability features: + +* *Correlation IDs*: Use `uuid_v7()` to generate unique request identifiers for tracing +* *Execution timing*: Track how long your tools take to execute using nanosecond precision +* *Structured logging*: Include consistent fields like `request_id`, `duration_ms`, `tool_name` +* *Request/response metadata*: Log input parameters and response characteristics +* *Success tracking*: Monitor whether operations complete successfully + +You can test this pattern by invoking the tool with valid and invalid parameters, and observe the structured logs for tracing execution flow. 
For example, with a user ID of 1, you might see logs like: + +[source,json] +---- +{ + "metadata": { + "execution_time_ms": 0.158977, + "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95", + "success": true, + "timestamp": "2025-09-16T08:37:18.589Z", + "tool": "observable_tool" + }, + "trace": { + "request_id": "019951ab-d07d-703f-aaae-7e1c9a5afa95", + "timestamp": "2025-09-16T08:37:18.589Z", + "tool": "observable_tool", + "version": "1.0.0" + }, + "user_id": "1" +} +---- + +See also: xref:components:processors/log.adoc[`log` processor], xref:components:processors/try.adoc[`try` processor], xref:guides:bloblang/functions.adoc[Bloblang functions] (for timing and ID generation) + +=== Multi-step data enrichment + +Build tools that combine data from multiple sources. + +This workflow fetches customer data from a SQL database, enriches it with recent order history, and computes summary metrics. + +[source,yaml] +---- +include::ai-agents:example$resources/processors/customer-enrichment.yaml[] +---- + +See also: xref:components:processors/sql_select.adoc[`sql_select` processor], xref:guides:bloblang/about.adoc[Bloblang functions] (for data manipulation and aggregations) + +=== Workflow orchestration + +Coordinate complex workflows with multiple steps and conditional logic. + +This workflow simulates a complete order processing pipeline with mock data for inventory and processing tiers. This allows you to test the full logic without needing real external systems. 
+ +[source,yaml] +---- +include::ai-agents:example$resources/processors/order-workflow.yaml[] +---- + +For the input `{"order_id": "ORD001", "product_id": "widget-001", "quantity": 5, "total": 250, "customer_tier": "vip"}`, the workflow produces: + +[source,json] +---- +{ + "assigned_rep": "vip-team@company.com", + "available_quantity": 100, + "customer_tier": "vip", + "estimated_fulfillment": "TBD - calculated based on processing tier", + "inventory_check": "passed", + "order_id": "ORD001", + "order_status": "processed", + "perks": [ + "expedited_shipping", + "white_glove_service" + ], + "priority_score": 90, + "processed_at": "2025-09-16T09:05:29.138Z", + "processing_tier": "vip", + "processing_time_estimate": "1-2 hours", + "processing_time_hours": 2, + "product_id": "widget-001", + "product_name": "Standard Widget", + "quantity": 5, + "total": 250 +} +---- + +Notice how the workflow: + +. Preserves original input: `order_id`, `product_id`, `quantity`, `total`, and `customer_tier` pass through unchanged. +. Adds inventory data: `available_quantity`, `product_name`, and `inventory_check` status from the mock lookup. +. Routes by customer tier: Since `customer_tier` is `vip`, it gets VIP processing with special `perks` and priority. +. Enriches with processing metadata: `assigned_rep`, `priority_score`, `processing_tier`, and time estimates. +. Finalizes with timestamps: `order_status`, `processed_at`, and calculated `processing_time_hours`. 
diff --git a/modules/ai-agents/partials/mcp/production-workflows-before-secrets.adoc b/modules/ai-agents/partials/mcp/production-workflows-before-secrets.adoc new file mode 100644 index 00000000..4f9dd66b --- /dev/null +++ b/modules/ai-agents/partials/mcp/production-workflows-before-secrets.adoc @@ -0,0 +1,104 @@ +// Shared content for production workflows - part 1 (before secrets section) +// Used by both self-managed Redpanda Connect and Remote MCP servers in Cloud +// Source: rp-connect-docs/modules/ai-agents/pages/mcp-server/pipeline-patterns.adoc +// +// Include syntax: +// - From rp-connect-docs: include::ai-agents:partial$mcp/production-workflows-before-secrets.adoc[] +// - From cloud-docs: include::redpanda-connect:ai-agents:partial$mcp/production-workflows-before-secrets.adoc[] + +== Production workflows and observability + +Build enterprise-grade tools with error handling, validation, multi-step workflows, and monitoring. + +=== Parameter validation and type coercion + +Always validate and coerce input parameters to ensure your tools are robust: + +[source,yaml] +---- +processors: + - label: validate_params + mutation: | + # Validate required parameters + root = if !this.exists("user_id") { + throw("user_id parameter is required") + } else { this } + + # Type coercion with validation + meta user_id = this.user_id.string() + meta limit = this.limit.number().catch(10) + meta start_date = this.start_date.parse_timestamp("2006-01-02").catch(now() - duration("24h")) +---- + +=== Dynamic configuration + +Build tools that adapt their behavior based on input parameters: + +[source,yaml] +---- +processors: + - label: dynamic_config + mutation: | + # Choose data source based on environment + meta env = this.environment | "production" + meta table_name = match @env { + "dev" => "dev_orders", + "staging" => "staging_orders", + "production" => "prod_orders", + _ => "dev_orders" + } + + # Adjust query complexity based on urgency + meta columns = if 
this.detailed.bool().catch(false) { + ["order_id", "customer_id", "total", "items", "shipping_address"] + } else { + ["order_id", "customer_id", "total"] + } +---- + +=== Error handling and fallbacks + +Implement error handling to make your tools reliable: + +[source,yaml] +---- +processors: + - label: primary_fetch + try: + - http: + url: "https://api.primary.com/data" + timeout: "10s" + catch: + - log: + message: "Primary API failed, trying fallback" + - label: fallback_fetch + http: + url: "https://api.fallback.com/data" + timeout: "15s" + - mutation: | + root.metadata.source = "fallback" + root.metadata.warning = "Primary source unavailable" +---- + +=== Conditional processing + +Build tools that branch based on input or data characteristics: + +[source,yaml] +---- +processors: + - label: conditional_processing + switch: + - check: this.data_type == "json" + processors: + - json: + operator: "parse" + - mutation: 'root.parsed_data = this' + - check: this.data_type == "csv" + processors: + - csv: + parse: true + - mutation: 'root.parsed_data = this' + - processors: + - mutation: 'root.error = "Unsupported data type"' +---- diff --git a/modules/ai-agents/partials/mcp/property-restrictions-table.adoc b/modules/ai-agents/partials/mcp/property-restrictions-table.adoc new file mode 100644 index 00000000..d976750a --- /dev/null +++ b/modules/ai-agents/partials/mcp/property-restrictions-table.adoc @@ -0,0 +1,30 @@ +// Single-sourced partial for MCP property restrictions by component type +// Used in both Connect and Cloud developer guides +// +// Include from Connect: include::ai-agents:partial$mcp/property-restrictions-table.adoc[] +// Include from Cloud: include::redpanda-connect:ai-agents:partial$mcp/property-restrictions-table.adoc[] + +=== Property restrictions by component type + +Different component types have different property capabilities when exposed as MCP tools: + +[cols="1,2,2"] +|=== +| Component Type | Property Support | Details + +| `input` +| Only 
supports `count` property +| AI clients can specify how many messages to read, but you cannot define custom properties. + +| `cache` +| No custom properties +| Properties are hardcoded to `key` and `value` for cache operations. + +| `output` +| Custom properties supported +| AI sees properties as an array for batch operations: `[{prop1, prop2}, {prop1, prop2}]`. + +| `processor` +| Custom properties supported +| You can define any properties needed for data processing operations. +|=== diff --git a/modules/ai-agents/partials/mcp/secrets-cloud.adoc b/modules/ai-agents/partials/mcp/secrets-cloud.adoc new file mode 100644 index 00000000..ba19e36c --- /dev/null +++ b/modules/ai-agents/partials/mcp/secrets-cloud.adoc @@ -0,0 +1,27 @@ +// This partial is for Remote MCP servers in Redpanda Cloud +// It describes using the Secrets Store for secrets + +[[secrets]] +=== Secrets and credentials + +Securely handle multiple credentials and API keys. + +Here is an example of using an API key secret. + +. Create a secret in the xref:develop:connect/configuration/secret-management.adoc[Secrets Store] with name `EXTERNAL_API_KEY` and your API key as the value. + +. Reference the secret in your YAML configuration: ++ +[source,yaml] +---- +processors: + - label: call_external_api + http: + url: "https://api.example.com/data" + verb: GET + headers: + Authorization: "Bearer ${secrets.EXTERNAL_API_KEY}" # <1> + Accept: "application/json" +---- ++ +<1> The secret is injected at runtime. Never store the actual API key in your YAML configuration. The actual secret value never appears in your configuration files or logs. 
diff --git a/modules/ai-agents/partials/mcp/secrets-connect.adoc b/modules/ai-agents/partials/mcp/secrets-connect.adoc new file mode 100644 index 00000000..0cd475e3 --- /dev/null +++ b/modules/ai-agents/partials/mcp/secrets-connect.adoc @@ -0,0 +1,32 @@ +// This partial is for self-managed Redpanda Connect MCP servers +// It describes using environment variables for secrets + +[[secrets]] +=== Secrets and credentials + +Securely handle multiple credentials and API keys using environment variables. + +Here is an example of using an API key from environment variables. + +. Set an environment variable with your API key: ++ +[source,bash] +---- +export EXTERNAL_API_KEY="your-api-key-here" +---- + +. Reference the environment variable in your configuration: ++ +[source,yaml] +---- +processors: + - label: call_external_api + http: + url: "https://api.example.com/data" + verb: GET + headers: + Authorization: "Bearer ${EXTERNAL_API_KEY}" # <1> + Accept: "application/json" +---- ++ +<1> The environment variable is injected at runtime. Never store the actual API key in your YAML. The actual secret value never appears in your configuration files or logs. diff --git a/modules/ai-agents/partials/mcp/specification-support.adoc b/modules/ai-agents/partials/mcp/specification-support.adoc new file mode 100644 index 00000000..7fc57f5a --- /dev/null +++ b/modules/ai-agents/partials/mcp/specification-support.adoc @@ -0,0 +1,12 @@ +// Shared MCP specification support section +// Used by both self-managed Redpanda Connect and Remote MCP servers in Cloud +// +// Include syntax: +// - From rp-connect-docs: include::ai-agents:partial$mcp/specification-support.adoc[] +// - From cloud-docs: include::redpanda-connect:ai-agents:partial$mcp/specification-support.adoc[] + +== MCP specification support + +MCP servers implement the open MCP protocol for tool exposure. Only the tool concept from the MCP server specification is supported. 
Features such as MCP resources and prompts are not yet available. + +For full details, see the link:https://modelcontextprotocol.io/specification/2025-06-18/server[official MCP server specification^]. diff --git a/modules/ai-agents/partials/mcp/tool-contract-guidance.adoc b/modules/ai-agents/partials/mcp/tool-contract-guidance.adoc new file mode 100644 index 00000000..6012ecd3 --- /dev/null +++ b/modules/ai-agents/partials/mcp/tool-contract-guidance.adoc @@ -0,0 +1,39 @@ +// Single-sourced partial for MCP tool contract design guidance +// Used in both Connect and Cloud developer guides +// +// Include from Connect: include::ai-agents:partial$mcp/tool-contract-guidance.adoc[] +// Include from Cloud: include::redpanda-connect:ai-agents:partial$mcp/tool-contract-guidance.adoc[] + +Each MCP tool must declare its interface using `meta.mcp` metadata. This metadata allows AI clients to discover and invoke the tool correctly. + +Define a clear, stable interface for each tool. Keep the description task-oriented and keep parameters to a minimum. + +[source,yaml] +---- +meta: + mcp: + enabled: true <1> + description: "Fetches a compact summary from an external API using two optional parameters." <2> + properties: <3> + - name: parameter1 + type: string + description: "Primary filter; defaults to provider standard when omitted." + required: false + - name: parameter2 + type: number + description: "Limit of results (1-100)." + required: false +---- + +<1> Set `meta.mcp.enabled: true` to expose the tool using MCP. +<2> Add a concise description that explains what the tool does. The description is passed as a tool option, making it available to clients and documentation. This should be understandable by an AI model. +<3> List the input parameters (properties) for the tool. + +Property guidance: + +* Use `string`, `number`, or `boolean` types. +* Validate ranges and enums using xref:guides:bloblang/about.adoc[Bloblang]. +* Mark only mandatory fields as required. 
+* Document defaults in the `description` and enforce them in the configuration. + +After defining your tool contract, implement the configuration to handle input validation, defaults, and the main processing steps. diff --git a/modules/ai-agents/partials/mcp/use-cases-table.adoc b/modules/ai-agents/partials/mcp/use-cases-table.adoc new file mode 100644 index 00000000..8da97dc5 --- /dev/null +++ b/modules/ai-agents/partials/mcp/use-cases-table.adoc @@ -0,0 +1,33 @@ +// Shared use cases table for MCP servers +// Used by both self-managed Redpanda Connect and Remote MCP servers in Cloud +// +// Include syntax: +// - From rp-connect-docs: include::ai-agents:partial$mcp/use-cases-table.adoc[] +// - From cloud-docs: include::redpanda-connect:ai-agents:partial$mcp/use-cases-table.adoc[] + +== Use cases + +[cols="1s,3a"] +|=== +|Category |Example prompts + +|Operational monitoring +|* Check partition lag for customer-events topic +* Show me the top 10 producers by message volume today +* Get schema registry health status + +|Data enrichment and analysis +|* Fetch user profile data and recent orders for customer ID 12345 +* Get real-time stock prices for symbols in my portfolio topic +* Analyze sentiment of latest product reviews + +|Team productivity +|* Deploy my microservice to the staging environment +* Generate load test data for the payments service +* Create a summary dashboard of this week's incident reports + +|Business intelligence +|* What are the trending products in the last 24 hours? 
+* Show revenue impact of the latest feature deployment +* Get customer satisfaction scores from support tickets +|=== diff --git a/modules/ai-agents/partials/mcp/yaml-config-rules-cloud.adoc b/modules/ai-agents/partials/mcp/yaml-config-rules-cloud.adoc new file mode 100644 index 00000000..d92cb090 --- /dev/null +++ b/modules/ai-agents/partials/mcp/yaml-config-rules-cloud.adoc @@ -0,0 +1,13 @@ +// Single-sourced partial for Cloud-specific YAML configuration rules +// This version describes component types without directory structure (managed in Cloud UI) +// +// Include from Cloud: include::redpanda-connect:ai-agents:partial$mcp/yaml-config-rules-cloud.adoc[] + +=== YAML configuration rules + +Each YAML configuration (tool) should contain exactly one component type. The component type is inferred from the type you choose in the dropdown when creating or editing the MCP server. Valid component types are: + +* xref:develop:connect/components/inputs/about.adoc[`input`] (for data sources) +* xref:develop:connect/components/outputs/about.adoc[`output`] (for data sinks) +* xref:develop:connect/components/processors/about.adoc[`processor`] (for data transformations and data access) +* xref:develop:connect/components/caches/about.adoc[`cache`] (for caching intermediate results) diff --git a/modules/ai-agents/partials/mcp/yaml-config-rules-connect.adoc b/modules/ai-agents/partials/mcp/yaml-config-rules-connect.adoc new file mode 100644 index 00000000..b9506e93 --- /dev/null +++ b/modules/ai-agents/partials/mcp/yaml-config-rules-connect.adoc @@ -0,0 +1,25 @@ +// Single-sourced partial for Connect-specific YAML configuration rules +// This version includes directory structure information for Connect local development +// +// Include from Connect: include::ai-agents:partial$mcp/yaml-config-rules-connect.adoc[] + +=== YAML configuration rules + +Each YAML file should contain exactly one component type. 
The component type is inferred from the directory structure: + +[cols="1,1", options="header"] +|=== +| Directory | Component Type + +| `resources/inputs/` +| Input component + +| `resources/outputs/` +| Output component + +| `resources/processors/` +| Processor component + +| `resources/caches/` +| Cache component +|=== diff --git a/modules/components/partials/fields/outputs/legacy_redpanda_migrator.adoc b/modules/components/partials/fields/outputs/legacy_redpanda_migrator.adoc index f9cb2de3..eea6fccb 100644 --- a/modules/components/partials/fields/outputs/legacy_redpanda_migrator.adoc +++ b/modules/components/partials/fields/outputs/legacy_redpanda_migrator.adoc @@ -54,7 +54,7 @@ The rough amount of time to allow connections to idle before they are closed. === `idempotent_write` -Enable the idempotent write producer option. When enabled, the producer initializes a producer ID and uses it to guarantee exactly-once semantics per partition (no duplicates on retries). This requires the `IDEMPOTENT_WRITE` permission on the `CLUSTER` resource. If your cluster does not grant this permission or uses ACLs restrictively, disable this option. Note: Idempotent writes are strictly a win for data integrity but may be unavailable in restricted environments (e.g., some managed Kafka services, Redpanda with strict ACLs). Disabling this option is safe and only affects retry behaviorβ€”duplicates may occur on producer retries, but the pipeline will continue to function normally. +Enable the idempotent write producer option. When enabled, the producer initializes a producer ID and uses it to guarantee exactly-once semantics per partition (no duplicates on retries). This requires the `IDEMPOTENT_WRITE` permission on the `CLUSTER` resource. If your cluster does not grant this permission or uses ACLs restrictively, disable this option. 
Note: Idempotent writes are strictly a win for data integrity but may be unavailable in restricted environments (for example, some managed Kafka services or Redpanda with strict ACLs). Disabling this option is safe and only affects retry behavior. Duplicates may occur on producer retries, but the pipeline will continue to function normally. *Type*: `bool` diff --git a/modules/guides/pages/delivery_semantics.adoc b/modules/guides/pages/delivery_semantics.adoc index a2c328b7..98d11f6c 100644 --- a/modules/guides/pages/delivery_semantics.adoc +++ b/modules/guides/pages/delivery_semantics.adoc @@ -1,7 +1,7 @@ = Message Delivery Semantics :description: Learn about Redpanda Connect's transactional model and error handling -Redpanda Connect guarantees strong message delivery semantics across its data pipelines by leveraging a transactional model centered around message batch acknowledgment. This ensures reliable, scalable, and predictable data movement between input and output systems, with particular support for *at-least-once delivery* β€” without requiring intermediate message persistence. +Redpanda Connect guarantees strong message delivery semantics across its data pipelines by leveraging a transactional model centered around message batch acknowledgment. This ensures reliable, scalable, and predictable data movement between input and output systems, with particular support for *at-least-once delivery* without requiring intermediate message persistence. This topic describes how Redpanda Connect manages message delivery across the Input β†’ Pipeline β†’ Output architecture.