Skip to content

Commit da931c0

Browse files
authored
Overriding correlation-id via AsyncAPI Specification (#1482)
1 parent a7cdbfd commit da931c0

27 files changed

Lines changed: 637 additions & 20 deletions

File tree

runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/composite/AsyncapiProxyGenerator.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,9 +545,11 @@ private final class HttpKafkaBindingsHelper extends BindingsHelper
545545

546546
private static final Pattern HEADER_LOCATION_PATTERN = Pattern.compile("([^/]+)$");
547547
private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}");
548+
private static final Pattern CORRELATION_HEADERS_NAME = Pattern.compile("\\$message\\.header#\\/(.+)");
548549

549550
private final Matcher headerLocation = HEADER_LOCATION_PATTERN.matcher("");
550551
private final Matcher parameters = PARAMETER_PATTERN.matcher("");
552+
private final Matcher correlation = CORRELATION_HEADERS_NAME.matcher("");
551553

552554
private final List<ProxyRouteHelper> httpKafkaRoutes;
553555

@@ -823,6 +825,16 @@ private <C> HttpKafkaWithProduceConfigBuilder<C> injectHttpKafkaRouteProduceWith
823825
produce.replyTo(kafkaOperation.reply.channel.address);
824826
}
825827

828+
AsyncapiMessageView messageView = kafkaOperation.messages.get(0);
829+
if (messageView.correlationId != null && messageView.correlationId.location != null)
830+
{
831+
String correlationId = messageView.correlationId.location;
832+
if (correlation.reset(correlationId).matches())
833+
{
834+
produce.correlationId(correlation.group(1));
835+
}
836+
}
837+
826838
AsyncapiHttpKafkaOperationBinding httpKafkaBinding = httpOperation.bindings.httpKafka;
827839
if (httpKafkaBinding != null)
828840
{

runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaCorrelationConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.http.kafka.config;
1616

17+
import java.util.function.Function;
18+
1719
import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String16FW;
1820

1921
public final class HttpKafkaCorrelationConfig
@@ -28,4 +30,15 @@ public HttpKafkaCorrelationConfig(
2830
this.replyTo = replyTo;
2931
this.correlationId = correlationId;
3032
}
33+
34+
public static HttpKafkaCorrelationConfigBuilder<HttpKafkaCorrelationConfig> builder()
35+
{
36+
return new HttpKafkaCorrelationConfigBuilder<>(HttpKafkaCorrelationConfig.class::cast);
37+
}
38+
39+
public static <T> HttpKafkaCorrelationConfigBuilder<T> builder(
40+
Function<HttpKafkaCorrelationConfig, T> mapper)
41+
{
42+
return new HttpKafkaCorrelationConfigBuilder<>(mapper);
43+
}
3144
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc
3+
*
4+
* Licensed under the Aklivity Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* https://www.aklivity.io/aklivity-community-license/
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
package io.aklivity.zilla.runtime.binding.http.kafka.config;
16+
17+
import java.util.function.Function;
18+
19+
import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String16FW;
20+
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
21+
22+
public final class HttpKafkaCorrelationConfigBuilder<T> extends ConfigBuilder<T, HttpKafkaCorrelationConfigBuilder<T>>
23+
{
24+
private static final String16FW CORRELATION_HEADERS_REPLY_TO_DEFAULT = new String16FW("zilla:reply-to");
25+
private static final String16FW CORRELATION_HEADERS_CORRELATION_ID_DEFAULT = new String16FW("zilla:correlation-id");
26+
27+
private final Function<HttpKafkaCorrelationConfig, T> mapper;
28+
29+
private String replyTo;
30+
private String correlationId;
31+
32+
HttpKafkaCorrelationConfigBuilder(
33+
Function<HttpKafkaCorrelationConfig, T> mapper)
34+
{
35+
this.mapper = mapper;
36+
}
37+
38+
public HttpKafkaCorrelationConfigBuilder<T> replyTo(
39+
String replyTo)
40+
{
41+
this.replyTo = replyTo;
42+
return this;
43+
}
44+
45+
public HttpKafkaCorrelationConfigBuilder<T> correlationId(
46+
String correlationId)
47+
{
48+
this.correlationId = correlationId;
49+
return this;
50+
}
51+
52+
@Override
53+
@SuppressWarnings("unchecked")
54+
protected Class<HttpKafkaCorrelationConfigBuilder<T>> thisType()
55+
{
56+
return (Class<HttpKafkaCorrelationConfigBuilder<T>>) getClass();
57+
}
58+
59+
@Override
60+
public T build()
61+
{
62+
String16FW replyTo = this.replyTo != null
63+
? new String16FW(this.replyTo)
64+
: CORRELATION_HEADERS_REPLY_TO_DEFAULT;
65+
String16FW correlationId = this.correlationId != null
66+
? new String16FW(this.correlationId)
67+
: CORRELATION_HEADERS_CORRELATION_ID_DEFAULT;
68+
return mapper.apply(new HttpKafkaCorrelationConfig(replyTo, correlationId));
69+
}
70+
}

runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaIdempotencyConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.http.kafka.config;
1616

17+
import java.util.function.Function;
18+
1719
import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String8FW;
1820

1921
public final class HttpKafkaIdempotencyConfig
@@ -25,4 +27,15 @@ public HttpKafkaIdempotencyConfig(
2527
{
2628
this.header = header;
2729
}
30+
31+
public static HttpKafkaIdempotencyConfigBuilder<HttpKafkaIdempotencyConfig> builder()
32+
{
33+
return new HttpKafkaIdempotencyConfigBuilder<>(HttpKafkaIdempotencyConfig.class::cast);
34+
}
35+
36+
public static <T> HttpKafkaIdempotencyConfigBuilder<T> builder(
37+
Function<HttpKafkaIdempotencyConfig, T> mapper)
38+
{
39+
return new HttpKafkaIdempotencyConfigBuilder<>(mapper);
40+
}
2841
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc
3+
*
4+
* Licensed under the Aklivity Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* https://www.aklivity.io/aklivity-community-license/
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
package io.aklivity.zilla.runtime.binding.http.kafka.config;
16+
17+
import java.util.function.Function;
18+
19+
import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String8FW;
20+
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
21+
22+
public final class HttpKafkaIdempotencyConfigBuilder<T> extends ConfigBuilder<T, HttpKafkaIdempotencyConfigBuilder<T>>
23+
{
24+
private static final String8FW IDEMPOTENCY_HEADER_DEFAULT = new String8FW("idempotency-key");
25+
26+
private final Function<HttpKafkaIdempotencyConfig, T> mapper;
27+
28+
private String header;
29+
30+
HttpKafkaIdempotencyConfigBuilder(
31+
Function<HttpKafkaIdempotencyConfig, T> mapper)
32+
{
33+
this.mapper = mapper;
34+
}
35+
36+
public HttpKafkaIdempotencyConfigBuilder<T> header(
37+
String header)
38+
{
39+
this.header = header;
40+
return this;
41+
}
42+
43+
@Override
44+
@SuppressWarnings("unchecked")
45+
protected Class<HttpKafkaIdempotencyConfigBuilder<T>> thisType()
46+
{
47+
return (Class<HttpKafkaIdempotencyConfigBuilder<T>>) getClass();
48+
}
49+
50+
@Override
51+
public T build()
52+
{
53+
String8FW header = this.header != null ? new String8FW(this.header) : IDEMPOTENCY_HEADER_DEFAULT;
54+
return mapper.apply(new HttpKafkaIdempotencyConfig(header));
55+
}
56+
}

runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaOptionsConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.http.kafka.config;
1616

17+
import java.util.function.Function;
18+
1719
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
1820

1921
public final class HttpKafkaOptionsConfig extends OptionsConfig
@@ -28,4 +30,15 @@ public HttpKafkaOptionsConfig(
2830
this.idempotency = idempotency;
2931
this.correlation = correlation;
3032
}
33+
34+
public static HttpKafkaOptionsConfigBuilder<HttpKafkaOptionsConfig> builder()
35+
{
36+
return new HttpKafkaOptionsConfigBuilder<>(HttpKafkaOptionsConfig.class::cast);
37+
}
38+
39+
public static <T> HttpKafkaOptionsConfigBuilder<T> builder(
40+
Function<OptionsConfig, T> mapper)
41+
{
42+
return new HttpKafkaOptionsConfigBuilder<>(mapper);
43+
}
3144
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2021-2024 Aklivity Inc
3+
*
4+
* Licensed under the Aklivity Community License (the "License"); you may not use
5+
* this file except in compliance with the License. You may obtain a copy of the
6+
* License at
7+
*
8+
* https://www.aklivity.io/aklivity-community-license/
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
* specific language governing permissions and limitations under the License.
14+
*/
15+
package io.aklivity.zilla.runtime.binding.http.kafka.config;
16+
17+
import java.util.function.Function;
18+
19+
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
20+
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
21+
22+
public final class HttpKafkaOptionsConfigBuilder<T> extends ConfigBuilder<T, HttpKafkaOptionsConfigBuilder<T>>
23+
{
24+
private final Function<OptionsConfig, T> mapper;
25+
26+
private HttpKafkaCorrelationConfig correlation;
27+
private HttpKafkaIdempotencyConfig idempotency;
28+
29+
HttpKafkaOptionsConfigBuilder(
30+
Function<OptionsConfig, T> mapper)
31+
{
32+
this.mapper = mapper;
33+
}
34+
35+
public HttpKafkaCorrelationConfigBuilder<HttpKafkaOptionsConfigBuilder<T>> correlation()
36+
{
37+
return new HttpKafkaCorrelationConfigBuilder<>(this::correlation);
38+
}
39+
40+
public HttpKafkaIdempotencyConfigBuilder<HttpKafkaOptionsConfigBuilder<T>> idempotency()
41+
{
42+
return new HttpKafkaIdempotencyConfigBuilder<>(this::idempotency);
43+
}
44+
45+
private HttpKafkaOptionsConfigBuilder<T> correlation(
46+
HttpKafkaCorrelationConfig correlation)
47+
{
48+
this.correlation = correlation;
49+
return this;
50+
}
51+
52+
private HttpKafkaOptionsConfigBuilder<T> idempotency(
53+
HttpKafkaIdempotencyConfig idempotency)
54+
{
55+
this.idempotency = idempotency;
56+
return this;
57+
}
58+
59+
@Override
60+
@SuppressWarnings("unchecked")
61+
protected Class<HttpKafkaOptionsConfigBuilder<T>> thisType()
62+
{
63+
return (Class<HttpKafkaOptionsConfigBuilder<T>>) getClass();
64+
}
65+
66+
@Override
67+
public T build()
68+
{
69+
return mapper.apply(new HttpKafkaOptionsConfig(idempotency, correlation));
70+
}
71+
}

runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithProduceConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public final class HttpKafkaWithProduceConfig
3434
public final Optional<List<HttpKafkaWithProduceOverrideConfig>> overrides;
3535
public final Optional<String> replyTo;
3636
public final Optional<List<HttpKafkaWithProduceAsyncHeaderConfig>> async;
37+
public final String16FW correlationId;
3738

3839
private final List<Matcher> asyncMatchers;
3940

@@ -43,13 +44,15 @@ public final class HttpKafkaWithProduceConfig
4344
String key,
4445
List<HttpKafkaWithProduceOverrideConfig> overrides,
4546
String replyTo,
47+
String16FW correlationId,
4648
List<HttpKafkaWithProduceAsyncHeaderConfig> async)
4749
{
4850
this.topic = topic;
4951
this.acks = acks;
5052
this.key = Optional.ofNullable(key);
5153
this.overrides = Optional.ofNullable(overrides);
5254
this.replyTo = Optional.ofNullable(replyTo);
55+
this.correlationId = correlationId;
5356
this.async = Optional.ofNullable(async);
5457

5558
this.asyncMatchers = this.async.isPresent()

runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/config/HttpKafkaWithProduceConfigBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.function.Function;
2121

2222
import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.KafkaAckMode;
23+
import io.aklivity.zilla.runtime.binding.http.kafka.internal.types.String16FW;
2324
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
2425

2526
public final class HttpKafkaWithProduceConfigBuilder<T> extends ConfigBuilder<T, HttpKafkaWithProduceConfigBuilder<T>>
@@ -31,6 +32,7 @@ public final class HttpKafkaWithProduceConfigBuilder<T> extends ConfigBuilder<T,
3132
private List<HttpKafkaWithProduceOverrideConfig> overrides;
3233
private String replyTo;
3334
private List<HttpKafkaWithProduceAsyncHeaderConfig> async;
35+
private String16FW correlationId;
3436

3537

3638
HttpKafkaWithProduceConfigBuilder(
@@ -79,6 +81,13 @@ public HttpKafkaWithProduceConfigBuilder<T> replyTo(
7981
return this;
8082
}
8183

84+
public HttpKafkaWithProduceConfigBuilder<T> correlationId(
85+
String correlationId)
86+
{
87+
this.correlationId = correlationId != null ? new String16FW(correlationId) : null;
88+
return this;
89+
}
90+
8291
public HttpKafkaWithProduceAsyncHeaderConfigBuilder<HttpKafkaWithProduceConfigBuilder<T>> async()
8392
{
8493
return HttpKafkaWithProduceAsyncHeaderConfig.builder(this::async);
@@ -115,7 +124,7 @@ protected Class<HttpKafkaWithProduceConfigBuilder<T>> thisType()
115124
@Override
116125
public T build()
117126
{
118-
return mapper.apply(new HttpKafkaWithProduceConfig(topic, acks, key, overrides, replyTo, async));
127+
return mapper.apply(new HttpKafkaWithProduceConfig(topic, acks, key, overrides, replyTo, correlationId, async));
119128
}
120129

121130
private HttpKafkaWithProduceConfigBuilder<T> override(

0 commit comments

Comments
 (0)