Summary
Implement the mcp_kafka · proxy binding in a new binding-mcp-kafka module.
Accepts mcp streams and produces kafka Zilla streams (with KafkaBeginEx
extensions). Exposes Kafka broker operations as intrinsic MCP tools — tool
schemas, input schemas, and Kafka stream mappings are all defined internally by
the binding. The operator's only configuration is which tools to expose and
which kafka client to route them to.
mcp_kafka is not a shorthand for mcp_http + http_kafka. It exposes the
full set of Kafka protocol capabilities available through the downstream
kafka · client binding — including admin operations like create_topic,
describe_cluster, and consumer group management that have no natural HTTP
equivalent in Zilla. Application-level tool-to-topic mapping (e.g.
place_order → orders topic) is expressed via mcp_asyncapi or mcp_http +
http_kafka instead.
The AWS MSK MCP server operates at the AWS control plane — managing MSK clusters
as AWS resources. mcp_kafka operates at the Kafka data plane via the
kafka · client binding. They are complementary.
Stream type
- Accepts:
mcp
- Produces:
kafka (Zilla stream — KafkaBeginEx with appropriate capability/API)
Kafka stream mappings
Each tool maps to a Zilla kafka stream type from kafka.idl. All flyweight
instances must be pre-allocated as fields on the factory class — no
per-request allocation.
Data plane
produce → KafkaProduceBeginEx + KafkaProduceDataEx
Input:
{
"type": "object",
"required": ["topic", "value"],
"properties": {
"topic": { "type": "string" },
"value": { "type": "string" },
"key": { "type": "string" },
"headers": { "type": "object", "additionalProperties": { "type": "string" } }
}
}
consume → KafkaMergedBeginEx (FETCH_ONLY) + KafkaMergedFetchDataEx
Input:
{
"type": "object",
"required": ["topic"],
"properties": {
"topic": { "type": "string" },
"limit": { "type": "integer", "default": 10 },
"offset": { "oneOf": [{ "type": "string", "enum": ["earliest", "latest"] }, { "type": "integer" }], "default": "latest" },
"partition": { "type": "integer" }
}
}
Returns: array of { key, value, offset, partition, timestamp, headers }.
Topic management
list_topics → KafkaMetaBeginEx + extended KafkaMetaDataEx (see Dependencies)
No input. Returns: array of { name, partition_count, replication_factor }.
describe_topic → KafkaMetaBeginEx + extended KafkaMetaDataEx
Input: { "topic": string }
Returns: { name, partitions: [{ partition_id, leader, replicas, isr }] }.
create_topic → KafkaRequestBeginEx/createTopics + KafkaResponseBeginEx/createTopics
Input:
{
"type": "object",
"required": ["topic"],
"properties": {
"topic": { "type": "string" },
"partitions": { "type": "integer", "default": 1 },
"replication_factor": { "type": "integer", "default": 1 },
"configs": { "type": "object", "additionalProperties": { "type": "string" } }
}
}
Returns: { topic, created: true }.
delete_topic → KafkaRequestBeginEx/deleteTopics + KafkaResponseBeginEx/deleteTopics
Input: { "topic": string }
Returns: { topic, deleted: true }.
Consumer groups and cluster
list_consumer_groups → new KafkaRequestBeginEx/listGroups + KafkaResponseBeginEx/listGroups (see Dependencies)
No input. Returns: array of { group_id, state }.
describe_consumer_group → new KafkaRequestBeginEx/describeGroups + KafkaResponseBeginEx/describeGroups
Input: { "group": string }
Returns:
{
group_id, state,
members: [{ member_id, client_id, assignments: [{ topic, partition, offset, lag }] }]
}
Lag derived from KafkaOffsetFetchBeginEx + KafkaFetchBeginEx latest offset.
reset_offsets → new KafkaRequestBeginEx/alterConsumerGroupOffsets + KafkaResponseBeginEx/alterConsumerGroupOffsets
Input:
{
"type": "object",
"required": ["group", "topic", "offset"],
"properties": {
"group": { "type": "string" },
"topic": { "type": "string" },
"offset": { "oneOf": [{ "type": "string", "enum": ["earliest", "latest"] }, { "type": "integer" }] },
"partition": { "type": "integer", "description": "Omit to reset all partitions." }
}
}
Returns: { group, topic, reset: true }.
Note: requires the consumer group to be inactive. Return a clear error if the group has active members.
list_brokers → KafkaRequestBeginEx/describeCluster + KafkaResponseBeginEx/describeCluster
No input. Returns: array of { broker_id, host, port, rack } from KafkaDescribeClusterResponseBeginEx.brokers.
describe_cluster → KafkaRequestBeginEx/describeCluster + KafkaResponseBeginEx/describeCluster
No input. Returns: { cluster_id, controller_id, authorized_operations }.
cluster_overview → KafkaRequestBeginEx/describeCluster + extended KafkaMetaDataEx
No input. Returns:
{ broker_count, controller_id, under_replicated_partitions, offline_partitions, topic_count }.
broker_count and controller_id from KafkaDescribeClusterResponseBeginEx;
partition health from extended KafkaMetaDataEx.
describe_configs → KafkaDescribeBeginEx (topic) or new KafkaRequestBeginEx/describeConfigs (broker) — see Dependencies
Input: { "resource_type": "topic"|"broker", "resource_name": string }
Returns: array of { name, value, is_default, is_sensitive }.
alter_configs → KafkaRequestBeginEx/alterConfigs + KafkaResponseBeginEx/alterConfigs
Input:
{
"type": "object",
"required": ["resource_type", "resource_name", "configs"],
"properties": {
"resource_type": { "type": "string", "enum": ["topic", "broker"] },
"resource_name": { "type": "string" },
"configs": { "type": "object", "additionalProperties": { "type": "string" } }
}
}
Returns: { resource_type, resource_name, updated: true }.
Resources
Kafka topics can be exposed as readable MCP resources:
routes:
- exit: kafka_client0
when:
- resource: "kafka://{topic}/messages"
with:
capability: fetch
topic: "${params.topic}"
${params.topic} captures the {topic} segment from the resource URI template.
Configuration
mcp_kafka0:
type: mcp_kafka
kind: proxy
routes:
- exit: kafka_client0
when:
- tool: list_topics
- tool: describe_topic
- tool: create_topic
- tool: delete_topic
- tool: produce
- tool: consume
- tool: list_consumer_groups
- tool: describe_consumer_group
- tool: reset_offsets
- tool: describe_cluster
- tool: cluster_overview
- tool: list_brokers
- tool: describe_configs
- tool: alter_configs
- resource: "kafka://{topic}/messages"
Dependencies — required additions to binding-kafka
The following capabilities are absent from kafka.idl or have insufficient
response types. Each requires a separate issue against binding-kafka and must
land before the corresponding mcp_kafka tool can be implemented.
New Kafka API support
New entries required in the KafkaApi enum and KafkaRequestBeginEx /
KafkaResponseBeginEx unions:
| Tool |
Kafka API |
API key |
New types needed |
list_consumer_groups |
LIST_GROUPS |
16 |
KafkaListGroupsRequestBeginEx, KafkaListGroupsResponseBeginEx |
describe_consumer_group |
DESCRIBE_GROUPS |
15 |
KafkaDescribeGroupsRequestBeginEx, KafkaDescribeGroupsResponseBeginEx |
reset_offsets |
ALTER_CONSUMER_GROUP_OFFSETS |
53 |
KafkaAlterConsumerGroupOffsetsRequestBeginEx, KafkaAlterConsumerGroupOffsetsResponseBeginEx |
Extensions to existing types
| Tool |
Current type |
Missing fields |
list_topics, describe_topic, cluster_overview |
KafkaMetaDataEx |
replicationFactor, replicas[], isr[] per partition |
describe_configs (broker) |
KafkaDescribeBeginEx (topic-only) |
KafkaResourceType + broker resource support, or new KafkaDescribeConfigsRequestBeginEx in KafkaRequestBeginEx |
Blocked by
Additional context
Summary
Implement the
mcp_kafka · proxybinding in a newbinding-mcp-kafkamodule.Accepts
mcpstreams and produceskafkaZilla streams (withKafkaBeginExextensions). Exposes Kafka broker operations as intrinsic MCP tools — tool
schemas, input schemas, and Kafka stream mappings are all defined internally by
the binding. The operator's only configuration is which tools to expose and
which kafka client to route them to.
mcp_kafkais not a shorthand formcp_http+http_kafka. It exposes thefull set of Kafka protocol capabilities available through the downstream
kafka · clientbinding — including admin operations likecreate_topic,describe_cluster, and consumer group management that have no natural HTTPequivalent in Zilla. Application-level tool-to-topic mapping (e.g.
place_order→ orders topic) is expressed viamcp_asyncapiormcp_http+http_kafkainstead.The AWS MSK MCP server operates at the AWS control plane — managing MSK clusters
as AWS resources.
mcp_kafkaoperates at the Kafka data plane via thekafka · clientbinding. They are complementary.Stream type
mcpkafka(Zilla stream —KafkaBeginExwith appropriate capability/API)Kafka stream mappings
Each tool maps to a Zilla kafka stream type from
kafka.idl. All flyweightinstances must be pre-allocated as fields on the factory class — no
per-request allocation.
Data plane
produce→KafkaProduceBeginEx+KafkaProduceDataExInput:
{ "type": "object", "required": ["topic", "value"], "properties": { "topic": { "type": "string" }, "value": { "type": "string" }, "key": { "type": "string" }, "headers": { "type": "object", "additionalProperties": { "type": "string" } } } }consume→KafkaMergedBeginEx(FETCH_ONLY) +KafkaMergedFetchDataExInput:
{ "type": "object", "required": ["topic"], "properties": { "topic": { "type": "string" }, "limit": { "type": "integer", "default": 10 }, "offset": { "oneOf": [{ "type": "string", "enum": ["earliest", "latest"] }, { "type": "integer" }], "default": "latest" }, "partition": { "type": "integer" } } }Returns: array of
{ key, value, offset, partition, timestamp, headers }.Topic management
list_topics→KafkaMetaBeginEx+ extendedKafkaMetaDataEx(see Dependencies)No input. Returns: array of
{ name, partition_count, replication_factor }.describe_topic→KafkaMetaBeginEx+ extendedKafkaMetaDataExInput:
{ "topic": string }Returns:
{ name, partitions: [{ partition_id, leader, replicas, isr }] }.create_topic→KafkaRequestBeginEx/createTopics+KafkaResponseBeginEx/createTopicsInput:
{ "type": "object", "required": ["topic"], "properties": { "topic": { "type": "string" }, "partitions": { "type": "integer", "default": 1 }, "replication_factor": { "type": "integer", "default": 1 }, "configs": { "type": "object", "additionalProperties": { "type": "string" } } } }Returns:
{ topic, created: true }.delete_topic→KafkaRequestBeginEx/deleteTopics+KafkaResponseBeginEx/deleteTopicsInput:
{ "topic": string }Returns:
{ topic, deleted: true }.Consumer groups and cluster
list_consumer_groups→ newKafkaRequestBeginEx/listGroups+KafkaResponseBeginEx/listGroups(see Dependencies)No input. Returns: array of
{ group_id, state }.describe_consumer_group→ newKafkaRequestBeginEx/describeGroups+KafkaResponseBeginEx/describeGroupsInput:
{ "group": string }Returns:
Lag derived from
KafkaOffsetFetchBeginEx+KafkaFetchBeginExlatest offset.reset_offsets→ newKafkaRequestBeginEx/alterConsumerGroupOffsets+KafkaResponseBeginEx/alterConsumerGroupOffsetsInput:
{ "type": "object", "required": ["group", "topic", "offset"], "properties": { "group": { "type": "string" }, "topic": { "type": "string" }, "offset": { "oneOf": [{ "type": "string", "enum": ["earliest", "latest"] }, { "type": "integer" }] }, "partition": { "type": "integer", "description": "Omit to reset all partitions." } } }Returns:
{ group, topic, reset: true }.Note: requires the consumer group to be inactive. Return a clear error if the group has active members.
list_brokers→KafkaRequestBeginEx/describeCluster+KafkaResponseBeginEx/describeClusterNo input. Returns: array of
{ broker_id, host, port, rack }fromKafkaDescribeClusterResponseBeginEx.brokers.describe_cluster→KafkaRequestBeginEx/describeCluster+KafkaResponseBeginEx/describeClusterNo input. Returns:
{ cluster_id, controller_id, authorized_operations }.cluster_overview→KafkaRequestBeginEx/describeCluster+ extendedKafkaMetaDataExNo input. Returns:
{ broker_count, controller_id, under_replicated_partitions, offline_partitions, topic_count }.broker_countandcontroller_idfromKafkaDescribeClusterResponseBeginEx;partition health from extended
KafkaMetaDataEx.describe_configs→KafkaDescribeBeginEx(topic) or newKafkaRequestBeginEx/describeConfigs(broker) — see DependenciesInput:
{ "resource_type": "topic"|"broker", "resource_name": string }Returns: array of
{ name, value, is_default, is_sensitive }.alter_configs→KafkaRequestBeginEx/alterConfigs+KafkaResponseBeginEx/alterConfigsInput:
{ "type": "object", "required": ["resource_type", "resource_name", "configs"], "properties": { "resource_type": { "type": "string", "enum": ["topic", "broker"] }, "resource_name": { "type": "string" }, "configs": { "type": "object", "additionalProperties": { "type": "string" } } } }Returns:
{ resource_type, resource_name, updated: true }.Resources
Kafka topics can be exposed as readable MCP resources:
${params.topic}captures the{topic}segment from the resource URI template.Configuration
Dependencies — required additions to
binding-kafkaThe following capabilities are absent from
kafka.idlor have insufficientresponse types. Each requires a separate issue against
binding-kafkaand mustland before the corresponding
mcp_kafkatool can be implemented.New Kafka API support
New entries required in the
KafkaApienum andKafkaRequestBeginEx/KafkaResponseBeginExunions:list_consumer_groupsLIST_GROUPSKafkaListGroupsRequestBeginEx,KafkaListGroupsResponseBeginExdescribe_consumer_groupDESCRIBE_GROUPSKafkaDescribeGroupsRequestBeginEx,KafkaDescribeGroupsResponseBeginExreset_offsetsALTER_CONSUMER_GROUP_OFFSETSKafkaAlterConsumerGroupOffsetsRequestBeginEx,KafkaAlterConsumerGroupOffsetsResponseBeginExExtensions to existing types
list_topics,describe_topic,cluster_overviewKafkaMetaDataExreplicationFactor,replicas[],isr[]per partitiondescribe_configs(broker)KafkaDescribeBeginEx(topic-only)KafkaResourceType+ broker resource support, or newKafkaDescribeConfigsRequestBeginExinKafkaRequestBeginExBlocked by
binding-mcp: implement mcp · server binding(binding-mcp: implement mcp · server binding #1668) — defines themcpstream typebinding-kafkaissues for the three new Kafka APIs abovebinding-kafkaissue forKafkaMetaDataExextensionAdditional context