Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import java.io.StringWriter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -100,6 +103,8 @@ public abstract class AsyncapiCompositeGenerator
.build())
);

private final Set<String> unresolved = new LinkedHashSet<>();

public final AsyncapiCompositeConfig generate(
AsyncapiBindingConfig binding)
{
Expand All @@ -123,13 +128,20 @@ public final AsyncapiCompositeConfig generate(
: specification.servers;
final AsyncapiView asyncapi = AsyncapiView.of(tagIndex++, label, parser.parse(payload), configs);

unresolved.addAll(asyncapi.unresolvedRefs());

schemas.add(new AsyncapiSchemaConfig(label, schemaId, asyncapi));
}
}

return generate(binding, schemas);
}

public final Collection<String> unresolvedRefs()
{
return unresolved;
}

protected abstract AsyncapiCompositeConfig generate(
AsyncapiBindingConfig binding,
List<AsyncapiSchemaConfig> schemas);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2021-2024 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.event;

import static io.aklivity.zilla.runtime.binding.asyncapi.internal.types.event.AsyncapiEventType.UNRESOLVED_REF;

import java.nio.ByteBuffer;
import java.time.Clock;

import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.AsyncapiBinding;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.event.AsyncapiEventExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;

public class AsyncapiEventContext
{
private static final int EVENT_BUFFER_CAPACITY = 1024;

private final AtomicBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final AtomicBuffer extensionBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final EventFW.Builder eventRW = new EventFW.Builder();
private final AsyncapiEventExFW.Builder asyncapiEventExRW = new AsyncapiEventExFW.Builder();
private final int asyncapiTypeId;
private final int unresolvedRef;
private final MessageConsumer eventWriter;
private final Clock clock;

public AsyncapiEventContext(
EngineContext context)
{
this.asyncapiTypeId = context.supplyTypeId(AsyncapiBinding.NAME);
this.unresolvedRef = context.supplyEventId("binding.asyncapi.unresolved.ref");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}

public void unresolvedRef(
long bindingId,
String ref)
{
AsyncapiEventExFW extension = asyncapiEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.unresolvedRef(e -> e
.typeId(UNRESOLVED_REF.value())
.ref(ref)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(unresolvedRef)
.timestamp(clock.millis())
.traceId(0L)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(asyncapiTypeId, event.buffer(), event.offset(), event.limit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2021-2024 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.event;

import org.agrona.DirectBuffer;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.event.AsyncapiEventExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.event.AsyncapiUnresolvedRefExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;

public final class AsyncapiEventFormatter implements EventFormatterSpi
{
private final EventFW eventRO = new EventFW();
private final AsyncapiEventExFW asyncapiEventExRO = new AsyncapiEventExFW();

AsyncapiEventFormatter(
Configuration config)
{
}

public String format(
DirectBuffer buffer,
int index,
int length)
{
final EventFW event = eventRO.wrap(buffer, index, index + length);
final AsyncapiEventExFW extension = asyncapiEventExRO
.wrap(event.extension().buffer(), event.extension().offset(), event.extension().limit());
String result = null;
switch (extension.kind())
{
case UNRESOLVED_REF:
{
AsyncapiUnresolvedRefExFW ex = extension.unresolvedRef();
result = String.format("Unresolved reference (%s).", asString(ex.ref()));
break;
}
}
return result;
}

private static String asString(
String16FW stringFW)
{
String s = stringFW.asString();
return s == null ? "" : s;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2021-2024 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.event;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.AsyncapiBinding;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi;

public final class AsyncapiEventFormatterFactory implements EventFormatterFactorySpi
{
@Override
public AsyncapiEventFormatter create(
Configuration config)
{
return new AsyncapiEventFormatter(config);
}

@Override
public String type()
{
return AsyncapiBinding.NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.model;


import java.io.Reader;
import java.lang.reflect.Type;

import jakarta.json.JsonObject;
Expand All @@ -35,30 +33,23 @@ public AsyncapiSchemaItem deserialize(
Type rtType) throws JsonbException
{
Jsonb jsonb = JsonbBuilder.create();
JsonParser.Event event = parser.next();
JsonObject value = parser.getObject();

AsyncapiSchemaItem schema = null;

if (event == JsonParser.Event.KEY_NAME)
if (value.containsKey("$ref"))
{
parser.next();
AsyncapiSchemaItem asyncapiSchemaItem = new AsyncapiSchemaItem();
asyncapiSchemaItem.ref = parser.getString();
schema = asyncapiSchemaItem;
AsyncapiSchemaItem schemaItem = new AsyncapiSchemaItem();
schemaItem.ref = value.getString("$ref");
schema = schemaItem;
}
else if (event == JsonParser.Event.START_OBJECT)
else if (value.containsKey("type"))
{
JsonObject accessor = parser.getObject();
String schemaFormat = accessor.getString("schemaFormat");

if (schemaFormat != null)
{
schema = jsonb.fromJson((Reader) accessor, AsyncapiMultiFormatSchema.class);
}
else
{
schema = jsonb.fromJson((Reader) accessor, AsyncapiSchema.class);
}
schema = jsonb.fromJson(value.toString(), AsyncapiSchema.class);
}
else if (value.containsKey("schemaFormat"))
{
schema = jsonb.fromJson(value.toString(), AsyncapiMultiFormatSchema.class);
}

return schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.binding.asyncapi.internal.model.resolver;

import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -24,20 +25,28 @@ public abstract class AbstractAsyncapiResolver<T extends AbstractAsyncapiResolva
{
private final Map<String, T> resolvables;
private final Matcher matcher;
private final Set<String> unresolved;

public AbstractAsyncapiResolver(
Map<String, T> resolvables,
Pattern pattern)
Pattern pattern,
Set<String> unresolved)
{
this.resolvables = resolvables;
this.matcher = pattern.matcher("");
this.unresolved = unresolved;
}

public final T resolve(
T model)
{
final String key = resolveRef(model.ref);
return key != null ? resolvables.get(key) : model;
T candidate = key != null ? resolvables.getOrDefault(key, model) : model;
if (candidate.ref != null)
{
unresolved.add(candidate.ref);
}
return candidate;
}

public T resolve(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.asyncapi.internal.model.resolver;

import java.util.Set;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
Expand All @@ -22,8 +23,9 @@
public final class AsyncapiChannelResolver extends AbstractAsyncapiResolver<AsyncapiChannel>
{
public AsyncapiChannelResolver(
Asyncapi model)
Asyncapi model,
Set<String> unresolved)
{
super(model.channels, Pattern.compile("#/channels/(.+)"));
super(model.channels, Pattern.compile("#/channels/(.+)"), unresolved);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
Expand All @@ -24,12 +25,14 @@
public final class AsyncapiCorrelationIdResolver extends AbstractAsyncapiResolver<AsyncapiCorrelationId>
{
public AsyncapiCorrelationIdResolver(
Asyncapi model)
Asyncapi model,
Set<String> unresolved)
{
super(
Optional.ofNullable(model.components)
.map(c -> c.correlationIds)
.orElseGet(Map::of),
Pattern.compile("#/components/correlationIds/(.+)"));
Pattern.compile("#/components/correlationIds/(.+)"),
unresolved);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
Expand All @@ -24,12 +25,14 @@
public final class AsyncapiMessageResolver extends AbstractAsyncapiResolver<AsyncapiMessage>
{
public AsyncapiMessageResolver(
Asyncapi model)
Asyncapi model,
Set<String> unresolved)
{
super(
Optional.ofNullable(model.components)
.map(c -> c.messages)
.orElseGet(Map::of),
Pattern.compile("#/components/messages/(.+)"));
Pattern.compile("#/components/messages/(.+)"),
unresolved);
}
}
Loading
Loading