diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheBootstrapFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheBootstrapFactory.java index 88c0633bc8..093b612c8b 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheBootstrapFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheBootstrapFactory.java @@ -35,10 +35,10 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaPartitionFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaPartitionMetadataFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; @@ -666,14 +666,14 @@ private void doBootstrapCleanup( private void onTopicConfigChanged( long traceId, - ArrayFW changedConfigs) + ArrayFW changedConfigs) { metaStream.doMetaInitialBeginIfNecessary(traceId); } private void onTopicMetaDataChanged( long traceId, - ArrayFW partitions) + ArrayFW partitions) { leadersByPartitionId.clear(); partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId())); @@ -698,7 +698,7 @@ private void onTopicMetaDataChanged( private void onPartitionMetaDataChangedIfNecessary( long traceId, - KafkaPartitionFW partition) + KafkaPartitionMetadataFW partition) { final int partitionId = partition.partitionId(); final int leaderId = partition.leaderId(); @@ -886,7 +886,7 @@ private void doDescribeInitialBegin( traceId, bootstrap.authorization, 0L, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(m -> m.topic(bootstrap.topic) + .describe(m -> m.name(bootstrap.topic) .configsItem(ci -> ci.set(CONFIG_NAME_CLEANUP_POLICY)) .configsItem(ci -> ci.set(CONFIG_NAME_MAX_MESSAGE_BYTES)) .configsItem(ci -> ci.set(CONFIG_NAME_SEGMENT_BYTES)) @@ -1009,7 +1009,7 @@ private void onDescribeReplyData( { final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap); final KafkaDescribeDataExFW kafkaDescribeDataEx = kafkaDataEx.describe(); - final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); + final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); bootstrap.onTopicConfigChanged(traceId, changedConfigs); @@ -1266,7 +1266,7 @@ private void onMetaReplyData( { final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap); final KafkaMetaDataExFW kafkaMetaDataEx = kafkaDataEx.meta(); - final ArrayFW partitions = kafkaMetaDataEx.partitions(); + final ArrayFW partitions = kafkaMetaDataEx.partitions(); bootstrap.onTopicMetaDataChanged(traceId, partitions); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientDescribeFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientDescribeFactory.java index ee6eca6f42..726304f679 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientDescribeFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientDescribeFactory.java @@ -35,7 +35,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; @@ -127,7 +127,7 @@ public MessageConsumer newStream( final KafkaBeginExFW kafkaBeginEx = extension.get(kafkaBeginExRO::tryWrap); assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_DESCRIBE; final KafkaDescribeBeginExFW kafkaDescribeBeginEx = kafkaBeginEx.describe(); - final String16FW topic = kafkaDescribeBeginEx.topic(); + final String16FW topic = kafkaDescribeBeginEx.name(); final String topicName = topic.asString(); MessageConsumer newStream = null; @@ -484,7 +484,7 @@ private void doDescribeFanoutInitialBegin( traceId, authorization, 0L, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(d -> d.topic(topic).configs(cs -> configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) + .describe(d -> d.name(topic).configs(cs -> configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) .build() .sizeof())); state = KafkaState.openingInitial(state); @@ -606,7 +606,7 @@ private void onDescribeFanoutReplyData( if (kafkaDescribeDataEx != null) { - final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); + final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); if (configValues == null) { configValues = new LinkedHashMap<>(); @@ -852,7 +852,7 @@ private void doDescribeReplyBegin( traceId, authorization, affinity, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(m -> m.topic(group.topic) + .describe(m -> m.name(group.topic) .configs(cs -> group.configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) .build() .sizeof())); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheMetaFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheMetaFactory.java index 06e00f0ea9..c3729a8c5a 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheMetaFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheMetaFactory.java @@ -41,7 +41,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaPartitionFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaPartitionMetadataFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; @@ -471,8 +471,10 @@ private void onMetaFanoutMemberOpened( final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) - .meta(m -> leadersByPartitionId.forEach((p, l) -> m.partitionsItem(i -> i.partitionId(p) - .leaderId(l)))) + .meta(m -> m.replicationFactor((short) 0) + .partitions(ps -> leadersByPartitionId.forEach( + (p, l) -> ps.item(i -> i.partitionId(p).leaderId(l) + .replicas(r -> {}).isr(r -> {}))))) .build(); member.doMetaReplyDataIfNecessary(traceId, kafkaDataEx); } @@ -699,7 +701,7 @@ private void onMetaFanoutReplyData( if (kafkaMetaDataEx != null) { - final ArrayFW partitions = kafkaMetaDataEx.partitions(); + final ArrayFW partitions = kafkaMetaDataEx.partitions(); leadersByPartitionId.clear(); partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId())); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerDescribeFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerDescribeFactory.java index 18440bf2de..413ca080cb 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerDescribeFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerDescribeFactory.java @@ -42,7 +42,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; @@ -152,7 +152,7 @@ public MessageConsumer newStream( final KafkaBeginExFW kafkaBeginEx = extension.get(kafkaBeginExRO::tryWrap); assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_DESCRIBE; final KafkaDescribeBeginExFW kafkaDescribeBeginEx = kafkaBeginEx.describe(); - final String16FW beginTopic = kafkaDescribeBeginEx.topic(); + final String16FW beginTopic = kafkaDescribeBeginEx.name(); final String topicName = beginTopic.asString(); MessageConsumer newStream = null; @@ -471,7 +471,8 @@ private void onDescribeFanoutMemberOpened( final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) - .describe(d -> configValues.forEach((n, v) -> d.configsItem(i -> i.name(n).value(v)))) + .describe(d -> d.configs(cs -> configValues.forEach( + (n, v) -> cs.item(i -> i.name(n).value(v).isDefault(0).isSensitive(0))))) .build(); member.doDescribeReplyDataIfNecessary(traceId, kafkaDataEx); } @@ -527,7 +528,7 @@ private void doDescribeFanoutInitialBegin( traceId, authorization, 0L, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(d -> d.topic(topic.name()) + .describe(d -> d.name(topic.name()) .configs(cs -> configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) .build() .sizeof())); @@ -711,7 +712,7 @@ private void onDescribeFanoutReplyData( if (kafkaDescribeDataEx != null) { - final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); + final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); if (configValues == null) { configValues = new LinkedHashMap<>(); @@ -726,7 +727,7 @@ private void onDescribeFanoutReplyData( } private void onDescribeFanoutConfigChanged( - KafkaConfigFW config) + KafkaConfigDetailFW config) { final String16FW configName = config.name(); final String16FW configValue = config.value(); @@ -1021,7 +1022,7 @@ private void doDescribeReplyBegin( traceId, authorization, affinity, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(m -> m.topic(group.topic.name()) + .describe(m -> m.name(group.topic.name()) .configs(cs -> group.configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) .build() .sizeof())); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java index d9c6ff6a85..3d276ad0f4 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java @@ -206,7 +206,7 @@ public MessageConsumer newStream( assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_DESCRIBE; final KafkaDescribeBeginExFW kafkaDescribeBeginEx = kafkaBeginEx.describe(); - final String16FW beginTopic = kafkaDescribeBeginEx.topic(); + final String16FW beginTopic = kafkaDescribeBeginEx.name(); final String topicName = beginTopic.asString(); MessageConsumer newStream = null; @@ -823,7 +823,7 @@ private void doApplicationBegin( ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) .describe(m -> m - .topic(topic) + .name(topic) .configs(cs -> configs.forEach(n -> { cs.item(i -> i.set(n, UTF_8)); @@ -1675,7 +1675,7 @@ private void onDecodeDescribeResponse( { final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) - .describe(d -> changedConfigs.forEach(n -> d.configsItem(ci -> ci.name(n).value(configs.get(n))))) + .describe(d -> changedConfigs.forEach(n -> d.configsItem(ci -> ci.name(n).value(configs.get(n)).isDefault(0).isSensitive(0)))) .build(); doApplicationData(traceId, authorization, kafkaDataEx); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java index 4964c96090..12f3480f52 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java @@ -1891,7 +1891,9 @@ private void onDecodeMetaResponse( final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) - .meta(m -> partitions.forEach((k, v) -> m.partitionsItem(pi -> pi.partitionId(k).leaderId(v)))) + .meta(m -> m.replicationFactor((short) 0) + .partitions(ps -> partitions.forEach((k, v) -> + ps.item(pi -> pi.partitionId(k).leaderId(v).replicas(r -> {}).isr(r -> {}))))) .build(); doApplicationData(traceId, authorization, kafkaDataEx); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java index 901f0fb455..e01f273c91 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java @@ -53,7 +53,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaCapabilities; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConditionFW; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaEvaluation; @@ -65,7 +65,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaNotFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaPartitionFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaPartitionMetadataFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaValueMatchFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; @@ -1984,7 +1984,7 @@ private void cleanupBudgetCreditorIfNecessary() private void onTopicConfigChanged( long traceId, - ArrayFW configs) + ArrayFW configs) { configs.forEach(c -> { @@ -1998,7 +1998,7 @@ private void onTopicConfigChanged( private void onTopicMetaDataChanged( long traceId, - ArrayFW partitions) + ArrayFW partitions) { leadersByPartitionId.clear(); partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId())); @@ -2390,7 +2390,7 @@ private void doDescribeInitialBegin( traceId, merged.authorization, 0L, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(m -> m.topic(merged.topic) + .describe(m -> m.name(merged.topic) .configsItem(ci -> ci.set(CONFIG_NAME_CLEANUP_POLICY)) .configsItem(ci -> ci.set(CONFIG_NAME_MAX_MESSAGE_BYTES)) .configsItem(ci -> ci.set(CONFIG_NAME_SEGMENT_BYTES)) @@ -2513,7 +2513,7 @@ private void onDescribeReplyData( { final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap); final KafkaDescribeDataExFW kafkaDescribeDataEx = kafkaDataEx.describe(); - final ArrayFW configs = kafkaDescribeDataEx.configs(); + final ArrayFW configs = kafkaDescribeDataEx.configs(); merged.onTopicConfigChanged(traceId, configs); doDescribeReplyWindow(traceId, 0, replyMax); @@ -2769,7 +2769,7 @@ private void onMetaReplyData( { final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap); final KafkaMetaDataExFW kafkaMetaDataEx = kafkaDataEx.meta(); - final ArrayFW partitions = kafkaMetaDataEx.partitions(); + final ArrayFW partitions = kafkaMetaDataEx.partitions(); merged.onTopicMetaDataChanged(traceId, partitions); doMetaReplyWindow(traceId, 0, replyMax); diff --git a/runtime/binding-mcp-kafka/pom.xml b/runtime/binding-mcp-kafka/pom.xml new file mode 100644 index 0000000000..1a80ff796e --- /dev/null +++ b/runtime/binding-mcp-kafka/pom.xml @@ -0,0 +1,218 @@ + + + + 4.0.0 + + io.aklivity.zilla + runtime + develop-SNAPSHOT + ../pom.xml + + + binding-mcp-kafka + zilla::runtime::binding-mcp-kafka + + + + Aklivity Community License Agreement + https://www.aklivity.io/aklivity-community-license/ + repo + + + + + 0.00 + 500 + + + + + ${project.groupId} + binding-mcp-kafka.spec + ${project.version} + provided + + + ${project.groupId} + engine + ${project.version} + provided + + + ${project.groupId} + binding-mcp + ${project.version} + provided + + + ${project.groupId} + binding-kafka + ${project.version} + provided + + + ${project.groupId} + engine + test-jar + ${project.version} + test + + + junit + junit + test + + + org.hamcrest + hamcrest + test + + + org.mockito + mockito-core + test + + + io.aklivity.k3po + control-junit + test + + + io.aklivity.k3po + lang + test + + + + + + + org.jasig.maven + maven-notice-plugin + + + ${project.groupId} + flyweight-maven-plugin + ${project.version} + + core internal mcp kafka + io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types + + + + + generate + + + + + + com.mycila + license-maven-plugin + + + maven-checkstyle-plugin + + + maven-dependency-plugin + + + process-resources + + unpack + + + + + ${project.groupId} + binding-mcp-kafka.spec + + + ^\Qio/aklivity/zilla/specs/binding/mcp/kafka/\E + io/aklivity/zilla/runtime/binding/mcp/kafka/internal/ + + + + + io/aklivity/zilla/specs/binding/mcp/kafka/schema/mcp.kafka.schema.patch.json + ${project.build.directory}/classes + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + org.moditect + moditect-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + io.aklivity.k3po + k3po-maven-plugin + + + ${project.groupId} + engine + ${project.version} + test-jar + + + ${project.groupId} + engine + ${project.version} + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + org.jacoco + jacoco-maven-plugin + + + io/aklivity/zilla/runtime/binding/mcp/kafka/internal/types/**/*.class + + + + BUNDLE + + + INSTRUCTION + COVEREDRATIO + ${jacoco.coverage.ratio} + + + CLASS + MISSEDCOUNT + ${jacoco.missed.count} + + + + + + + + + diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/config/McpKafkaConditionConfig.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/config/McpKafkaConditionConfig.java new file mode 100644 index 0000000000..f36705fc4d --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/config/McpKafkaConditionConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.config; + +import io.aklivity.zilla.runtime.engine.config.ConditionConfig; + +public final class McpKafkaConditionConfig extends ConditionConfig +{ + public final String tool; + public final String resource; + + public McpKafkaConditionConfig( + String tool, + String resource) + { + this.tool = tool; + this.resource = resource; + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/config/McpKafkaOptionsConfig.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/config/McpKafkaOptionsConfig.java new file mode 100644 index 0000000000..55a0c7b400 --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/config/McpKafkaOptionsConfig.java @@ -0,0 +1,24 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.config; + +import io.aklivity.zilla.runtime.engine.config.OptionsConfig; + +public final class McpKafkaOptionsConfig extends OptionsConfig +{ + public McpKafkaOptionsConfig() + { + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaBinding.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaBinding.java new file mode 100644 index 0000000000..1bca27165e --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaBinding.java @@ -0,0 +1,52 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.internal; + +import java.net.URL; + +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.binding.Binding; + +public final class McpKafkaBinding implements Binding +{ + public static final String NAME = "mcp_kafka"; + + private final McpKafkaConfiguration config; + + McpKafkaBinding( + McpKafkaConfiguration config) + { + this.config = config; + } + + @Override + public String name() + { + return McpKafkaBinding.NAME; + } + + @Override + public URL type() + { + return getClass().getResource("schema/mcp.kafka.schema.patch.json"); + } + + @Override + public McpKafkaBindingContext supply( + EngineContext context) + { + return new McpKafkaBindingContext(config, context); + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaBindingContext.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaBindingContext.java new file mode 100644 index 0000000000..51c65d192b --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaBindingContext.java @@ -0,0 +1,71 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.internal; + +import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; +import static java.util.Collections.singletonMap; + +import java.util.Map; + +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.stream.McpKafkaProxyFactory; +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.binding.BindingContext; +import io.aklivity.zilla.runtime.engine.binding.BindingHandler; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.KindConfig; + +final class McpKafkaBindingContext implements BindingContext +{ + private final Map factories; + + McpKafkaBindingContext( + McpKafkaConfiguration config, + EngineContext context) + { + this.factories = singletonMap(PROXY, new McpKafkaProxyFactory(config, context)); + } + + @Override + public BindingHandler attach( + BindingConfig binding) + { + final McpKafkaProxyFactory factory = factories.get(binding.kind); + + if (factory != null) + { + factory.attach(binding); + } + + return factory; + } + + @Override + public void detach( + BindingConfig binding) + { + final McpKafkaProxyFactory factory = factories.get(binding.kind); + + if (factory != null) + { + factory.detach(binding.id); + } + } + + @Override + public String toString() + { + return String.format("%s %s", getClass().getSimpleName(), factories); + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaBindingFactorySpi.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaBindingFactorySpi.java new file mode 100644 index 0000000000..c070c1cc1d --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaBindingFactorySpi.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.internal; + +import io.aklivity.zilla.runtime.engine.Configuration; +import io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi; + +public final class McpKafkaBindingFactorySpi implements BindingFactorySpi +{ + @Override + public String type() + { + return McpKafkaBinding.NAME; + } + + @Override + public McpKafkaBinding create( + Configuration config) + { + return new McpKafkaBinding(new McpKafkaConfiguration(config)); + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaConfiguration.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaConfiguration.java new file mode 100644 index 0000000000..d3a5ab6bba --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/McpKafkaConfiguration.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.internal; + +import io.aklivity.zilla.runtime.engine.Configuration; + +public class McpKafkaConfiguration extends Configuration +{ + private static final ConfigurationDef MCP_KAFKA_CONFIG; + + static + { + final ConfigurationDef config = new ConfigurationDef("zilla.binding.mcp.kafka"); + MCP_KAFKA_CONFIG = config; + } + + public McpKafkaConfiguration( + Configuration config) + { + super(MCP_KAFKA_CONFIG, config); + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/config/McpKafkaBindingConfig.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/config/McpKafkaBindingConfig.java new file mode 100644 index 0000000000..4c1a3364e0 --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/config/McpKafkaBindingConfig.java @@ -0,0 +1,49 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.internal.config; + +import static java.util.stream.Collectors.toList; + +import java.util.List; + +import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.KindConfig; + +public final class McpKafkaBindingConfig +{ + public final long id; + public final String name; + public final KindConfig kind; + public final List routes; + + public McpKafkaBindingConfig( + BindingConfig binding) + { + this.id = binding.id; + this.name = binding.name; + this.kind = binding.kind; + this.routes = binding.routes.stream().map(McpKafkaRouteConfig::new).collect(toList()); + } + + public McpKafkaRouteConfig resolve( + long authorization, + String tool) + { + return routes.stream() + .filter(r -> r.matches(tool)) + .findFirst() + .orElse(null); + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/config/McpKafkaConditionConfigAdapter.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/config/McpKafkaConditionConfigAdapter.java new file mode 100644 index 0000000000..f2e7fec538 --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/config/McpKafkaConditionConfigAdapter.java @@ -0,0 +1,73 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.internal.config; + +import jakarta.json.Json; +import jakarta.json.JsonObject; +import jakarta.json.JsonObjectBuilder; +import jakarta.json.bind.adapter.JsonbAdapter; + +import io.aklivity.zilla.runtime.binding.mcp.kafka.config.McpKafkaConditionConfig; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.McpKafkaBinding; +import io.aklivity.zilla.runtime.engine.config.ConditionConfig; +import io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi; + +public final class McpKafkaConditionConfigAdapter implements ConditionConfigAdapterSpi, JsonbAdapter +{ + private static final String TOOL_NAME = "tool"; + private static final String RESOURCE_NAME = "resource"; + + @Override + public String type() + { + return McpKafkaBinding.NAME; + } + + @Override + public JsonObject adaptToJson( + ConditionConfig condition) + { + McpKafkaConditionConfig mcpKafkaCondition = (McpKafkaConditionConfig) condition; + + JsonObjectBuilder object = Json.createObjectBuilder(); + + if (mcpKafkaCondition.tool != null) + { + object.add(TOOL_NAME, mcpKafkaCondition.tool); + } + + if (mcpKafkaCondition.resource != null) + { + object.add(RESOURCE_NAME, mcpKafkaCondition.resource); + } + + return object.build(); + } + + @Override + public ConditionConfig adaptFromJson( + JsonObject object) + { + String tool = object.containsKey(TOOL_NAME) + ? object.getString(TOOL_NAME) + : null; + + String resource = object.containsKey(RESOURCE_NAME) + ? object.getString(RESOURCE_NAME) + : null; + + return new McpKafkaConditionConfig(tool, resource); + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/config/McpKafkaRouteConfig.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/config/McpKafkaRouteConfig.java new file mode 100644 index 0000000000..5e3b7b91de --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/config/McpKafkaRouteConfig.java @@ -0,0 +1,51 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.internal.config; + +import static java.util.stream.Collectors.toList; + +import java.util.List; + +import io.aklivity.zilla.runtime.binding.mcp.kafka.config.McpKafkaConditionConfig; +import io.aklivity.zilla.runtime.engine.config.RouteConfig; + +public final class McpKafkaRouteConfig +{ + public final long id; + + private final List when; + + public McpKafkaRouteConfig( + RouteConfig route) + { + this.id = route.id; + this.when = route.when.stream() + .map(McpKafkaConditionConfig.class::cast) + .collect(toList()); + } + + public boolean matches( + String tool) + { + return when.isEmpty() || when.stream().anyMatch(w -> matchesTool(w, tool)); + } + + private boolean matchesTool( + McpKafkaConditionConfig condition, + String tool) + { + return condition.tool == null || condition.tool.equals(tool); + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/stream/McpKafkaProxyFactory.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/stream/McpKafkaProxyFactory.java new file mode 100644 index 0000000000..c8e80b3703 --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/stream/McpKafkaProxyFactory.java @@ -0,0 +1,830 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.internal.stream; + +import static io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.KafkaCapabilities.FETCH_ONLY; +import static io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.KafkaCapabilities.PRODUCE_ONLY; + +import java.util.function.LongUnaryOperator; + +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.collections.Long2ObjectHashMap; +import org.agrona.concurrent.UnsafeBuffer; + +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.McpKafkaConfiguration; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.config.McpKafkaBindingConfig; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.config.McpKafkaRouteConfig; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.Flyweight; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.OctetsFW; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.stream.AbortFW; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.stream.BeginFW; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.stream.DataFW; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.stream.EndFW; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.stream.KafkaBeginExFW; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.stream.McpBeginExFW; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.stream.ResetFW; +import io.aklivity.zilla.runtime.binding.mcp.kafka.internal.types.stream.WindowFW; +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.binding.BindingHandler; +import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; + +public final class McpKafkaProxyFactory implements BindingHandler +{ + private static final String MCP_TYPE_NAME = "mcp"; + private static final String KAFKA_TYPE_NAME = "kafka"; + + private static final String TOOL_PRODUCE = "produce"; + private static final String TOOL_CONSUME = "consume"; + private static final String TOOL_LIST_TOPICS = "list_topics"; + private static final String TOOL_DESCRIBE_TOPIC = "describe_topic"; + private static final String TOOL_CREATE_TOPIC = "create_topic"; + private static final String TOOL_DELETE_TOPIC = "delete_topic"; + private static final String TOOL_LIST_CONSUMER_GROUPS = "list_consumer_groups"; + private static final String TOOL_DESCRIBE_CONSUMER_GROUP = "describe_consumer_group"; + private static final String TOOL_RESET_OFFSETS = "reset_offsets"; + private static final String TOOL_LIST_BROKERS = "list_brokers"; + private static final String TOOL_DESCRIBE_CLUSTER = "describe_cluster"; + private static final String TOOL_CLUSTER_OVERVIEW = "cluster_overview"; + private static final String TOOL_DESCRIBE_CONFIGS = "describe_configs"; + private static final String TOOL_ALTER_CONFIGS = "alter_configs"; + + private final OctetsFW emptyRO = new OctetsFW().wrap(new UnsafeBuffer(0L, 0), 0, 0); + + private final BeginFW beginRO = new BeginFW(); + private final DataFW dataRO = new DataFW(); + private final EndFW endRO = new EndFW(); + private final AbortFW abortRO = new AbortFW(); + private final WindowFW windowRO = new WindowFW(); + private final ResetFW resetRO = new ResetFW(); + private final McpBeginExFW mcpBeginExRO = new McpBeginExFW(); + + private final BeginFW.Builder beginRW = new BeginFW.Builder(); + private final DataFW.Builder dataRW = new DataFW.Builder(); + private final EndFW.Builder endRW = new EndFW.Builder(); + private final AbortFW.Builder abortRW = new AbortFW.Builder(); + private final WindowFW.Builder windowRW = new WindowFW.Builder(); + private final ResetFW.Builder resetRW = new ResetFW.Builder(); + private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder(); + + private final MutableDirectBuffer writeBuffer; + private final MutableDirectBuffer extBuffer; + private final BindingHandler streamFactory; + private final LongUnaryOperator supplyInitialId; + private final LongUnaryOperator supplyReplyId; + private final int mcpTypeId; + private final int kafkaTypeId; + + private final Long2ObjectHashMap bindings; + + public McpKafkaProxyFactory( + McpKafkaConfiguration config, + EngineContext context) + { + this.writeBuffer = context.writeBuffer(); + this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); + this.streamFactory = context.streamFactory(); + this.supplyInitialId = context::supplyInitialId; + this.supplyReplyId = context::supplyReplyId; + this.bindings = new Long2ObjectHashMap<>(); + this.mcpTypeId = context.supplyTypeId(MCP_TYPE_NAME); + this.kafkaTypeId = context.supplyTypeId(KAFKA_TYPE_NAME); + } + + @Override + public int originTypeId() + { + return mcpTypeId; + } + + @Override + public int routedTypeId() + { + return kafkaTypeId; + } + + public void attach( + BindingConfig binding) + { + McpKafkaBindingConfig newBinding = new McpKafkaBindingConfig(binding); + bindings.put(binding.id, newBinding); + } + + public void detach( + long bindingId) + { + bindings.remove(bindingId); + } + + @Override + public MessageConsumer newStream( + int msgTypeId, + DirectBuffer buffer, + int index, + int length, + MessageConsumer sender) + { + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + final long originId = begin.originId(); + final long routedId = begin.routedId(); + final long initialId = begin.streamId(); + final long authorization = begin.authorization(); + final long affinity = begin.affinity(); + + final McpKafkaBindingConfig binding = bindings.get(routedId); + + MessageConsumer newStream = null; + + if (binding != null) + { + String tool = null; + + final McpBeginExFW mcpBeginEx = mcpBeginExRO.tryWrap(begin.extension().buffer(), + begin.extension().offset(), begin.extension().limit()); + + if (mcpBeginEx != null) + { + tool = mcpBeginEx.kind().asString(); + } + + final McpKafkaRouteConfig route = binding.resolve(authorization, tool); + + if (route != null) + { + final McpProxy mcpProxy = new McpProxy( + sender, + originId, + routedId, + initialId, + route.id, + affinity, + authorization, + tool); + newStream = mcpProxy::onMcpMessage; + } + } + + return newStream; + } + + private void doBegin( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization, + long affinity, + Flyweight extension) + { + final BeginFW begin = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .affinity(affinity) + .extension(extension.buffer(), extension.offset(), extension.sizeof()) + .build(); + + receiver.accept(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof()); + } + + private void doData( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization, + long budgetId, + int flags, + int reserved, + DirectBuffer payload, + int offset, + int length) + { + final DataFW data = dataRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .budgetId(budgetId) + .reserved(reserved) + .flags(flags) + .payload(payload, offset, length) + .build(); + + receiver.accept(data.typeId(), data.buffer(), data.offset(), data.sizeof()); + } + + private void doEnd( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization) + { + final EndFW end = endRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .build(); + + receiver.accept(end.typeId(), end.buffer(), end.offset(), end.sizeof()); + } + + private void doAbort( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization) + { + final AbortFW abort = abortRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .build(); + + receiver.accept(abort.typeId(), abort.buffer(), abort.offset(), abort.sizeof()); + } + + private void doReset( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization) + { + final ResetFW reset = resetRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .build(); + + receiver.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof()); + } + + private void doWindow( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization, + long budgetId, + int credit, + int padding) + { + final WindowFW window = windowRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(credit) + .traceId(traceId) + .authorization(authorization) + .budgetId(budgetId) + .padding(padding) + .build(); + + receiver.accept(window.typeId(), window.buffer(), window.offset(), window.sizeof()); + } + + private KafkaBeginExFW buildKafkaBeginEx( + String tool, + String topic) + { + final KafkaBeginExFW.Builder builder = kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(kafkaTypeId); + + switch (tool != null ? tool : "") + { + case TOOL_PRODUCE: + builder.merged(m -> m + .capabilities(c -> c.set(PRODUCE_ONLY)) + .topic(topic != null ? topic : "") + .partitionsItem(p -> p.partitionId(-1).partitionOffset(-2L))); + break; + case TOOL_CONSUME: + builder.merged(m -> m + .capabilities(c -> c.set(FETCH_ONLY)) + .topic(topic != null ? topic : "") + .partitionsItem(p -> p.partitionId(-1).partitionOffset(-2L))); + break; + case TOOL_LIST_TOPICS: + case TOOL_DESCRIBE_TOPIC: + builder.meta(m -> m + .topic(topic != null ? topic : "")); + break; + case TOOL_CREATE_TOPIC: + builder.request(r -> r.createTopics(ct -> ct + .topicsItem(t -> t + .name(topic != null ? topic : "") + .numPartitions(1) + .replicationFactor((short) 1)))); + break; + case TOOL_DELETE_TOPIC: + builder.request(r -> r.deleteTopics(dt -> dt + .topicsItem(t -> t.name(topic != null ? topic : "")))); + break; + case TOOL_LIST_CONSUMER_GROUPS: + builder.request(r -> r.listGroups(lg -> {})); + break; + case TOOL_DESCRIBE_CONSUMER_GROUP: + builder.request(r -> r.describeGroups(dg -> dg + .groupsItem(g -> g.value(topic != null ? topic : "")))); + break; + case TOOL_RESET_OFFSETS: + builder.request(r -> r.alterConsumerGroupOffsets(aco -> aco + .groupId(topic != null ? topic : ""))); + break; + case TOOL_LIST_BROKERS: + case TOOL_DESCRIBE_CLUSTER: + case TOOL_CLUSTER_OVERVIEW: + builder.request(r -> r.describeCluster(dc -> {})); + break; + case TOOL_DESCRIBE_CONFIGS: + builder.describe(d -> d + .name(topic != null ? topic : "")); + break; + case TOOL_ALTER_CONFIGS: + builder.request(r -> r.alterConfigs(ac -> ac + .resourcesItem(res -> res + .type((byte) 2) + .name(topic != null ? topic : "")))); + break; + default: + builder.merged(m -> m + .capabilities(c -> c.set(PRODUCE_ONLY)) + .topic(topic != null ? topic : "") + .partitionsItem(p -> p.partitionId(-1).partitionOffset(-2L))); + break; + } + + return builder.build(); + } + + private final class McpProxy + { + private final MessageConsumer mcp; + private final long originId; + private final long routedId; + private final long initialId; + private final long replyId; + private final long resolvedId; + private final long affinity; + private final long authorization; + private final String tool; + + private KafkaProxy kafka; + private int state; + + private McpProxy( + MessageConsumer mcp, + long originId, + long routedId, + long initialId, + long resolvedId, + long affinity, + long authorization, + String tool) + { + this.mcp = mcp; + this.originId = originId; + this.routedId = routedId; + this.initialId = initialId; + this.replyId = supplyReplyId.applyAsLong(initialId); + this.resolvedId = resolvedId; + this.affinity = affinity; + this.authorization = authorization; + this.tool = tool; + } + + private void onMcpMessage( + int msgTypeId, + DirectBuffer buffer, + int index, + int length) + { + switch (msgTypeId) + { + case BeginFW.TYPE_ID: + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + onMcpBegin(begin); + break; + case DataFW.TYPE_ID: + final DataFW data = dataRO.wrap(buffer, index, index + length); + onMcpData(data); + break; + case EndFW.TYPE_ID: + final EndFW end = endRO.wrap(buffer, index, index + length); + onMcpEnd(end); + break; + case AbortFW.TYPE_ID: + final AbortFW abort = abortRO.wrap(buffer, index, index + length); + onMcpAbort(abort); + break; + case WindowFW.TYPE_ID: + final WindowFW window = windowRO.wrap(buffer, index, index + length); + onMcpWindow(window); + break; + case ResetFW.TYPE_ID: + final ResetFW reset = resetRO.wrap(buffer, index, index + length); + onMcpReset(reset); + break; + default: + break; + } + } + + private void onMcpBegin( + BeginFW begin) + { + final long traceId = begin.traceId(); + + state = McpKafkaState.openingInitial(state); + + final KafkaBeginExFW kafkaBeginEx = buildKafkaBeginEx(tool, null); + + kafka = new KafkaProxy(this, originId, resolvedId, affinity, authorization); + kafka.doKafkaBegin(traceId, kafkaBeginEx); + + doMcpWindow(traceId, 0, writeBuffer.capacity(), 0); + } + + private void onMcpData( + DataFW data) + { + final long traceId = data.traceId(); + final long budgetId = data.budgetId(); + final int flags = data.flags(); + final int reserved = data.reserved(); + final DirectBuffer payload = data.payload(); + + if (kafka != null && payload != null) + { + kafka.doKafkaData(traceId, budgetId, flags, reserved, payload, 0, payload.capacity()); + } + } + + private void onMcpEnd( + EndFW end) + { + final long traceId = end.traceId(); + + state = McpKafkaState.closedInitial(state); + + if (kafka != null) + { + kafka.doKafkaEnd(traceId); + } + } + + private void onMcpAbort( + AbortFW abort) + { + final long traceId = abort.traceId(); + + state = McpKafkaState.closedInitial(state); + + if (kafka != null) + { + kafka.doKafkaAbort(traceId); + } + } + + private void onMcpWindow( + WindowFW window) + { + final long traceId = window.traceId(); + final long budgetId = window.budgetId(); + final int credit = window.maximum(); + final int padding = window.padding(); + + if (kafka != null) + { + kafka.doKafkaWindow(traceId, budgetId, credit, padding); + } + } + + private void onMcpReset( + ResetFW reset) + { + final long traceId = reset.traceId(); + + state = McpKafkaState.closedReply(state); + + if (kafka != null) + { + kafka.doKafkaReset(traceId); + } + } + + private void doMcpBegin( + long traceId) + { + if (!McpKafkaState.initialClosed(state)) + { + state = McpKafkaState.openedReply(state); + doBegin(mcp, originId, routedId, replyId, traceId, authorization, affinity, emptyRO); + } + } + + private void doMcpData( + long traceId, + long budgetId, + int flags, + int reserved, + DirectBuffer payload, + int offset, + int length) + { + doData(mcp, originId, routedId, replyId, traceId, authorization, + budgetId, flags, reserved, payload, offset, length); + } + + private void doMcpEnd( + long traceId) + { + if (!McpKafkaState.replyClosed(state)) + { + state = McpKafkaState.closedReply(state); + doEnd(mcp, originId, routedId, replyId, traceId, authorization); + } + } + + private void doMcpAbort( + long traceId) + { + if (!McpKafkaState.replyClosed(state)) + { + state = McpKafkaState.closedReply(state); + doAbort(mcp, originId, routedId, replyId, traceId, authorization); + } + } + + private void doMcpWindow( + long traceId, + long budgetId, + int credit, + int padding) + { + doWindow(mcp, originId, routedId, replyId, traceId, authorization, budgetId, credit, padding); + } + + private void doMcpReset( + long traceId) + { + if (!McpKafkaState.initialClosed(state)) + { + state = McpKafkaState.closedInitial(state); + doReset(mcp, originId, routedId, replyId, traceId, authorization); + } + } + } + + private final class KafkaProxy + { + private final McpProxy peer; + private final long originId; + private final long resolvedId; + private final long affinity; + private final long authorization; + private final long kafkaInitialId; + private final long kafkaReplyId; + private final MessageConsumer kafka; + + private int state; + + private KafkaProxy( + McpProxy peer, + long originId, + long resolvedId, + long affinity, + long authorization) + { + this.peer = peer; + this.originId = originId; + this.resolvedId = resolvedId; + this.affinity = affinity; + this.authorization = authorization; + this.kafkaInitialId = supplyInitialId.applyAsLong(resolvedId); + this.kafkaReplyId = supplyReplyId.applyAsLong(kafkaInitialId); + this.kafka = streamFactory.newStream(BeginFW.TYPE_ID, writeBuffer, 0, 0, this::onKafkaMessage); + } + + private void onKafkaMessage( + int msgTypeId, + DirectBuffer buffer, + int index, + int length) + { + switch (msgTypeId) + { + case BeginFW.TYPE_ID: + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + onKafkaBegin(begin); + break; + case DataFW.TYPE_ID: + final DataFW data = dataRO.wrap(buffer, index, index + length); + onKafkaData(data); + break; + case EndFW.TYPE_ID: + final EndFW end = endRO.wrap(buffer, index, index + length); + onKafkaEnd(end); + break; + case AbortFW.TYPE_ID: + final AbortFW abort = abortRO.wrap(buffer, index, index + length); + onKafkaAbort(abort); + break; + case WindowFW.TYPE_ID: + final WindowFW window = windowRO.wrap(buffer, index, index + length); + onKafkaWindow(window); + break; + case ResetFW.TYPE_ID: + final ResetFW reset = resetRO.wrap(buffer, index, index + length); + onKafkaReset(reset); + break; + default: + break; + } + } + + private void onKafkaBegin( + BeginFW begin) + { + final long traceId = begin.traceId(); + + state = McpKafkaState.openedReply(state); + + peer.doMcpBegin(traceId); + doKafkaWindow(traceId, 0, writeBuffer.capacity(), 0); + } + + private void onKafkaData( + DataFW data) + { + final long traceId = data.traceId(); + final long budgetId = data.budgetId(); + final int flags = data.flags(); + final int reserved = data.reserved(); + final DirectBuffer payload = data.payload(); + + if (payload != null) + { + peer.doMcpData(traceId, budgetId, flags, reserved, payload, 0, payload.capacity()); + } + } + + private void onKafkaEnd( + EndFW end) + { + final long traceId = end.traceId(); + + state = McpKafkaState.closedReply(state); + + peer.doMcpEnd(traceId); + } + + private void onKafkaAbort( + AbortFW abort) + { + final long traceId = abort.traceId(); + + state = McpKafkaState.closedReply(state); + + peer.doMcpAbort(traceId); + } + + private void onKafkaWindow( + WindowFW window) + { + final long traceId = window.traceId(); + final long budgetId = window.budgetId(); + final int credit = window.maximum(); + final int padding = window.padding(); + + peer.doMcpWindow(traceId, budgetId, credit, padding); + } + + private void onKafkaReset( + ResetFW reset) + { + final long traceId = reset.traceId(); + + state = McpKafkaState.closedInitial(state); + + peer.doMcpReset(traceId); + } + + private void doKafkaBegin( + long traceId, + KafkaBeginExFW extension) + { + if (kafka != null) + { + state = McpKafkaState.openingInitial(state); + doBegin(kafka, originId, resolvedId, kafkaInitialId, traceId, authorization, affinity, extension); + } + } + + private void doKafkaData( + long traceId, + long budgetId, + int flags, + int reserved, + DirectBuffer payload, + int offset, + int length) + { + if (kafka != null) + { + doData(kafka, originId, resolvedId, kafkaInitialId, traceId, authorization, + budgetId, flags, reserved, payload, offset, length); + } + } + + private void doKafkaEnd( + long traceId) + { + if (kafka != null && !McpKafkaState.initialClosed(state)) + { + state = McpKafkaState.closedInitial(state); + doEnd(kafka, originId, resolvedId, kafkaInitialId, traceId, authorization); + } + } + + private void doKafkaAbort( + long traceId) + { + if (kafka != null && !McpKafkaState.initialClosed(state)) + { + state = McpKafkaState.closedInitial(state); + doAbort(kafka, originId, resolvedId, kafkaInitialId, traceId, authorization); + } + } + + private void doKafkaWindow( + long traceId, + long budgetId, + int credit, + int padding) + { + if (kafka != null) + { + doWindow(kafka, originId, resolvedId, kafkaReplyId, traceId, authorization, budgetId, credit, padding); + } + } + + private void doKafkaReset( + long traceId) + { + if (kafka != null && !McpKafkaState.replyClosed(state)) + { + state = McpKafkaState.closedReply(state); + doReset(kafka, originId, resolvedId, kafkaReplyId, traceId, authorization); + } + } + } +} diff --git a/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/stream/McpKafkaState.java b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/stream/McpKafkaState.java new file mode 100644 index 0000000000..c105a74804 --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mcp/kafka/internal/stream/McpKafkaState.java @@ -0,0 +1,92 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.kafka.internal.stream; + +final class McpKafkaState +{ + private static final int INITIAL_OPENING = 0x01; + private static final int INITIAL_OPENED = 0x02; + private static final int INITIAL_CLOSING = 0x04; + private static final int INITIAL_CLOSED = 0x08; + private static final int REPLY_OPENING = 0x10; + private static final int REPLY_OPENED = 0x20; + private static final int REPLY_CLOSING = 0x40; + private static final int REPLY_CLOSED = 0x80; + + static int openingInitial( + int state) + { + return state | INITIAL_OPENING; + } + + static int openedInitial( + int state) + { + return state | INITIAL_OPENED; + } + + static int closingInitial( + int state) + { + return state | INITIAL_CLOSING; + } + + static int closedInitial( + int state) + { + return state | INITIAL_CLOSED; + } + + static boolean initialClosed( + int state) + { + return (state & INITIAL_CLOSED) != 0; + } + + static int openingReply( + int state) + { + return state | REPLY_OPENING; + } + + static int openedReply( + int state) + { + return state | REPLY_OPENED; + } + + static int closingReply( + int state) + { + return state | REPLY_CLOSING; + } + + static int closedReply( + int state) + { + return state | REPLY_CLOSED; + } + + static boolean replyClosed( + int state) + { + return (state & REPLY_CLOSED) != 0; + } + + private McpKafkaState() + { + // utility + } +} diff --git a/runtime/binding-mcp-kafka/src/main/moditect/module-info.java b/runtime/binding-mcp-kafka/src/main/moditect/module-info.java new file mode 100644 index 0000000000..a5b61380e6 --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/moditect/module-info.java @@ -0,0 +1,28 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +module io.aklivity.zilla.runtime.binding.mcp.kafka +{ + requires io.aklivity.zilla.runtime.engine; + requires io.aklivity.zilla.runtime.binding.mcp; + requires io.aklivity.zilla.runtime.binding.kafka; + + exports io.aklivity.zilla.runtime.binding.mcp.kafka.config; + + provides io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi + with io.aklivity.zilla.runtime.binding.mcp.kafka.internal.McpKafkaBindingFactorySpi; + + provides io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi + with io.aklivity.zilla.runtime.binding.mcp.kafka.internal.config.McpKafkaConditionConfigAdapter; +} diff --git a/runtime/binding-mcp-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi b/runtime/binding-mcp-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi new file mode 100644 index 0000000000..8ba200c588 --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi @@ -0,0 +1 @@ +io.aklivity.zilla.runtime.binding.mcp.kafka.internal.McpKafkaBindingFactorySpi diff --git a/runtime/binding-mcp-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi b/runtime/binding-mcp-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi new file mode 100644 index 0000000000..a950c691df --- /dev/null +++ b/runtime/binding-mcp-kafka/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi @@ -0,0 +1 @@ +io.aklivity.zilla.runtime.binding.mcp.kafka.internal.config.McpKafkaConditionConfigAdapter diff --git a/runtime/binding-mcp/pom.xml b/runtime/binding-mcp/pom.xml new file mode 100644 index 0000000000..e3eb9d993b --- /dev/null +++ b/runtime/binding-mcp/pom.xml @@ -0,0 +1,206 @@ + + + + 4.0.0 + + io.aklivity.zilla + runtime + develop-SNAPSHOT + ../pom.xml + + + binding-mcp + zilla::runtime::binding-mcp + + + + Aklivity Community License Agreement + https://www.aklivity.io/aklivity-community-license/ + repo + + + + + 0.00 + 500 + + + + + ${project.groupId} + binding-mcp.spec + ${project.version} + provided + + + ${project.groupId} + engine + ${project.version} + provided + + + ${project.groupId} + engine + test-jar + ${project.version} + test + + + junit + junit + test + + + org.hamcrest + hamcrest + test + + + org.mockito + mockito-core + test + + + io.aklivity.k3po + control-junit + test + + + io.aklivity.k3po + lang + test + + + + + + + org.jasig.maven + maven-notice-plugin + + + ${project.groupId} + flyweight-maven-plugin + ${project.version} + + core internal mcp + io.aklivity.zilla.runtime.binding.mcp.internal.types + + + + + generate + + + + + + com.mycila + license-maven-plugin + + + maven-checkstyle-plugin + + + maven-dependency-plugin + + + process-resources + + unpack + + + + + ${project.groupId} + binding-mcp.spec + + + ^\Qio/aklivity/zilla/specs/binding/mcp/\E + io/aklivity/zilla/runtime/binding/mcp/internal/ + + + + + io/aklivity/zilla/specs/binding/mcp/schema/mcp.schema.patch.json + ${project.build.directory}/classes + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + org.moditect + moditect-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + io.aklivity.k3po + k3po-maven-plugin + + + ${project.groupId} + engine + ${project.version} + test-jar + + + ${project.groupId} + engine + ${project.version} + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + org.jacoco + jacoco-maven-plugin + + + io/aklivity/zilla/runtime/binding/mcp/internal/types/**/*.class + + + + BUNDLE + + + INSTRUCTION + COVEREDRATIO + ${jacoco.coverage.ratio} + + + CLASS + MISSEDCOUNT + ${jacoco.missed.count} + + + + + + + + + diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpConditionConfig.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpConditionConfig.java new file mode 100644 index 0000000000..7dd0258e18 --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpConditionConfig.java @@ -0,0 +1,41 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.config; + +import java.util.function.Function; + +import io.aklivity.zilla.runtime.engine.config.ConditionConfig; + +public final class McpConditionConfig extends ConditionConfig +{ + public final String kind; + + public McpConditionConfig( + String kind) + { + this.kind = kind; + } + + public static McpConditionConfigBuilder builder() + { + return new McpConditionConfigBuilder<>(McpConditionConfig.class::cast); + } + + public static McpConditionConfigBuilder builder( + Function mapper) + { + return new McpConditionConfigBuilder<>(mapper); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpConditionConfigBuilder.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpConditionConfigBuilder.java new file mode 100644 index 0000000000..ef3cff5459 --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpConditionConfigBuilder.java @@ -0,0 +1,53 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.config; + +import java.util.function.Function; + +import io.aklivity.zilla.runtime.engine.config.ConditionConfig; +import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; + +public final class McpConditionConfigBuilder extends ConfigBuilder> +{ + private final Function mapper; + + private String kind; + + public McpConditionConfigBuilder( + Function mapper) + { + this.mapper = mapper; + } + + public McpConditionConfigBuilder kind( + String kind) + { + this.kind = kind; + return this; + } + + @Override + @SuppressWarnings("unchecked") + protected Class> thisType() + { + return (Class>) getClass(); + } + + @Override + public T build() + { + return mapper.apply(new McpConditionConfig(kind)); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpOptionsConfig.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpOptionsConfig.java new file mode 100644 index 0000000000..c0c187fb19 --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpOptionsConfig.java @@ -0,0 +1,42 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.config; + +import java.util.List; +import java.util.function.Function; + +import io.aklivity.zilla.runtime.engine.config.OptionsConfig; + +public final class McpOptionsConfig extends OptionsConfig +{ + public final List prompts; + + public McpOptionsConfig( + List prompts) + { + this.prompts = prompts; + } + + public static McpOptionsConfigBuilder builder() + { + return new McpOptionsConfigBuilder<>(McpOptionsConfig.class::cast); + } + + public static McpOptionsConfigBuilder builder( + Function mapper) + { + return new McpOptionsConfigBuilder<>(mapper); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpOptionsConfigBuilder.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpOptionsConfigBuilder.java new file mode 100644 index 0000000000..b5c8fbd41b --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpOptionsConfigBuilder.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.config; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.OptionsConfig; + +public final class McpOptionsConfigBuilder extends ConfigBuilder> +{ + private final Function mapper; + + private List prompts; + + public McpOptionsConfigBuilder( + Function mapper) + { + this.mapper = mapper; + } + + public McpOptionsConfigBuilder prompt( + String name, + String description) + { + if (prompts == null) + { + prompts = new ArrayList<>(); + } + prompts.add(new McpPromptConfig(name, description)); + return this; + } + + @Override + @SuppressWarnings("unchecked") + protected Class> thisType() + { + return (Class>) getClass(); + } + + @Override + public T build() + { + return mapper.apply(new McpOptionsConfig(prompts)); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpPromptConfig.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpPromptConfig.java new file mode 100644 index 0000000000..a241fad79e --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/config/McpPromptConfig.java @@ -0,0 +1,29 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.config; + +public final class McpPromptConfig +{ + public final String name; + public final String description; + + public McpPromptConfig( + String name, + String description) + { + this.name = name; + this.description = description; + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpBinding.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpBinding.java new file mode 100644 index 0000000000..b7d38618ad --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpBinding.java @@ -0,0 +1,52 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal; + +import java.net.URL; + +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.binding.Binding; + +public final class McpBinding implements Binding +{ + public static final String NAME = "mcp"; + + private final McpConfiguration config; + + McpBinding( + McpConfiguration config) + { + this.config = config; + } + + @Override + public String name() + { + return McpBinding.NAME; + } + + @Override + public URL type() + { + return getClass().getResource("schema/mcp.schema.patch.json"); + } + + @Override + public McpBindingContext supply( + EngineContext context) + { + return new McpBindingContext(config, context); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpBindingContext.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpBindingContext.java new file mode 100644 index 0000000000..b406fee3dd --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpBindingContext.java @@ -0,0 +1,72 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal; + +import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER; +import static java.util.Collections.singletonMap; + +import java.util.Map; + +import io.aklivity.zilla.runtime.binding.mcp.internal.stream.McpServerFactory; +import io.aklivity.zilla.runtime.binding.mcp.internal.stream.McpStreamFactory; +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.binding.BindingContext; +import io.aklivity.zilla.runtime.engine.binding.BindingHandler; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; +import io.aklivity.zilla.runtime.engine.config.KindConfig; + +final class McpBindingContext implements BindingContext +{ + private final Map factories; + + McpBindingContext( + McpConfiguration config, + EngineContext context) + { + this.factories = singletonMap(SERVER, new McpServerFactory(config, context)); + } + + @Override + public BindingHandler attach( + BindingConfig binding) + { + final McpStreamFactory factory = factories.get(binding.kind); + + if (factory != null) + { + factory.attach(binding); + } + + return factory; + } + + @Override + public void detach( + BindingConfig binding) + { + final McpStreamFactory factory = factories.get(binding.kind); + + if (factory != null) + { + factory.detach(binding.id); + } + } + + @Override + public String toString() + { + return String.format("%s %s", getClass().getSimpleName(), factories); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpBindingFactorySpi.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpBindingFactorySpi.java new file mode 100644 index 0000000000..806488d52d --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpBindingFactorySpi.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal; + +import io.aklivity.zilla.runtime.engine.Configuration; +import io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi; + +public final class McpBindingFactorySpi implements BindingFactorySpi +{ + @Override + public String type() + { + return McpBinding.NAME; + } + + @Override + public McpBinding create( + Configuration config) + { + return new McpBinding(new McpConfiguration(config)); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpConfiguration.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpConfiguration.java new file mode 100644 index 0000000000..728c74afef --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/McpConfiguration.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal; + +import io.aklivity.zilla.runtime.engine.Configuration; + +public class McpConfiguration extends Configuration +{ + private static final ConfigurationDef MCP_CONFIG; + + static + { + final ConfigurationDef config = new ConfigurationDef("zilla.binding.mcp"); + MCP_CONFIG = config; + } + + public McpConfiguration( + Configuration config) + { + super(MCP_CONFIG, config); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpBindingConfig.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpBindingConfig.java new file mode 100644 index 0000000000..2e356802b6 --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpBindingConfig.java @@ -0,0 +1,55 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal.config; + +import java.util.List; +import java.util.stream.Collectors; + +import io.aklivity.zilla.runtime.binding.mcp.config.McpOptionsConfig; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; + +public final class McpBindingConfig +{ + public final long id; + public final McpOptionsConfig options; + private final List routes; + + public McpBindingConfig( + BindingConfig binding) + { + this.id = binding.id; + this.options = (McpOptionsConfig) binding.options; + this.routes = binding.routes.stream() + .map(McpRouteConfig::new) + .collect(Collectors.toList()); + } + + public long resolveRoute( + long authorization) + { + long resolvedId = -1L; + + for (McpRouteConfig route : routes) + { + if (route.authorized(authorization)) + { + resolvedId = route.id; + break; + } + } + + return resolvedId; + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpConditionConfigAdapter.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpConditionConfigAdapter.java new file mode 100644 index 0000000000..f43fcf499d --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpConditionConfigAdapter.java @@ -0,0 +1,65 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal.config; + +import jakarta.json.Json; +import jakarta.json.JsonObject; +import jakarta.json.JsonObjectBuilder; +import jakarta.json.bind.adapter.JsonbAdapter; + +import io.aklivity.zilla.runtime.binding.mcp.config.McpConditionConfig; +import io.aklivity.zilla.runtime.binding.mcp.internal.McpBinding; +import io.aklivity.zilla.runtime.engine.config.ConditionConfig; +import io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi; + +public final class McpConditionConfigAdapter implements ConditionConfigAdapterSpi, JsonbAdapter +{ + private static final String KIND_NAME = "kind"; + + @Override + public String type() + { + return McpBinding.NAME; + } + + @Override + public JsonObject adaptToJson( + ConditionConfig condition) + { + McpConditionConfig mcpCondition = (McpConditionConfig) condition; + + JsonObjectBuilder object = Json.createObjectBuilder(); + + if (mcpCondition.kind != null) + { + object.add(KIND_NAME, mcpCondition.kind); + } + + return object.build(); + } + + @Override + public ConditionConfig adaptFromJson( + JsonObject object) + { + String kind = object.containsKey(KIND_NAME) + ? object.getString(KIND_NAME) + : null; + + return McpConditionConfig.builder() + .kind(kind) + .build(); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpOptionsConfigAdapter.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpOptionsConfigAdapter.java new file mode 100644 index 0000000000..d1dfce187e --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpOptionsConfigAdapter.java @@ -0,0 +1,98 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal.config; + +import jakarta.json.Json; +import jakarta.json.JsonArray; +import jakarta.json.JsonArrayBuilder; +import jakarta.json.JsonObject; +import jakarta.json.JsonObjectBuilder; +import jakarta.json.bind.adapter.JsonbAdapter; + +import io.aklivity.zilla.runtime.binding.mcp.config.McpOptionsConfig; +import io.aklivity.zilla.runtime.binding.mcp.config.McpOptionsConfigBuilder; +import io.aklivity.zilla.runtime.binding.mcp.config.McpPromptConfig; +import io.aklivity.zilla.runtime.binding.mcp.internal.McpBinding; +import io.aklivity.zilla.runtime.engine.config.OptionsConfig; +import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi; + +public final class McpOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbAdapter +{ + private static final String PROMPTS_NAME = "prompts"; + private static final String PROMPT_NAME_NAME = "name"; + private static final String PROMPT_DESCRIPTION_NAME = "description"; + + @Override + public Kind kind() + { + return Kind.BINDING; + } + + @Override + public String type() + { + return McpBinding.NAME; + } + + @Override + public JsonObject adaptToJson( + OptionsConfig options) + { + McpOptionsConfig mcpOptions = (McpOptionsConfig) options; + + JsonObjectBuilder object = Json.createObjectBuilder(); + + if (mcpOptions.prompts != null && !mcpOptions.prompts.isEmpty()) + { + JsonArrayBuilder prompts = Json.createArrayBuilder(); + for (McpPromptConfig prompt : mcpOptions.prompts) + { + JsonObjectBuilder promptObject = Json.createObjectBuilder(); + promptObject.add(PROMPT_NAME_NAME, prompt.name); + if (prompt.description != null) + { + promptObject.add(PROMPT_DESCRIPTION_NAME, prompt.description); + } + prompts.add(promptObject); + } + object.add(PROMPTS_NAME, prompts); + } + + return object.build(); + } + + @Override + public OptionsConfig adaptFromJson( + JsonObject object) + { + McpOptionsConfigBuilder builder = McpOptionsConfig.builder(); + + if (object.containsKey(PROMPTS_NAME)) + { + JsonArray prompts = object.getJsonArray(PROMPTS_NAME); + for (int i = 0; i < prompts.size(); i++) + { + JsonObject prompt = prompts.getJsonObject(i); + String name = prompt.getString(PROMPT_NAME_NAME); + String description = prompt.containsKey(PROMPT_DESCRIPTION_NAME) + ? prompt.getString(PROMPT_DESCRIPTION_NAME) + : null; + builder.prompt(name, description); + } + } + + return builder.build(); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpRouteConfig.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpRouteConfig.java new file mode 100644 index 0000000000..dc679bae0c --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/config/McpRouteConfig.java @@ -0,0 +1,49 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal.config; + +import java.util.List; +import java.util.stream.Collectors; + +import io.aklivity.zilla.runtime.binding.mcp.config.McpConditionConfig; +import io.aklivity.zilla.runtime.engine.config.RouteConfig; + +public final class McpRouteConfig +{ + public final long id; + private final List when; + + public McpRouteConfig( + RouteConfig route) + { + this.id = route.id; + this.when = route.when.stream() + .map(McpConditionConfig.class::cast) + .collect(Collectors.toList()); + } + + public boolean authorized( + long authorization) + { + return true; + } + + public boolean matches( + String kind) + { + return when.isEmpty() || + when.stream().anyMatch(c -> c.kind == null || c.kind.equals(kind)); + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpServerFactory.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpServerFactory.java new file mode 100644 index 0000000000..e81092a0f4 --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpServerFactory.java @@ -0,0 +1,548 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal.stream; + +import java.util.function.LongUnaryOperator; + +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.collections.Long2ObjectHashMap; +import org.agrona.concurrent.UnsafeBuffer; + +import io.aklivity.zilla.runtime.binding.mcp.internal.McpConfiguration; +import io.aklivity.zilla.runtime.binding.mcp.internal.config.McpBindingConfig; +import io.aklivity.zilla.runtime.binding.mcp.internal.types.stream.AbortFW; +import io.aklivity.zilla.runtime.binding.mcp.internal.types.stream.BeginFW; +import io.aklivity.zilla.runtime.binding.mcp.internal.types.stream.DataFW; +import io.aklivity.zilla.runtime.binding.mcp.internal.types.stream.EndFW; +import io.aklivity.zilla.runtime.binding.mcp.internal.types.stream.FlushFW; +import io.aklivity.zilla.runtime.binding.mcp.internal.types.stream.McpBeginExFW; +import io.aklivity.zilla.runtime.binding.mcp.internal.types.stream.ResetFW; +import io.aklivity.zilla.runtime.binding.mcp.internal.types.stream.WindowFW; +import io.aklivity.zilla.runtime.engine.EngineContext; +import io.aklivity.zilla.runtime.engine.binding.BindingHandler; +import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; + +public final class McpServerFactory implements McpStreamFactory +{ + private static final String MCP_TYPE_NAME = "mcp"; + + private final BeginFW beginRO = new BeginFW(); + private final DataFW dataRO = new DataFW(); + private final EndFW endRO = new EndFW(); + private final AbortFW abortRO = new AbortFW(); + private final FlushFW flushRO = new FlushFW(); + private final WindowFW windowRO = new WindowFW(); + private final ResetFW resetRO = new ResetFW(); + + private final BeginFW.Builder beginRW = new BeginFW.Builder(); + private final DataFW.Builder dataRW = new DataFW.Builder(); + private final EndFW.Builder endRW = new EndFW.Builder(); + private final AbortFW.Builder abortRW = new AbortFW.Builder(); + private final WindowFW.Builder windowRW = new WindowFW.Builder(); + private final ResetFW.Builder resetRW = new ResetFW.Builder(); + + private final McpBeginExFW mcpBeginExRO = new McpBeginExFW(); + + private final MutableDirectBuffer writeBuffer; + private final BindingHandler streamFactory; + private final LongUnaryOperator supplyInitialId; + private final LongUnaryOperator supplyReplyId; + private final int mcpTypeId; + + private final Long2ObjectHashMap bindings; + + public McpServerFactory( + McpConfiguration config, + EngineContext context) + { + this.writeBuffer = context.writeBuffer(); + this.streamFactory = context.streamFactory(); + this.supplyInitialId = context::supplyInitialId; + this.supplyReplyId = context::supplyReplyId; + this.bindings = new Long2ObjectHashMap<>(); + this.mcpTypeId = context.supplyTypeId(MCP_TYPE_NAME); + } + + @Override + public int originTypeId() + { + return mcpTypeId; + } + + @Override + public void attach( + BindingConfig binding) + { + McpBindingConfig newBinding = new McpBindingConfig(binding); + bindings.put(binding.id, newBinding); + } + + @Override + public void detach( + long bindingId) + { + bindings.remove(bindingId); + } + + @Override + public MessageConsumer newStream( + int msgTypeId, + DirectBuffer buffer, + int index, + int length, + MessageConsumer sender) + { + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + final long originId = begin.originId(); + final long routedId = begin.routedId(); + final long initialId = begin.streamId(); + final long affinity = begin.affinity(); + final long authorization = begin.authorization(); + + final McpBindingConfig binding = bindings.get(routedId); + + MessageConsumer newStream = null; + + if (binding != null) + { + final long resolvedId = binding.resolveRoute(authorization); + + if (resolvedId != -1L) + { + newStream = new McpServerStream( + sender, + originId, + routedId, + initialId, + resolvedId, + affinity, + authorization)::onMessage; + } + } + + return newStream; + } + + private void doBegin( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization, + long affinity) + { + final BeginFW begin = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .affinity(affinity) + .build(); + + receiver.accept(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof()); + } + + private void doData( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization, + long budgetId, + int flags, + int reserved, + DirectBuffer payload, + int offset, + int length) + { + final DataFW data = dataRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .budgetId(budgetId) + .reserved(reserved) + .flags(flags) + .payload(payload, offset, length) + .build(); + + receiver.accept(data.typeId(), data.buffer(), data.offset(), data.sizeof()); + } + + private void doEnd( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization) + { + final EndFW end = endRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .build(); + + receiver.accept(end.typeId(), end.buffer(), end.offset(), end.sizeof()); + } + + private void doAbort( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization) + { + final AbortFW abort = abortRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .build(); + + receiver.accept(abort.typeId(), abort.buffer(), abort.offset(), abort.sizeof()); + } + + private void doReset( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization) + { + final ResetFW reset = resetRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(0) + .traceId(traceId) + .authorization(authorization) + .build(); + + receiver.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof()); + } + + private void doWindow( + MessageConsumer receiver, + long originId, + long routedId, + long streamId, + long traceId, + long authorization, + long budgetId, + int credit, + int padding) + { + final WindowFW window = windowRW.wrap(writeBuffer, 0, writeBuffer.capacity()) + .originId(originId) + .routedId(routedId) + .streamId(streamId) + .sequence(0) + .acknowledge(0) + .maximum(credit) + .traceId(traceId) + .authorization(authorization) + .budgetId(budgetId) + .padding(padding) + .build(); + + receiver.accept(window.typeId(), window.buffer(), window.offset(), window.sizeof()); + } + + private final class McpServerStream + { + private final MessageConsumer sender; + private final long originId; + private final long routedId; + private final long initialId; + private final long replyId; + private final long resolvedId; + private final long affinity; + private final long authorization; + + private long downstreamInitialId; + private long downstreamReplyId; + private MessageConsumer downstream; + + private McpServerStream( + MessageConsumer sender, + long originId, + long routedId, + long initialId, + long resolvedId, + long affinity, + long authorization) + { + this.sender = sender; + this.originId = originId; + this.routedId = routedId; + this.initialId = initialId; + this.replyId = supplyReplyId.applyAsLong(initialId); + this.resolvedId = resolvedId; + this.affinity = affinity; + this.authorization = authorization; + } + + private void onMessage( + int msgTypeId, + DirectBuffer buffer, + int index, + int length) + { + switch (msgTypeId) + { + case BeginFW.TYPE_ID: + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + onBegin(begin); + break; + case DataFW.TYPE_ID: + final DataFW data = dataRO.wrap(buffer, index, index + length); + onData(data); + break; + case EndFW.TYPE_ID: + final EndFW end = endRO.wrap(buffer, index, index + length); + onEnd(end); + break; + case AbortFW.TYPE_ID: + final AbortFW abort = abortRO.wrap(buffer, index, index + length); + onAbort(abort); + break; + case FlushFW.TYPE_ID: + final FlushFW flush = flushRO.wrap(buffer, index, index + length); + onFlush(flush); + break; + case WindowFW.TYPE_ID: + final WindowFW window = windowRO.wrap(buffer, index, index + length); + onWindow(window); + break; + case ResetFW.TYPE_ID: + final ResetFW reset = resetRO.wrap(buffer, index, index + length); + onReset(reset); + break; + default: + break; + } + } + + private void onBegin( + BeginFW begin) + { + final long traceId = begin.traceId(); + + downstreamInitialId = supplyInitialId.applyAsLong(resolvedId); + downstreamReplyId = supplyReplyId.applyAsLong(downstreamInitialId); + downstream = streamFactory.newStream(BeginFW.TYPE_ID, writeBuffer, 0, 0, this::onDownstreamMessage); + + if (downstream != null) + { + doBegin(downstream, originId, resolvedId, downstreamInitialId, + traceId, authorization, affinity); + } + + doWindow(sender, originId, routedId, replyId, traceId, authorization, 0, + writeBuffer.capacity(), 0); + } + + private void onData( + DataFW data) + { + final long traceId = data.traceId(); + final long budgetId = data.budgetId(); + final int flags = data.flags(); + final int reserved = data.reserved(); + final DirectBuffer payload = data.payload(); + + if (downstream != null && payload != null) + { + doData(downstream, originId, resolvedId, downstreamInitialId, + traceId, authorization, budgetId, flags, reserved, + payload, 0, payload.capacity()); + } + } + + private void onEnd( + EndFW end) + { + final long traceId = end.traceId(); + + if (downstream != null) + { + doEnd(downstream, originId, resolvedId, downstreamInitialId, traceId, authorization); + } + } + + private void onAbort( + AbortFW abort) + { + final long traceId = abort.traceId(); + + if (downstream != null) + { + doAbort(downstream, originId, resolvedId, downstreamInitialId, traceId, authorization); + } + } + + private void onFlush( + FlushFW flush) + { + // pass-through flush — no action required for basic server kind + } + + private void onWindow( + WindowFW window) + { + final long traceId = window.traceId(); + final long budgetId = window.budgetId(); + final int credit = window.maximum(); + final int padding = window.padding(); + + doWindow(sender, originId, routedId, initialId, traceId, authorization, budgetId, credit, padding); + } + + private void onReset( + ResetFW reset) + { + final long traceId = reset.traceId(); + + doReset(sender, originId, routedId, initialId, traceId, authorization); + } + + private void onDownstreamMessage( + int msgTypeId, + DirectBuffer buffer, + int index, + int length) + { + switch (msgTypeId) + { + case BeginFW.TYPE_ID: + final BeginFW begin = beginRO.wrap(buffer, index, index + length); + onDownstreamBegin(begin); + break; + case DataFW.TYPE_ID: + final DataFW data = dataRO.wrap(buffer, index, index + length); + onDownstreamData(data); + break; + case EndFW.TYPE_ID: + final EndFW end = endRO.wrap(buffer, index, index + length); + onDownstreamEnd(end); + break; + case AbortFW.TYPE_ID: + final AbortFW abort = abortRO.wrap(buffer, index, index + length); + onDownstreamAbort(abort); + break; + case WindowFW.TYPE_ID: + final WindowFW window = windowRO.wrap(buffer, index, index + length); + onDownstreamWindow(window); + break; + case ResetFW.TYPE_ID: + final ResetFW reset = resetRO.wrap(buffer, index, index + length); + onDownstreamReset(reset); + break; + default: + break; + } + } + + private void onDownstreamBegin( + BeginFW begin) + { + final long traceId = begin.traceId(); + + doBegin(sender, originId, routedId, replyId, traceId, authorization, affinity); + } + + private void onDownstreamData( + DataFW data) + { + final long traceId = data.traceId(); + final long budgetId = data.budgetId(); + final int flags = data.flags(); + final int reserved = data.reserved(); + final DirectBuffer payload = data.payload(); + + if (payload != null) + { + doData(sender, originId, routedId, replyId, + traceId, authorization, budgetId, flags, reserved, + payload, 0, payload.capacity()); + } + } + + private void onDownstreamEnd( + EndFW end) + { + final long traceId = end.traceId(); + + doEnd(sender, originId, routedId, replyId, traceId, authorization); + } + + private void onDownstreamAbort( + AbortFW abort) + { + final long traceId = abort.traceId(); + + doAbort(sender, originId, routedId, replyId, traceId, authorization); + } + + private void onDownstreamWindow( + WindowFW window) + { + final long traceId = window.traceId(); + final long budgetId = window.budgetId(); + final int credit = window.maximum(); + final int padding = window.padding(); + + if (downstream != null) + { + doWindow(downstream, originId, resolvedId, downstreamInitialId, + traceId, authorization, budgetId, credit, padding); + } + } + + private void onDownstreamReset( + ResetFW reset) + { + final long traceId = reset.traceId(); + + doReset(downstream, originId, resolvedId, downstreamReplyId, traceId, authorization); + } + } +} diff --git a/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpStreamFactory.java b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpStreamFactory.java new file mode 100644 index 0000000000..34952e73b2 --- /dev/null +++ b/runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpStreamFactory.java @@ -0,0 +1,27 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.mcp.internal.stream; + +import io.aklivity.zilla.runtime.engine.binding.BindingHandler; +import io.aklivity.zilla.runtime.engine.config.BindingConfig; + +public interface McpStreamFactory extends BindingHandler +{ + void attach( + BindingConfig binding); + + void detach( + long bindingId); +} diff --git a/runtime/binding-mcp/src/main/moditect/module-info.java b/runtime/binding-mcp/src/main/moditect/module-info.java new file mode 100644 index 0000000000..9804d1a2d4 --- /dev/null +++ b/runtime/binding-mcp/src/main/moditect/module-info.java @@ -0,0 +1,29 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +module io.aklivity.zilla.runtime.binding.mcp +{ + requires io.aklivity.zilla.runtime.engine; + + exports io.aklivity.zilla.runtime.binding.mcp.config; + + provides io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi + with io.aklivity.zilla.runtime.binding.mcp.internal.McpBindingFactorySpi; + + provides io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi + with io.aklivity.zilla.runtime.binding.mcp.internal.config.McpOptionsConfigAdapter; + + provides io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi + with io.aklivity.zilla.runtime.binding.mcp.internal.config.McpConditionConfigAdapter; +} diff --git a/runtime/binding-mcp/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi b/runtime/binding-mcp/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi new file mode 100644 index 0000000000..fee8bfc758 --- /dev/null +++ b/runtime/binding-mcp/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.binding.BindingFactorySpi @@ -0,0 +1 @@ +io.aklivity.zilla.runtime.binding.mcp.internal.McpBindingFactorySpi diff --git a/runtime/binding-mcp/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi b/runtime/binding-mcp/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi new file mode 100644 index 0000000000..f35c3d833c --- /dev/null +++ b/runtime/binding-mcp/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.ConditionConfigAdapterSpi @@ -0,0 +1 @@ +io.aklivity.zilla.runtime.binding.mcp.internal.config.McpConditionConfigAdapter diff --git a/runtime/binding-mcp/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi b/runtime/binding-mcp/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi new file mode 100644 index 0000000000..ceff7e2fa1 --- /dev/null +++ b/runtime/binding-mcp/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi @@ -0,0 +1 @@ +io.aklivity.zilla.runtime.binding.mcp.internal.config.McpOptionsConfigAdapter diff --git a/runtime/pom.xml b/runtime/pom.xml index d42e0cc398..24d4b29308 100644 --- a/runtime/pom.xml +++ b/runtime/pom.xml @@ -31,6 +31,10 @@ binding-kafka-grpc binding-mqtt binding-mqtt-kafka + binding-mcp + binding-mcp-http + binding-mcp-kafka + binding-mcp-openapi binding-openapi binding-openapi-asyncapi binding-proxy @@ -138,6 +142,26 @@ binding-mqtt-kafka ${project.version} + + ${project.groupId} + binding-mcp + ${project.version} + + + ${project.groupId} + binding-mcp-http + ${project.version} + + + ${project.groupId} + binding-mcp-kafka + ${project.version} + + + ${project.groupId} + binding-mcp-openapi + ${project.version} + ${project.groupId} binding-openapi diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index 4677f3a27f..06615a66e1 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -63,10 +63,10 @@ scope kafka HEADERS (3) } - enum KafkaResourceType (uint8) + enum KafkaResourceType (int8) { - BROKER(4), - TOPIC(2) + TOPIC (2), + BROKER (4) } union KafkaCondition switch (uint8) @@ -187,10 +187,13 @@ scope kafka BOOTSTRAP (254), MERGED (255), DESCRIBE_CLUSTER (60), + ALTER_CONSUMER_GROUP_OFFSETS (53), ALTER_CONFIGS (33), INIT_PRODUCER_ID (22), DELETE_TOPICS (20), CREATE_TOPICS (19), + LIST_GROUPS (16), + DESCRIBE_GROUPS (15), META (3), OFFSET_COMMIT (8), OFFSET_FETCH (9), @@ -330,20 +333,38 @@ scope kafka string16 topic; } + struct KafkaPartitionMetadata + { + int32 partitionId; + int32 leaderId; + int32[] replicas; + int32[] isr; + } + struct KafkaMetaDataEx { - KafkaPartition[] partitions; + int16 replicationFactor; + KafkaPartitionMetadata[] partitions; } struct KafkaDescribeBeginEx { - string16 topic; + KafkaResourceType resourceType = TOPIC; + string16 name; string16[] configs; } + struct KafkaConfigDetail + { + string16 name; + string16 value; + uint8 isDefault; + uint8 isSensitive; + } + struct KafkaDescribeDataEx { - KafkaConfig[] configs; + KafkaConfigDetail[] configs; } struct KafkaFetchBeginEx @@ -564,12 +585,74 @@ scope kafka uint8 includeAuthorizedOperations; } + struct KafkaGroupState + { + string16 groupId; + string16 protocolType; + string16 groupState; + } + + struct KafkaListGroupsRequestBeginEx + { + string16[] statesFilter; + } + + struct KafkaDescribeGroupMember + { + string16 memberId; + string16 clientId; + string16 clientHost; + varint32 metadataLen; + octets[metadataLen] metadata = null; + varint32 assignmentLen; + octets[assignmentLen] assignment = null; + } + + struct KafkaDescribeGroupInfo + { + int16 error; + string16 groupId; + string16 groupState; + string16 protocolType; + string16 protocol; + KafkaDescribeGroupMember[] members; + } + + struct KafkaDescribeGroupsRequestBeginEx + { + string16[] groupIds; + uint8 includeAuthorizedOperations; + } + + struct KafkaAlterGroupTopicPartition + { + int32 partitionId; + int64 offset; + int32 leaderEpoch = -1; + string16 metadata = null; + } + + struct KafkaAlterGroupTopic + { + string16 name; + KafkaAlterGroupTopicPartition[] partitions; + } + + struct KafkaAlterConsumerGroupOffsetsRequestBeginEx + { + string16 groupId; + KafkaAlterGroupTopic[] topics; + } + union KafkaRequestBeginEx switch (uint8) { case 19: kafka::stream::KafkaCreateTopicsRequestBeginEx createTopics; case 20: kafka::stream::KafkaDeleteTopicsRequestBeginEx deleteTopics; case 33: kafka::stream::KafkaAlterConfigsRequestBeginEx alterConfigs; case 60: kafka::stream::KafkaDescribeClusterRequestBeginEx describeCluster; + case 15: kafka::stream::KafkaDescribeGroupsRequestBeginEx describeGroups; + case 16: kafka::stream::KafkaListGroupsRequestBeginEx listGroups; + case 53: kafka::stream::KafkaAlterConsumerGroupOffsetsRequestBeginEx alterConsumerGroupOffsets; } struct KafkaCreateTopicStatus @@ -630,12 +713,46 @@ scope kafka int32 authorizedOperations; } + struct KafkaListGroupsResponseBeginEx + { + int32 throttle; + int16 error; + KafkaGroupState[] groups; + } + + struct KafkaDescribeGroupsResponseBeginEx + { + int32 throttle; + KafkaDescribeGroupInfo[] groups; + } + + struct KafkaAlterGroupPartitionResult + { + int32 partitionId; + int16 error; + } + + struct KafkaAlterGroupTopicResult + { + string16 name; + KafkaAlterGroupPartitionResult[] partitions; + } + + struct KafkaAlterConsumerGroupOffsetsResponseBeginEx + { + int32 throttle; + KafkaAlterGroupTopicResult[] topics; + } + union KafkaResponseBeginEx switch (uint8) { case 19: kafka::stream::KafkaCreateTopicsResponseBeginEx createTopics; case 20: kafka::stream::KafkaDeleteTopicsResponseBeginEx deleteTopics; case 33: kafka::stream::KafkaAlterConfigsResponseBeginEx alterConfigs; case 60: kafka::stream::KafkaDescribeClusterResponseBeginEx describeCluster; + case 15: kafka::stream::KafkaDescribeGroupsResponseBeginEx describeGroups; + case 16: kafka::stream::KafkaListGroupsResponseBeginEx listGroups; + case 53: kafka::stream::KafkaAlterConsumerGroupOffsetsResponseBeginEx alterConsumerGroupOffsets; } } diff --git a/specs/binding-mcp-kafka.spec/pom.xml b/specs/binding-mcp-kafka.spec/pom.xml new file mode 100644 index 0000000000..4219f3701c --- /dev/null +++ b/specs/binding-mcp-kafka.spec/pom.xml @@ -0,0 +1,139 @@ + + + + 4.0.0 + + io.aklivity.zilla + specs + develop-SNAPSHOT + ../pom.xml + + + binding-mcp-kafka.spec + zilla::specs::binding-mcp-kafka.spec + + + + Aklivity Community License Agreement + https://www.aklivity.io/aklivity-community-license/ + repo + + + + + 1.00 + 0 + + + + + io.aklivity.k3po + lang + provided + + + ${project.groupId} + engine.spec + ${project.version} + + + junit + junit + test + + + io.aklivity.k3po + control-junit + test + + + org.hamcrest + hamcrest-library + test + + + + + + + src/main/resources + + + src/main/scripts + + + + + + org.jasig.maven + maven-notice-plugin + + + com.mycila + license-maven-plugin + + + maven-checkstyle-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + org.moditect + moditect-maven-plugin + + + io.aklivity.k3po + k3po-maven-plugin + + + ${project.groupId} + engine + ${project.version} + test-jar + + + ${project.groupId} + engine + ${project.version} + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + org.jacoco + jacoco-maven-plugin + + + + BUNDLE + + + INSTRUCTION + COVEREDRATIO + ${jacoco.coverage.ratio} + + + CLASS + MISSEDCOUNT + ${jacoco.missed.count} + + + + + + + + + diff --git a/specs/binding-mcp-kafka.spec/src/main/moditect/module-info.java b/specs/binding-mcp-kafka.spec/src/main/moditect/module-info.java new file mode 100644 index 0000000000..912a41d7a6 --- /dev/null +++ b/specs/binding-mcp-kafka.spec/src/main/moditect/module-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +open module io.aklivity.zilla.specs.binding.mcp.kafka +{ + requires transitive io.aklivity.zilla.specs.engine; +} diff --git a/specs/binding-mcp-kafka.spec/src/main/resources/META-INF/zilla/mcp.idl b/specs/binding-mcp-kafka.spec/src/main/resources/META-INF/zilla/mcp.idl new file mode 100644 index 0000000000..64e6db0b7e --- /dev/null +++ b/specs/binding-mcp-kafka.spec/src/main/resources/META-INF/zilla/mcp.idl @@ -0,0 +1,25 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +scope mcp +{ + scope stream + { + struct McpBeginEx extends core::stream::Extension + { + string8 kind; + } + } +} diff --git a/specs/binding-mcp-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mcp/kafka/schema/mcp.kafka.schema.patch.json b/specs/binding-mcp-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mcp/kafka/schema/mcp.kafka.schema.patch.json new file mode 100644 index 0000000000..180a9e01b2 --- /dev/null +++ b/specs/binding-mcp-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mcp/kafka/schema/mcp.kafka.schema.patch.json @@ -0,0 +1,80 @@ +[ + { + "op": "add", + "path": "/$defs/binding/properties/type/enum/-", + "value": "mcp_kafka" + }, + { + "op": "add", + "path": "/$defs/binding/allOf/-", + "value": + { + "if": + { + "properties": + { + "type": + { + "const": "mcp_kafka" + } + } + }, + "then": + { + "properties": + { + "type": + { + "const": "mcp_kafka" + }, + "kind": + { + "enum": [ "proxy" ] + }, + "catalog": false, + "vault": false, + "options": false, + "routes": + { + "title": "Routes", + "type": "array", + "items": + { + "type": "object", + "properties": + { + "when": + { + "title": "When", + "type": "array", + "items": + { + "type": "object", + "properties": + { + "tool": + { + "title": "Tool Name", + "type": "string" + }, + "resource": + { + "title": "Resource URI", + "type": "string" + } + }, + "additionalProperties": false + } + } + } + } + } + }, + "required": + [ + "routes" + ] + } + } + } +] diff --git a/specs/binding-mcp.spec/src/main/moditect/module-info.java b/specs/binding-mcp.spec/src/main/moditect/module-info.java new file mode 100644 index 0000000000..5acfb0c836 --- /dev/null +++ b/specs/binding-mcp.spec/src/main/moditect/module-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +open module io.aklivity.zilla.specs.binding.mcp +{ + requires transitive io.aklivity.zilla.specs.engine; +} diff --git a/specs/pom.xml b/specs/pom.xml index a04ebc5efe..c01ab068c7 100644 --- a/specs/pom.xml +++ b/specs/pom.xml @@ -28,6 +28,10 @@ binding-http.spec binding-grpc.spec binding-mqtt.spec + binding-mcp.spec + binding-mcp-http.spec + binding-mcp-kafka.spec + binding-mcp-openapi.spec binding-openapi.spec binding-openapi-asyncapi.spec binding-sse.spec @@ -66,6 +70,11 @@ engine.spec ${project.version} + + ${project.groupId} + binding-mcp-openapi.spec + ${project.version} + ${project.groupId} binding-echo.spec @@ -126,6 +135,21 @@ binding-mqtt.spec ${project.version} + + ${project.groupId} + binding-mcp.spec + ${project.version} + + + ${project.groupId} + binding-mcp-http.spec + ${project.version} + + + ${project.groupId} + binding-mcp-kafka.spec + ${project.version} + ${project.groupId} binding-mqtt-kafka.spec