binding-mcp-kafka: implement mcp_kafka · proxy binding #1671

@jfallows

Summary

Implement the mcp_kafka · proxy binding in a new binding-mcp-kafka module.
Accepts mcp streams and produces kafka Zilla streams (with KafkaBeginEx
extensions). Exposes Kafka broker operations as intrinsic MCP tools — tool
schemas, input schemas, and Kafka stream mappings are all defined internally by
the binding. The operator's only configuration is which tools to expose and
which kafka client to route them to.

mcp_kafka is not a shorthand for mcp_http + http_kafka. It exposes the
full set of Kafka protocol capabilities available through the downstream
kafka · client binding — including admin operations like create_topic,
describe_cluster, and consumer group management that have no natural HTTP
equivalent in Zilla. Application-level tool-to-topic mapping (e.g.
place_order → orders topic) is expressed via mcp_asyncapi or mcp_http +
http_kafka instead.

The AWS MSK MCP server operates at the AWS control plane — managing MSK clusters
as AWS resources. mcp_kafka operates at the Kafka data plane via the
kafka · client binding. They are complementary.

Stream type

  • Accepts: mcp
  • Produces: kafka (Zilla stream — KafkaBeginEx with appropriate capability/API)

Kafka stream mappings

Each tool maps to a Zilla kafka stream type from kafka.idl. All flyweight
instances must be pre-allocated as fields on the factory class — no
per-request allocation.

Data plane

produce → KafkaProduceBeginEx + KafkaProduceDataEx

Input:

{
  "type": "object",
  "required": ["topic", "value"],
  "properties": {
    "topic":   { "type": "string" },
    "value":   { "type": "string" },
    "key":     { "type": "string" },
    "headers": { "type": "object", "additionalProperties": { "type": "string" } }
  }
}
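
As an illustration of how a client might invoke this tool, the sketch below builds an MCP `tools/call` request for `produce` and checks the arguments against the input schema above. The helpers `validate_produce_args` and `build_tools_call` are hypothetical names introduced for this example; they are not part of Zilla or any MCP SDK, and the validator covers only the constraints stated in this schema.

```python
import json

def validate_produce_args(args):
    """Return a list of problems; an empty list means the arguments are valid."""
    problems = []
    # "topic" and "value" are the only required properties.
    for name in ("topic", "value"):
        if name not in args:
            problems.append(f"missing required property: {name}")
    # Every scalar property in the schema is a string.
    for name in ("topic", "value", "key"):
        if name in args and not isinstance(args[name], str):
            problems.append(f"{name} must be a string")
    # "headers" is an object whose values are all strings.
    headers = args.get("headers", {})
    if not isinstance(headers, dict):
        problems.append("headers must be an object")
    else:
        for header, value in headers.items():
            if not isinstance(value, str):
                problems.append(f"header {header} must be a string")
    return problems

def build_tools_call(request_id, arguments):
    """Wrap the arguments in a JSON-RPC 2.0 tools/call request for `produce`."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "produce", "arguments": arguments},
    }

# Example values are illustrative only.
example_args = {"topic": "orders", "value": "{\"id\": 1}", "headers": {"source": "agent"}}
if not validate_produce_args(example_args):
    print(json.dumps(build_tools_call(1, example_args), indent=2))
```

In the binding itself this validation would happen against the internally defined schema; the sketch only shows what a well-formed call looks like from the client side.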

consume → KafkaMergedBeginEx (FETCH_ONLY) + KafkaMergedFetchDataEx

Input:

{
  "type": "object",
  "required": ["topic"],
  "properties": {
    "topic":     { "type": "string" },
    "limit":     { "type": "integer", "default": 10 },
    "offset":    { "oneOf": [{ "type": "string", "enum": ["earliest", "latest"] }, { "type": "integer" }], "default": "latest" },
    "partition": { "type": "integer" }
  }
}

Returns: array of { key, value, offset, partition, timestamp, headers }.
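
The `offset` and `limit` handling can be sketched as follows. This is a minimal illustration, not the binding's implementation: the sentinel values follow Kafka's ListOffsets convention (-2 for earliest, -1 for latest) and are an assumption about how the binding would encode them, and `resolve_offset` and `take_records` are hypothetical helper names.

```python
from itertools import islice

EARLIEST = -2  # assumed sentinel: start of the log
LATEST = -1    # assumed sentinel: end of the log

def resolve_offset(offset="latest"):
    """Map the tool's `offset` argument to a numeric start position."""
    if isinstance(offset, bool):
        # bool is a subclass of int in Python, so reject it explicitly.
        raise ValueError("offset must be 'earliest', 'latest', or an integer")
    if isinstance(offset, int):
        if offset < 0:
            raise ValueError("explicit offsets must be non-negative")
        return offset
    if offset == "earliest":
        return EARLIEST
    if offset == "latest":
        return LATEST
    raise ValueError("offset must be 'earliest', 'latest', or an integer")

def take_records(records, limit=10):
    """Return at most `limit` records, shaped like the tool's result items."""
    fields = ("key", "value", "offset", "partition", "timestamp", "headers")
    return [{f: r.get(f) for f in fields} for r in islice(records, limit)]
```

Omitting `offset` yields the schema default (`latest`), and omitting `limit` caps the result at 10 records.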


Topic management

list_topics → KafkaMetaBeginEx + extended KafkaMetaDataEx (see Dependencies)

No input. Returns: array of { name, partition_count, replication_factor }.


describe_topic → KafkaMetaBeginEx + extended KafkaMetaDataEx

Input: { "topic": string }

Returns: { name, partitions: [{ partition_id, leader, replicas, isr }] }.


create_topic → KafkaRequestBeginEx/createTopics + KafkaResponseBeginEx/createTopics

Input:

{
  "type": "object",
  "required": ["topic"],
  "properties": {
    "topic":              { "type": "string" },
    "partitions":         { "type": "integer", "default": 1 },
    "replication_factor": { "type": "integer", "default": 1 },
    "configs":            { "type": "object", "additionalProperties": { "type": "string" } }
  }
}

Returns: { topic, created: true }.
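
A small sketch of how the schema defaults for `create_topic` might be applied before the request is mapped to a Kafka stream. The helper name `with_create_topic_defaults` is hypothetical, and treating an omitted `configs` as an empty object is an assumption, since the schema declares no default for it.

```python
# Defaults taken from the input schema; the empty "configs" is an assumption.
CREATE_TOPIC_DEFAULTS = {"partitions": 1, "replication_factor": 1, "configs": {}}

def with_create_topic_defaults(args):
    """Return a copy of `args` with schema defaults filled in."""
    if "topic" not in args:
        raise ValueError("missing required property: topic")
    merged = {**CREATE_TOPIC_DEFAULTS, **args}
    # Copy so callers never share the module-level default dict.
    merged["configs"] = dict(merged["configs"])
    return merged
```

For example, `with_create_topic_defaults({"topic": "orders"})` requests a single partition with a replication factor of 1.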


delete_topic → KafkaRequestBeginEx/deleteTopics + KafkaResponseBeginEx/deleteTopics

Input: { "topic": string }

Returns: { topic, deleted: true }.


Consumer groups and cluster

list_consumer_groups → new KafkaRequestBeginEx/listGroups + KafkaResponseBeginEx/listGroups (see Dependencies)

No input. Returns: array of { group_id, state }.


describe_consumer_group → new KafkaRequestBeginEx/describeGroups + KafkaResponseBeginEx/describeGroups

Input: { "group": string }

Returns:

{
  group_id, state,
  members: [{ member_id, client_id, assignments: [{ topic, partition, offset, lag }] }]
}

Lag is derived per partition as the latest offset (via KafkaFetchBeginEx) minus the committed offset (via KafkaOffsetFetchBeginEx).
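
The lag calculation can be sketched as below. The function names and the dictionary shapes are assumptions made for illustration; in particular, reporting `None` when a partition has no committed offset is one possible convention, not something this issue specifies.

```python
def partition_lag(latest_offset, committed_offset):
    """Lag for one partition: latest offset minus committed offset."""
    if committed_offset is None or committed_offset < 0:
        # No committed offset yet, so lag is reported as unknown.
        return None
    # Clamp at zero in case the two offsets were sampled at different times.
    return max(latest_offset - committed_offset, 0)

def with_lag(assignments, latest_offsets):
    """Attach `lag` to each { topic, partition, offset } assignment.

    `latest_offsets` maps (topic, partition) to the latest offset.
    """
    result = []
    for a in assignments:
        latest = latest_offsets.get((a["topic"], a["partition"]))
        lag = None if latest is None else partition_lag(latest, a["offset"])
        result.append({**a, "lag": lag})
    return result
```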


reset_offsets → new KafkaRequestBeginEx/alterConsumerGroupOffsets + KafkaResponseBeginEx/alterConsumerGroupOffsets

Input:

{
  "type": "object",
  "required": ["group", "topic", "offset"],
  "properties": {
    "group":     { "type": "string" },
    "topic":     { "type": "string" },
    "offset":    { "oneOf": [{ "type": "string", "enum": ["earliest", "latest"] }, { "type": "integer" }] },
    "partition": { "type": "integer", "description": "Omit to reset all partitions." }
  }
}

Returns: { group, topic, reset: true }.

Note: requires the consumer group to be inactive. Return a clear error if the group has active members.
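
One way the active-member check could surface to the MCP client is as a tool result with `isError` set, as sketched below. The function name `check_group_inactive` and the error wording are hypothetical; only the `content` / `isError` result shape comes from the MCP tool result format.

```python
def check_group_inactive(group, members):
    """Return an MCP error result if the group has active members, else None."""
    if members:
        ids = ", ".join(m["member_id"] for m in members)
        return {
            "isError": True,
            "content": [{
                "type": "text",
                "text": (
                    f"Cannot reset offsets for group '{group}': "
                    f"{len(members)} active member(s) ({ids}). "
                    "Stop all consumers in the group and retry."
                ),
            }],
        }
    return None
```

Returning the failure as a tool result, rather than as a protocol-level error, lets the calling model read the message and decide how to proceed.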


list_brokers → KafkaRequestBeginEx/describeCluster + KafkaResponseBeginEx/describeCluster

No input. Returns: array of { broker_id, host, port, rack } from KafkaDescribeClusterResponseBeginEx.brokers.


describe_cluster → KafkaRequestBeginEx/describeCluster + KafkaResponseBeginEx/describeCluster

No input. Returns: { cluster_id, controller_id, authorized_operations }.


cluster_overview → KafkaRequestBeginEx/describeCluster + extended KafkaMetaDataEx

No input. Returns:
{ broker_count, controller_id, under_replicated_partitions, offline_partitions, topic_count }.

broker_count and controller_id from KafkaDescribeClusterResponseBeginEx;
partition health from extended KafkaMetaDataEx.
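
The aggregation behind `cluster_overview` might look like the sketch below. It assumes partition metadata in the shape returned by `describe_topic` (`leader`, `replicas`, `isr`), counts a partition as under-replicated when its ISR is smaller than its replica set, and counts it as offline when it has no leader (a negative leader id, following Kafka's metadata convention). These definitions are assumptions, not taken from this issue.

```python
def cluster_overview(brokers, controller_id, topics):
    """Summarize cluster health from broker and topic metadata.

    `topics` is a list of { name, partitions: [{ leader, replicas, isr }] }.
    """
    partitions = [p for t in topics for p in t["partitions"]]
    return {
        "broker_count": len(brokers),
        "controller_id": controller_id,
        # Fewer in-sync replicas than assigned replicas.
        "under_replicated_partitions": sum(
            1 for p in partitions if len(p["isr"]) < len(p["replicas"])
        ),
        # No leader elected for the partition.
        "offline_partitions": sum(1 for p in partitions if p["leader"] < 0),
        "topic_count": len(topics),
    }
```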


describe_configs → KafkaDescribeBeginEx (topic) or new KafkaRequestBeginEx/describeConfigs (broker) (see Dependencies)

Input: { "resource_type": "topic"|"broker", "resource_name": string }

Returns: array of { name, value, is_default, is_sensitive }.


alter_configs → KafkaRequestBeginEx/alterConfigs + KafkaResponseBeginEx/alterConfigs

Input:

{
  "type": "object",
  "required": ["resource_type", "resource_name", "configs"],
  "properties": {
    "resource_type": { "type": "string", "enum": ["topic", "broker"] },
    "resource_name": { "type": "string" },
    "configs":       { "type": "object", "additionalProperties": { "type": "string" } }
  }
}

Returns: { resource_type, resource_name, updated: true }.


Resources

Kafka topics can be exposed as readable MCP resources:

routes:
  - exit: kafka_client0
    when:
      - resource: "kafka://{topic}/messages"
    with:
      capability: fetch
      topic: "${params.topic}"

${params.topic} captures the {topic} segment from the resource URI template.
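
To make the template capture concrete, the sketch below matches a resource URI against `kafka://{topic}/messages` and extracts `params`. It is a simplified illustration: `match_resource` is a hypothetical helper, and it assumes each `{name}` segment matches one or more characters other than `/`.

```python
import re

def compile_template(template):
    """Turn a URI template with {name} segments into a compiled regex."""
    # re.split with a capturing group alternates literal text and segment names.
    parts = re.split(r"\{(\w+)\}", template)
    pattern = ""
    for index, part in enumerate(parts):
        if index % 2 == 0:
            pattern += re.escape(part)          # literal text
        else:
            pattern += f"(?P<{part}>[^/]+)"     # named capture for {part}
    return re.compile(pattern)

def match_resource(template, uri):
    """Return the captured params as a dict, or None if the URI does not match."""
    match = compile_template(template).fullmatch(uri)
    return match.groupdict() if match else None

params = match_resource("kafka://{topic}/messages", "kafka://orders/messages")
print(params)
```

With the route above, the captured `topic` value is what `${params.topic}` would resolve to in the `with` clause.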

Configuration

mcp_kafka0:
  type: mcp_kafka
  kind: proxy
  routes:
    - exit: kafka_client0
      when:
        - tool: list_topics
        - tool: describe_topic
        - tool: create_topic
        - tool: delete_topic
        - tool: produce
        - tool: consume
        - tool: list_consumer_groups
        - tool: describe_consumer_group
        - tool: reset_offsets
        - tool: describe_cluster
        - tool: cluster_overview
        - tool: list_brokers
        - tool: describe_configs
        - tool: alter_configs
        - resource: "kafka://{topic}/messages"
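
The routing behavior implied by this configuration can be sketched as a lookup from tool name to exit. The sketch assumes that the first route with a matching `when` condition wins and that a tool absent from every route is simply not exposed; both are assumptions about the binding's semantics, and `resolve_exit` is a hypothetical helper.

```python
# A trimmed-down, read-only variant of the configuration above.
ROUTES = [
    {
        "exit": "kafka_client0",
        "when": [
            {"tool": "list_topics"},
            {"tool": "describe_topic"},
            {"tool": "consume"},
        ],
    },
]

def resolve_exit(routes, tool):
    """Return the exit for `tool`, or None if no route exposes it."""
    for route in routes:
        if any(cond.get("tool") == tool for cond in route["when"]):
            return route["exit"]
    return None

def exposed_tools(routes):
    """List the tool names that any route exposes, in configuration order."""
    return [cond["tool"] for route in routes for cond in route["when"] if "tool" in cond]
```

Under these assumptions, leaving `delete_topic` or `alter_configs` out of `when` keeps destructive operations unavailable to MCP clients.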

Dependencies — required additions to binding-kafka

The following capabilities are absent from kafka.idl or have insufficient
response types. Each requires a separate issue against binding-kafka and must
land before the corresponding mcp_kafka tool can be implemented.

New Kafka API support

New entries required in the KafkaApi enum and KafkaRequestBeginEx /
KafkaResponseBeginEx unions:

| Tool | Kafka API | API key | New types needed |
| --- | --- | --- | --- |
| list_consumer_groups | LIST_GROUPS | 16 | KafkaListGroupsRequestBeginEx, KafkaListGroupsResponseBeginEx |
| describe_consumer_group | DESCRIBE_GROUPS | 15 | KafkaDescribeGroupsRequestBeginEx, KafkaDescribeGroupsResponseBeginEx |
| reset_offsets | ALTER_CONSUMER_GROUP_OFFSETS | 53 | KafkaAlterConsumerGroupOffsetsRequestBeginEx, KafkaAlterConsumerGroupOffsetsResponseBeginEx |

Note: the Kafka wire protocol defines no ALTER_CONSUMER_GROUP_OFFSETS request. API key 53 is assigned to BeginQuorumEpoch, and the Java Admin client's alterConsumerGroupOffsets is sent as OffsetCommit (API key 8). The API and key for reset_offsets should be confirmed before the kafka.idl change lands.

Extensions to existing types

| Tool | Current type | Missing fields |
| --- | --- | --- |
| list_topics, describe_topic, cluster_overview | KafkaMetaDataEx | replicationFactor, replicas[], isr[] per partition |
| describe_configs (broker) | KafkaDescribeBeginEx (topic-only) | KafkaResourceType + broker resource support, or new KafkaDescribeConfigsRequestBeginEx in KafkaRequestBeginEx |
