Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaPartitionFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaPartitionMetadataFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW;
Expand Down Expand Up @@ -666,14 +666,14 @@ private void doBootstrapCleanup(

private void onTopicConfigChanged(
long traceId,
ArrayFW<KafkaConfigFW> changedConfigs)
ArrayFW<KafkaConfigDetailFW> changedConfigs)
{
metaStream.doMetaInitialBeginIfNecessary(traceId);
}

private void onTopicMetaDataChanged(
long traceId,
ArrayFW<KafkaPartitionFW> partitions)
ArrayFW<KafkaPartitionMetadataFW> partitions)
{
leadersByPartitionId.clear();
partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId()));
Expand All @@ -698,7 +698,7 @@ private void onTopicMetaDataChanged(

private void onPartitionMetaDataChangedIfNecessary(
long traceId,
KafkaPartitionFW partition)
KafkaPartitionMetadataFW partition)
{
final int partitionId = partition.partitionId();
final int leaderId = partition.leaderId();
Expand Down Expand Up @@ -886,7 +886,7 @@ private void doDescribeInitialBegin(
traceId, bootstrap.authorization, 0L,
ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.describe(m -> m.topic(bootstrap.topic)
.describe(m -> m.name(bootstrap.topic)
.configsItem(ci -> ci.set(CONFIG_NAME_CLEANUP_POLICY))
.configsItem(ci -> ci.set(CONFIG_NAME_MAX_MESSAGE_BYTES))
.configsItem(ci -> ci.set(CONFIG_NAME_SEGMENT_BYTES))
Expand Down Expand Up @@ -1009,7 +1009,7 @@ private void onDescribeReplyData(
{
final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap);
final KafkaDescribeDataExFW kafkaDescribeDataEx = kafkaDataEx.describe();
final ArrayFW<KafkaConfigFW> changedConfigs = kafkaDescribeDataEx.configs();
final ArrayFW<KafkaConfigDetailFW> changedConfigs = kafkaDescribeDataEx.configs();

bootstrap.onTopicConfigChanged(traceId, changedConfigs);

Expand Down Expand Up @@ -1266,7 +1266,7 @@ private void onMetaReplyData(
{
final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap);
final KafkaMetaDataExFW kafkaMetaDataEx = kafkaDataEx.meta();
final ArrayFW<KafkaPartitionFW> partitions = kafkaMetaDataEx.partitions();
final ArrayFW<KafkaPartitionMetadataFW> partitions = kafkaMetaDataEx.partitions();

bootstrap.onTopicMetaDataChanged(traceId, partitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW;
Expand Down Expand Up @@ -127,7 +127,7 @@ public MessageConsumer newStream(
final KafkaBeginExFW kafkaBeginEx = extension.get(kafkaBeginExRO::tryWrap);
assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_DESCRIBE;
final KafkaDescribeBeginExFW kafkaDescribeBeginEx = kafkaBeginEx.describe();
final String16FW topic = kafkaDescribeBeginEx.topic();
final String16FW topic = kafkaDescribeBeginEx.name();
final String topicName = topic.asString();

MessageConsumer newStream = null;
Expand Down Expand Up @@ -484,7 +484,7 @@ private void doDescribeFanoutInitialBegin(
traceId, authorization, 0L,
ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.describe(d -> d.topic(topic).configs(cs -> configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8)))))
.describe(d -> d.name(topic).configs(cs -> configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8)))))
.build()
.sizeof()));
state = KafkaState.openingInitial(state);
Expand Down Expand Up @@ -606,7 +606,7 @@ private void onDescribeFanoutReplyData(

if (kafkaDescribeDataEx != null)
{
final ArrayFW<KafkaConfigFW> changedConfigs = kafkaDescribeDataEx.configs();
final ArrayFW<KafkaConfigDetailFW> changedConfigs = kafkaDescribeDataEx.configs();
if (configValues == null)
{
configValues = new LinkedHashMap<>();
Expand Down Expand Up @@ -852,7 +852,7 @@ private void doDescribeReplyBegin(
traceId, authorization, affinity,
ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.describe(m -> m.topic(group.topic)
.describe(m -> m.name(group.topic)
.configs(cs -> group.configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8)))))
.build()
.sizeof()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaPartitionFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaPartitionMetadataFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW;
Expand Down Expand Up @@ -471,8 +471,10 @@ private void onMetaFanoutMemberOpened(
final KafkaDataExFW kafkaDataEx =
kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.meta(m -> leadersByPartitionId.forEach((p, l) -> m.partitionsItem(i -> i.partitionId(p)
.leaderId(l))))
.meta(m -> m.replicationFactor((short) 0)
.partitions(ps -> leadersByPartitionId.forEach(
(p, l) -> ps.item(i -> i.partitionId(p).leaderId(l)
.replicas(r -> {}).isr(r -> {})))))
.build();
member.doMetaReplyDataIfNecessary(traceId, kafkaDataEx);
}
Expand Down Expand Up @@ -699,7 +701,7 @@ private void onMetaFanoutReplyData(

if (kafkaMetaDataEx != null)
{
final ArrayFW<KafkaPartitionFW> partitions = kafkaMetaDataEx.partitions();
final ArrayFW<KafkaPartitionMetadataFW> partitions = kafkaMetaDataEx.partitions();
leadersByPartitionId.clear();
partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW;
Expand Down Expand Up @@ -152,7 +152,7 @@ public MessageConsumer newStream(
final KafkaBeginExFW kafkaBeginEx = extension.get(kafkaBeginExRO::tryWrap);
assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_DESCRIBE;
final KafkaDescribeBeginExFW kafkaDescribeBeginEx = kafkaBeginEx.describe();
final String16FW beginTopic = kafkaDescribeBeginEx.topic();
final String16FW beginTopic = kafkaDescribeBeginEx.name();
final String topicName = beginTopic.asString();

MessageConsumer newStream = null;
Expand Down Expand Up @@ -471,7 +471,8 @@ private void onDescribeFanoutMemberOpened(
final KafkaDataExFW kafkaDataEx =
kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.describe(d -> configValues.forEach((n, v) -> d.configsItem(i -> i.name(n).value(v))))
.describe(d -> d.configs(cs -> configValues.forEach(
(n, v) -> cs.item(i -> i.name(n).value(v).isDefault(0).isSensitive(0)))))
.build();
member.doDescribeReplyDataIfNecessary(traceId, kafkaDataEx);
}
Expand Down Expand Up @@ -527,7 +528,7 @@ private void doDescribeFanoutInitialBegin(
traceId, authorization, 0L,
ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.describe(d -> d.topic(topic.name())
.describe(d -> d.name(topic.name())
.configs(cs -> configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8)))))
.build()
.sizeof()));
Expand Down Expand Up @@ -711,7 +712,7 @@ private void onDescribeFanoutReplyData(

if (kafkaDescribeDataEx != null)
{
final ArrayFW<KafkaConfigFW> changedConfigs = kafkaDescribeDataEx.configs();
final ArrayFW<KafkaConfigDetailFW> changedConfigs = kafkaDescribeDataEx.configs();
if (configValues == null)
{
configValues = new LinkedHashMap<>();
Expand All @@ -726,7 +727,7 @@ private void onDescribeFanoutReplyData(
}

private void onDescribeFanoutConfigChanged(
KafkaConfigFW config)
KafkaConfigDetailFW config)
{
final String16FW configName = config.name();
final String16FW configValue = config.value();
Expand Down Expand Up @@ -1021,7 +1022,7 @@ private void doDescribeReplyBegin(
traceId, authorization, affinity,
ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.describe(m -> m.topic(group.topic.name())
.describe(m -> m.name(group.topic.name())
.configs(cs -> group.configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8)))))
.build()
.sizeof()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public MessageConsumer newStream(

assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_DESCRIBE;
final KafkaDescribeBeginExFW kafkaDescribeBeginEx = kafkaBeginEx.describe();
final String16FW beginTopic = kafkaDescribeBeginEx.topic();
final String16FW beginTopic = kafkaDescribeBeginEx.name();
final String topicName = beginTopic.asString();

MessageConsumer newStream = null;
Expand Down Expand Up @@ -823,7 +823,7 @@ private void doApplicationBegin(
ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.describe(m -> m
.topic(topic)
.name(topic)
.configs(cs -> configs.forEach(n ->
{
cs.item(i -> i.set(n, UTF_8));
Expand Down Expand Up @@ -1675,7 +1675,7 @@ private void onDecodeDescribeResponse(
{
final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.describe(d -> changedConfigs.forEach(n -> d.configsItem(ci -> ci.name(n).value(configs.get(n)))))
.describe(d -> changedConfigs.forEach(n -> d.configsItem(ci -> ci.name(n).value(configs.get(n)).isDefault(0).isSensitive(0))))
.build();

doApplicationData(traceId, authorization, kafkaDataEx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1891,7 +1891,9 @@ private void onDecodeMetaResponse(

final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.meta(m -> partitions.forEach((k, v) -> m.partitionsItem(pi -> pi.partitionId(k).leaderId(v))))
.meta(m -> m.replicationFactor((short) 0)
.partitions(ps -> partitions.forEach((k, v) ->
ps.item(pi -> pi.partitionId(k).leaderId(v).replicas(r -> {}).isr(r -> {})))))
.build();

doApplicationData(traceId, authorization, kafkaDataEx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaCapabilities;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConditionFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaEvaluation;
Expand All @@ -65,7 +65,7 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaNotFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaPartitionFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaPartitionMetadataFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaValueMatchFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
Expand Down Expand Up @@ -1984,7 +1984,7 @@ private void cleanupBudgetCreditorIfNecessary()

private void onTopicConfigChanged(
long traceId,
ArrayFW<KafkaConfigFW> configs)
ArrayFW<KafkaConfigDetailFW> configs)
{
configs.forEach(c ->
{
Expand All @@ -1998,7 +1998,7 @@ private void onTopicConfigChanged(

private void onTopicMetaDataChanged(
long traceId,
ArrayFW<KafkaPartitionFW> partitions)
ArrayFW<KafkaPartitionMetadataFW> partitions)
{
leadersByPartitionId.clear();
partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId()));
Expand Down Expand Up @@ -2390,7 +2390,7 @@ private void doDescribeInitialBegin(
traceId, merged.authorization, 0L,
ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.describe(m -> m.topic(merged.topic)
.describe(m -> m.name(merged.topic)
.configsItem(ci -> ci.set(CONFIG_NAME_CLEANUP_POLICY))
.configsItem(ci -> ci.set(CONFIG_NAME_MAX_MESSAGE_BYTES))
.configsItem(ci -> ci.set(CONFIG_NAME_SEGMENT_BYTES))
Expand Down Expand Up @@ -2513,7 +2513,7 @@ private void onDescribeReplyData(
{
final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap);
final KafkaDescribeDataExFW kafkaDescribeDataEx = kafkaDataEx.describe();
final ArrayFW<KafkaConfigFW> configs = kafkaDescribeDataEx.configs();
final ArrayFW<KafkaConfigDetailFW> configs = kafkaDescribeDataEx.configs();
merged.onTopicConfigChanged(traceId, configs);

doDescribeReplyWindow(traceId, 0, replyMax);
Expand Down Expand Up @@ -2769,7 +2769,7 @@ private void onMetaReplyData(
{
final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap);
final KafkaMetaDataExFW kafkaMetaDataEx = kafkaDataEx.meta();
final ArrayFW<KafkaPartitionFW> partitions = kafkaMetaDataEx.partitions();
final ArrayFW<KafkaPartitionMetadataFW> partitions = kafkaMetaDataEx.partitions();
merged.onTopicMetaDataChanged(traceId, partitions);

doMetaReplyWindow(traceId, 0, replyMax);
Expand Down
Loading
Loading