diff --git a/charts/faaast-service/Chart.yaml b/charts/faaast-service/Chart.yaml index 877a9e71e..63f81d7e8 100644 --- a/charts/faaast-service/Chart.yaml +++ b/charts/faaast-service/Chart.yaml @@ -11,7 +11,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 1.0.0 +version: 1.0.1 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/faaast-service/values.full.yaml b/charts/faaast-service/values.full.yaml index 572bb3ecc..049bc54fe 100644 --- a/charts/faaast-service/values.full.yaml +++ b/charts/faaast-service/values.full.yaml @@ -66,7 +66,6 @@ core: assetConnectionReadMaxThreadPoolSize: 1000 assetConnectionReadTimeout: 5000 assetConnectionWriteMaxThreadPoolSize: 1000 - callbackAddress: "https://invalid.local:443/mypath" requestHandlerThreadPoolSize: 2 submodelRegistries: - "http://example.com/MySubmodelRegistry" @@ -137,7 +136,7 @@ endpoints: corsEnabled: true corsExposedHeaders: "X-Custom-Header" corsMaxAge: 1000 - hostname: "localhost" + hostname: "https://example.invalid" httpVersion: "HTTP_2" pathPrefix: "/api/v3.0" includeErrorDetails: true diff --git a/charts/faaast-service/values.yaml b/charts/faaast-service/values.yaml index 5b5c32c13..b3b88848b 100644 --- a/charts/faaast-service/values.yaml +++ b/charts/faaast-service/values.yaml @@ -46,7 +46,6 @@ core: {} # core: # operationTimeout: 60000 # ms, 0 = no timeout # assetConnectionReadTimeout: 30000 # ms -# callbackAddress: "https://faaast.example.com" # validationOnLoad: # validateConstraints: true # idShortUniqueness: true diff --git a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/Service.java b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/Service.java index 0329f87e8..cdf23a92e 100644 --- a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/Service.java +++ b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/Service.java @@ -200,6 +200,12 @@ public AssetConnectionManager getAssetConnectionManager() { } + @Override + public List getEndpoints() { + return endpoints; + } + + public ServiceConfig getConfig() { return config; } diff --git a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/ServiceContext.java b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/ServiceContext.java index 04a0e3a9b..474f33938 100644 --- a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/ServiceContext.java +++ b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/ServiceContext.java @@ -24,6 +24,7 @@ import de.fraunhofer.iosb.ilt.faaast.service.model.exception.ResourceNotFoundException; import de.fraunhofer.iosb.ilt.faaast.service.persistence.Persistence; import de.fraunhofer.iosb.ilt.faaast.service.typing.TypeInfo; +import java.util.List; import org.eclipse.digitaltwin.aas4j.v3.model.Reference; @@ -98,4 +99,12 @@ public default T execute(Request request) { * @return the AssetConnectionManager of the service */ public AssetConnectionManager getAssetConnectionManager(); + + + /** + * Returns the endpoints of the service. + * + * @return the endpoints of the service + */ + public List getEndpoints(); } diff --git a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/config/CoreConfig.java b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/config/CoreConfig.java index 3353b563b..186ccd440 100644 --- a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/config/CoreConfig.java +++ b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/config/CoreConfig.java @@ -53,7 +53,6 @@ public class CoreConfig { private RegistrySynchronizationConfig registrySynchronization; private double minInflateRatio; private long operationTimeout; - private String callbackAddress; public CoreConfig() { this.assetConnectionRetryInterval = DEFAULT_ASSET_CONNECTION_RETRY_INTERVAL; @@ -238,16 +237,6 @@ public void setOperationTimeout(long operationTimeout) { } - public String getCallbackAddress() { - return callbackAddress; - } - - - public void setCallbackAddress(String callbackAddress) { - this.callbackAddress = callbackAddress; - } - - @Override public int hashCode() { return Objects.hash(assetConnectionRetryInterval, @@ -262,8 +251,7 @@ public int hashCode() { submodelRegistries, registrySynchronization, minInflateRatio, - operationTimeout, - callbackAddress); + operationTimeout); } @@ -291,8 +279,7 @@ public boolean equals(Object obj) { && Objects.equals(this.submodelRegistries, other.submodelRegistries) && Objects.equals(this.registrySynchronization, other.registrySynchronization) && Objects.equals(this.minInflateRatio, other.minInflateRatio) - && Objects.equals(this.operationTimeout, other.operationTimeout) - && Objects.equals(callbackAddress, other.callbackAddress); + && Objects.equals(this.operationTimeout, other.operationTimeout); } public static class Builder extends ExtendableBuilder { @@ -411,12 +398,6 @@ public Builder operationTimeout(long value) { } - public Builder callbackAddress(String value) { - getBuildingInstance().setCallbackAddress(value); - return getSelf(); - } - - @Override protected Builder getSelf() { return this; diff --git a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/util/EncodingHelper.java b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/util/EncodingHelper.java index 577ed1451..71729ac53 100644 --- a/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/util/EncodingHelper.java +++ b/core/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/util/EncodingHelper.java @@ -80,7 +80,9 @@ public static String base64Encode(String value) { * @return encoded value */ public static byte[] base64Encode(byte[] value) { - return Base64.getEncoder().encode(value); + return Objects.nonNull(value) + ? Base64.getEncoder().encode(value) + : new byte[0]; } diff --git a/docs/source/basics/configuration.md b/docs/source/basics/configuration.md index 6ee675843..c157383f0 100644 --- a/docs/source/basics/configuration.md +++ b/docs/source/basics/configuration.md @@ -44,7 +44,6 @@ The `core` configuration block contains properties not related to the implementa | assetConnectionReadTimeout
*(optional)* | Long | Timeout in ms for reading all asset connections for a single AAS command | 5000 | | assetConnectionRetryInterval
*(optional)* | Long | Interval in ms in which to retry establishing asset connections | 1000 | | assetConnectionWriteMaxThreadPoolSize
*(optional)* | Integer | Size of thread pool used to write to asset connections | 1000 | -| callbackAddress
*(optional)* | String | The external URI the FA³ST Service is reachable from. Used in registry synchronization and cloud events message bus. | | | minInflateRatio
*(optional)* | Double | Ratio between de- and inflated bytes to detect zipbomb when loading AASX files | 0.001 | | operationTimeout
*(optional)* | Long | Timeout in ms for executing AAS operations. Set to 0 for no timeout. | 3600000 | | requestHandlerThreadPoolSize
*(optional)* | Integer | Number of concurrent thread that can execute API requests | 2 | diff --git a/docs/source/interfaces/endpoint.md b/docs/source/interfaces/endpoint.md index 7ebb710a4..ef5dced98 100644 --- a/docs/source/interfaces/endpoint.md +++ b/docs/source/interfaces/endpoint.md @@ -46,26 +46,26 @@ The HTTP Endpoint is based on the document [Details of the Asset Administration ### Configuration :::{table} Configuration properties of HTTP Endpoint. -| Name | Allowed Value | Description | Default Value | -| --------------------------------------- | ----------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------- | -| certificate
*(optional)* | [CertificateInfo](#providing-certificates-in-configuration) | The HTTPS certificate to use.
| self-signed certificate | -| corsAllowCredentials
*(optional)* | Boolean | Sets the `Access-Control-Allow-Credentials` response header. | false | -| corsAllowedHeaders
*(optional)* | String (comma-separated list) | Sets the `Access-Control-Allow-Headers` response header. | * | -| corsAllowedMethods
*(optional)* | String (comma-separated list) | Sets the `Access-Control-Allow-Methods` response header. | GET, POST, HEAD | -| corsAllowedOrigin
*(optional)* | String | Sets the `Access-Control-Allow-Origin` response header. | * | -| corsEnabled
*(optional)* | Boolean | If Cross-Origin Resource Sharing (CORS) should be enabled.
Typically required if you want to access the REST interface from any machine other than the one running FA³ST Service. | false | -| corsExposedHeaders
*(optional)* | String (comma-separated list) | Sets the `Access-Control-Expose-Headers` response header. | | -| corsMaxAge
*(optional)* | Long | Sets the `Access-Control-Max-Age` response header. | 3600 | -| hostname
*(optional)* | String | The hostname to be used for automated registration with registry. | auto-detect (typically IP address) | -| httpVersion
*(optional)* | HTTP_1_1
HTTP_2 | The HTTP version to use for the connection. If HTTP 2 is used but not supported by the server, the connection will automatically downgrade. | HTTP_2 | -| pathPrefix
*(optional)* | String | The path prefix to be used for automatic registration with registry. Must start with a "/" and not end with a "/". Exceptions: "" and "/". (regex: `^(?:$\|/\|/.*[^/])$`) | /api/v3.0 | -| includeErrorDetails
*(optional)* | Boolean | If set, stack trace is added to the HTTP responses incase of error. | false | -| port
*(optional)* | Integer | The port to use. | 443 | -| sniEnabled
*(optional)* | Boolean | If Server Name Identification (SNI) should be enabled.
**This should only be disabled for testing purposes as it may present a security risk!** | true | -| sslEnabled
*(optional)* | Boolean | If SSL/HTTPS should be enabled.
**This should only be disabled for testing purposes as it may present a security risk!** | true | -| subprotocol
*(optional)* | String | The subprotocol to be used for automatic registration with registry. Limited to a maximum of 128 characters. | | -| subprotocolBody
*(optional)* | String | The subprotocol body to be used for automatic registration with registry. Limited to a maximum of 128 characters.
Supports templating: any "${id}" in the given subprotocolBody will be replaced with the AAS/submodel identifier. | | -| subprotocolBodyEncoding
*(optional)* | String | The subprotocol body encoding to be used for automatic registration with registry. | | +| Name | Allowed Value | Description | Default Value | +| --------------------------------------- | ----------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------- | +| certificate
*(optional)* | [CertificateInfo](#providing-certificates-in-configuration) | The HTTPS certificate to use.
| self-signed certificate | +| corsAllowCredentials
*(optional)* | Boolean | Sets the `Access-Control-Allow-Credentials` response header. | false | +| corsAllowedHeaders
*(optional)* | String (comma-separated list) | Sets the `Access-Control-Allow-Headers` response header. | * | +| corsAllowedMethods
*(optional)* | String (comma-separated list) | Sets the `Access-Control-Allow-Methods` response header. | GET, POST, HEAD | +| corsAllowedOrigin
*(optional)* | String | Sets the `Access-Control-Allow-Origin` response header. | * | +| corsEnabled
*(optional)* | Boolean | If Cross-Origin Resource Sharing (CORS) should be enabled.
Typically required if you want to access the REST interface from any machine other than the one running FA³ST Service. | false | +| corsExposedHeaders
*(optional)* | String (comma-separated list) | Sets the `Access-Control-Expose-Headers` response header. | | +| corsMaxAge
*(optional)* | Long | Sets the `Access-Control-Max-Age` response header. | 3600 | +| hostname
*(optional)* | String | The external URI the FA³ST Service is reachable from (e.g., Kubernetes Ingress URL). Used in registry synchronization and MessageBusCloudEvents. If this is defined, no additional scheme or port will be added. Example: | auto-detect (typically IP address) | +| httpVersion
*(optional)* | HTTP_1_1
HTTP_2 | The HTTP version to use for the connection. If HTTP 2 is used but not supported by the server, the connection will automatically downgrade. | HTTP_2 | +| pathPrefix
*(optional)* | String | The path prefix to be used for automatic registration with registry. Must start with a "/" and not end with a "/". Exceptions: "" and "/". (regex: `^(?:$\|/\|/.*[^/])$`) | /api/v3.0 | +| includeErrorDetails
*(optional)* | Boolean | If set, stack trace is added to the HTTP responses incase of error. | false | +| port
*(optional)* | Integer | The port to use. | 443 | +| sniEnabled
*(optional)* | Boolean | If Server Name Identification (SNI) should be enabled.
**This should only be disabled for testing purposes as it may present a security risk!** | true | +| sslEnabled
*(optional)* | Boolean | If SSL/HTTPS should be enabled.
**This should only be disabled for testing purposes as it may present a security risk!** | true | +| subprotocol
*(optional)* | String | The subprotocol to be used for automatic registration with registry. Limited to a maximum of 128 characters. | | +| subprotocolBody
*(optional)* | String | The subprotocol body to be used for automatic registration with registry. Limited to a maximum of 128 characters.
Supports templating: any "${id}" in the given subprotocolBody will be replaced with the AAS/submodel identifier. | | +| subprotocolBodyEncoding
*(optional)* | String | The subprotocol body encoding to be used for automatic registration with registry. | | ::: ```{code-block} json @@ -88,7 +88,7 @@ The HTTP Endpoint is based on the document [Details of the Asset Administration "corsEnabled": true, "corsExposedHeaders": "X-Custom-Header", "corsMaxAge": 1000, - "hostname": "localhost", + "hostname": "https://my-domain.example/path", "httpVersion": "HTTP_2", "pathPrefix": "/api/v3.0", "includeErrorDetails": true, diff --git a/docs/source/interfaces/message-bus.md b/docs/source/interfaces/message-bus.md index 722adb04e..11f685f86 100644 --- a/docs/source/interfaces/message-bus.md +++ b/docs/source/interfaces/message-bus.md @@ -173,7 +173,9 @@ This implementation of the `MessageBus` interface publishes CloudEvent messages Each message is published under the topic defined in `[topicPrefix]`. The payload is a JSON serialization of a CloudEvent as specified in the async-aas specification: [https://factory-x-contributions.github.io/async-aas-helm](https://factory-x-contributions.github.io/async-aas-helm) -Note: To modify the URL in a CloudEvent's `source` field, the configuration `core.callbackAddress` needs to be added. Else, `localhost` will be used. +Note: To modify the URL in a CloudEvent's `source` field, `HttpEndpointConfig.hostname` needs to be configured. Else, `http(s)://(localhost|127.0.0.1|local-ip-address):(port)` will be used, depending on the rest of the HttpEndpoint's config and your system. If multiple HttpEndpoints are defined, the MessageBus will try to select one with a non-local hostname. + +Note: MessageBusCloudEvents requires the definition of a HttpEndpoint in the FA³ST configuration. ### Configuration diff --git a/docs/source/other/release-notes.md b/docs/source/other/release-notes.md index 4ab0936b4..2503b685b 100644 --- a/docs/source/other/release-notes.md +++ b/docs/source/other/release-notes.md @@ -24,6 +24,7 @@ - Treat headers case-insenstive (see RFC 2616) - OperationProvider now supports use of input arguments via variables in headers - Add support for runtime update of OperationProvider + - Consolidated temporarily introduced `CoreConfig.callbackAddress` into `HttpEndpointConfig.hostname`. If `MessageBusCloudEvents` is used in combination with multiple `HttpEndpoints`, one of them will be chosen to fill a `CloudEvent`'s `source` URI. The implementation will prefer non-local hostnames (i.e. **not** `localhost`/`127.0.0.1`). **Internal changes & bugfixes** - General diff --git a/endpoint/http/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpoint.java b/endpoint/http/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpoint.java index 1bdbdf2b8..6a89cda0b 100644 --- a/endpoint/http/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpoint.java +++ b/endpoint/http/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpoint.java @@ -16,17 +16,16 @@ import static de.fraunhofer.iosb.ilt.faaast.service.certificate.util.KeyStoreHelper.DEFAULT_ALIAS; -import de.fraunhofer.iosb.ilt.faaast.service.ServiceContext; import de.fraunhofer.iosb.ilt.faaast.service.certificate.CertificateData; import de.fraunhofer.iosb.ilt.faaast.service.certificate.CertificateInformation; import de.fraunhofer.iosb.ilt.faaast.service.certificate.util.KeyStoreHelper; -import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig; import de.fraunhofer.iosb.ilt.faaast.service.endpoint.AbstractEndpoint; import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.util.HttpHelper; import de.fraunhofer.iosb.ilt.faaast.service.exception.EndpointException; import de.fraunhofer.iosb.ilt.faaast.service.model.Interface; import de.fraunhofer.iosb.ilt.faaast.service.model.Version; import de.fraunhofer.iosb.ilt.faaast.service.util.EncodingHelper; +import de.fraunhofer.iosb.ilt.faaast.service.util.HostnameUtil; import java.io.File; import java.io.IOException; import java.net.URI; @@ -79,7 +78,6 @@ public class HttpEndpoint extends AbstractEndpoint { private static final String ENDPOINT_PROTOCOL = "HTTP"; private static final String ENDPOINT_PROTOCOL_VERSION = "1.1"; private Server server; - private String callbackAddress; @Override public HttpEndpointConfig asConfig() { @@ -124,13 +122,6 @@ public void start() throws EndpointException { } - @Override - public void init(CoreConfig coreConfig, HttpEndpointConfig config, ServiceContext serviceContext) { - callbackAddress = coreConfig.getCallbackAddress(); - super.init(coreConfig, config, serviceContext); - } - - private void configureHttpServer() throws EndpointException { HttpConfiguration httpConfig = new HttpConfiguration(); httpConfig.setSendServerVersion(false); @@ -271,6 +262,42 @@ public List getSubmodelEndpoint } + /** + * Return the address this endpoint is reachable from. Consists of hostname + pathPrefix if hostname is defined, else + * HostnameUtil.getHostname() + port + path + * + * @return the internet address of this endpoint + */ + public URI getEndpointUri() { + URI result = server.getURI(); + try { + if (Objects.nonNull(config.getHostname())) { + result = buildUri( + config.getHostname(), + // server URI path comes before configured prefix + result.getPath(), + config.getPathPrefix()); + } + else { + result = new URI( + result.getScheme(), + result.getUserInfo(), + HostnameUtil.getHostname(), + result.getPort(), + // server URI path comes before configured prefix + result.getPath().concat(config.getPathPrefix()), + result.getQuery(), + result.getFragment()); + } + } + catch (URISyntaxException e) { + LOGGER.error("error creating endpoint URI for HTTP endpoint based on hostname from configuration (hostname: {}): {}", + config.getHostname(), e.getMessage()); + } + return result; + } + + private org.eclipse.digitaltwin.aas4j.v3.model.Endpoint endpointFor(Interface iface, String path, String identifiableId) { URI endpointUri = buildUri(getEndpointUri().toString(), path); @@ -279,7 +306,7 @@ private org.eclipse.digitaltwin.aas4j.v3.model.Endpoint endpointFor(Interface if } return new DefaultEndpoint.Builder() - ._interface(String.format("%s-%d.%d", iface, API_VERSION.getMajor(), API_VERSION.getMinor())) + ._interface(String.format("%s-%d.%d", iface.getName(), API_VERSION.getMajor(), API_VERSION.getMinor())) .protocolInformation(new DefaultProtocolInformation.Builder() .href(endpointUri.toASCIIString()) .endpointProtocol(ENDPOINT_PROTOCOL) @@ -305,36 +332,6 @@ private String render(String subprotocolBodyTemplate, String identifiableId) { } - private URI getEndpointUri() { - URI result = server.getURI(); - try { - if (Objects.nonNull(callbackAddress)) { - result = buildUri( - callbackAddress, - // server URI path comes before configured prefix - result.getPath(), - config.getPathPrefix()); - } - else if (Objects.nonNull(config.getHostname())) { - result = new URI( - result.getScheme(), - result.getUserInfo(), - config.getHostname(), - result.getPort(), - // server URI path comes before configured prefix - result.getPath().concat(config.getPathPrefix()), - result.getQuery(), - result.getFragment()); - } - } - catch (URISyntaxException e) { - LOGGER.error("error creating endpoint URI for HTTP endpoint based on hostname from configuration (callbackAddress: {}, hostname: {}): {}", - callbackAddress, config.getHostname(), e.getMessage()); - } - return result; - } - - private URI buildUri(String base, String... paths) { String safeBase = base.endsWith("/") ? base : base.concat("/"); diff --git a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/AbstractHttpEndpointTest.java b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/AbstractHttpEndpointTest.java index 319e050e0..af01288c6 100644 --- a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/AbstractHttpEndpointTest.java +++ b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/AbstractHttpEndpointTest.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import de.fraunhofer.iosb.ilt.faaast.service.Service; -import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig; import de.fraunhofer.iosb.ilt.faaast.service.dataformat.DeserializationException; import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.request.mapper.QueryParameters; import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.serialization.HttpJsonApiDeserializer; @@ -133,7 +132,6 @@ public abstract class AbstractHttpEndpointTest { protected static HttpJsonApiDeserializer deserializer; protected static HttpJsonApiSerializer serializer; protected static Server server; - protected static CoreConfig coreConfig = CoreConfig.builder().callbackAddress("http://invalid.local").build(); @Before public void setUp() { @@ -302,14 +300,14 @@ public void testBogusAndMissingQueryValue() throws Exception { @Test - public void testGetAasEndpointInformationWithCallbackAddress() { + public void testGetAasEndpointInformationWithCustomHostname() { List actual = endpoint.getAasEndpointInformation(UUID.randomUUID().toString()); ProtocolInformation protocolInformation = actual.get(0).getProtocolInformation(); HttpEndpointConfig config = endpoint.asConfig(); - Assert.assertEquals(coreConfig.getCallbackAddress().concat(endpoint.getPathPrefix()).concat("/shells"), protocolInformation.getHref()); + Assert.assertEquals(config.getHostname().concat(endpoint.getPathPrefix()).concat("/shells"), protocolInformation.getHref()); Assert.assertEquals(config.getSubprotocol(), protocolInformation.getSubprotocol()); Assert.assertEquals(config.getSubprotocolBody(), protocolInformation.getSubprotocolBody()); Assert.assertEquals(config.getSubprotocolBodyEncoding(), protocolInformation.getSubprotocolBodyEncoding()); diff --git a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithAutoGeneratedCertificateTest.java b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithAutoGeneratedCertificateTest.java index 096c7e623..768b7bf6e 100644 --- a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithAutoGeneratedCertificateTest.java +++ b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithAutoGeneratedCertificateTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.spy; import de.fraunhofer.iosb.ilt.faaast.service.Service; +import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig; import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.serialization.HttpJsonApiDeserializer; import de.fraunhofer.iosb.ilt.faaast.service.filestorage.FileStorage; import de.fraunhofer.iosb.ilt.faaast.service.messagebus.MessageBus; @@ -50,10 +51,11 @@ private static void startServer() throws Exception { scheme = HttpScheme.HTTPS.toString(); endpoint = new HttpEndpoint(); server = new Server(); - service = spy(new Service(coreConfig, persistence, fileStorage, mock(MessageBus.class), List.of(endpoint), List.of(), List.of())); + service = spy(new Service(CoreConfig.DEFAULT, persistence, fileStorage, mock(MessageBus.class), List.of(endpoint), List.of(), List.of())); endpoint.init( - coreConfig, + CoreConfig.DEFAULT, HttpEndpointConfig.builder() + .hostname(HOST) .port(port) .cors(true) .build(), diff --git a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithExtendedEndpointInformationTest.java b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithExtendedEndpointInformationTest.java index 2032b83a4..ce5648bfd 100644 --- a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithExtendedEndpointInformationTest.java +++ b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithExtendedEndpointInformationTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.spy; import de.fraunhofer.iosb.ilt.faaast.service.Service; +import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig; import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.serialization.HttpJsonApiDeserializer; import de.fraunhofer.iosb.ilt.faaast.service.filestorage.FileStorage; import de.fraunhofer.iosb.ilt.faaast.service.messagebus.MessageBus; @@ -49,10 +50,11 @@ private static void startServer() throws Exception { scheme = HttpScheme.HTTP.toString(); endpoint = new HttpEndpoint(); server = new Server(); - service = spy(new Service(coreConfig, persistence, fileStorage, mock(MessageBus.class), List.of(endpoint), List.of(), List.of())); + service = spy(new Service(CoreConfig.DEFAULT, persistence, fileStorage, mock(MessageBus.class), List.of(endpoint), List.of(), List.of())); endpoint.init( - coreConfig, + CoreConfig.DEFAULT, HttpEndpointConfig.builder() + .hostname(HOST) .port(port) .cors(true) .ssl(false) diff --git a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithProvidedCertificateTest.java b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithProvidedCertificateTest.java index ec678c25d..f251cc3f3 100644 --- a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithProvidedCertificateTest.java +++ b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithProvidedCertificateTest.java @@ -21,6 +21,7 @@ import de.fraunhofer.iosb.ilt.faaast.service.certificate.CertificateInformation; import de.fraunhofer.iosb.ilt.faaast.service.certificate.util.KeyStoreHelper; import de.fraunhofer.iosb.ilt.faaast.service.config.CertificateConfig; +import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig; import de.fraunhofer.iosb.ilt.faaast.service.filestorage.FileStorage; import de.fraunhofer.iosb.ilt.faaast.service.messagebus.MessageBus; import de.fraunhofer.iosb.ilt.faaast.service.persistence.Persistence; @@ -79,10 +80,11 @@ private static void startServer() throws Exception { scheme = HttpScheme.HTTPS.toString(); endpoint = new HttpEndpoint(); server = new Server(); - service = spy(new Service(coreConfig, persistence, fileStorage, mock(MessageBus.class), List.of(endpoint), List.of(), List.of())); + service = spy(new Service(CoreConfig.DEFAULT, persistence, fileStorage, mock(MessageBus.class), List.of(endpoint), List.of(), List.of())); endpoint.init( - coreConfig, + CoreConfig.DEFAULT, HttpEndpointConfig.builder() + .hostname(HOST) .port(port) .cors(true) .certificate(CertificateConfig.builder() diff --git a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithSslDisabledTest.java b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithSslDisabledTest.java index 90deda8f2..7df6dd91e 100644 --- a/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithSslDisabledTest.java +++ b/endpoint/http/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/endpoint/http/HttpEndpointWithSslDisabledTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.spy; import de.fraunhofer.iosb.ilt.faaast.service.Service; +import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig; import de.fraunhofer.iosb.ilt.faaast.service.filestorage.FileStorage; import de.fraunhofer.iosb.ilt.faaast.service.messagebus.MessageBus; import de.fraunhofer.iosb.ilt.faaast.service.persistence.Persistence; @@ -48,10 +49,11 @@ private static void startServer() throws Exception { scheme = HttpScheme.HTTP.toString(); endpoint = new HttpEndpoint(); server = new Server(); - service = spy(new Service(coreConfig, persistence, fileStorage, mock(MessageBus.class), List.of(endpoint), List.of(), List.of())); + service = spy(new Service(CoreConfig.DEFAULT, persistence, fileStorage, mock(MessageBus.class), List.of(endpoint), List.of(), List.of())); endpoint.init( - coreConfig, + CoreConfig.DEFAULT, HttpEndpointConfig.builder() + .hostname(HOST) .port(port) .cors(true) .ssl(false) diff --git a/messagebus/cloudevents/pom.xml b/messagebus/cloudevents/pom.xml index 8022006f1..3d7c4ed77 100644 --- a/messagebus/cloudevents/pom.xml +++ b/messagebus/cloudevents/pom.xml @@ -19,6 +19,11 @@ dataformat-json ${project.version} + + ${project.groupId} + endpoint-http + ${project.version} + ${project.groupId} messagebus-internal @@ -128,7 +133,7 @@ org.wiremock - wiremock + wiremock-standalone ${wiremock.version} test diff --git a/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/MessageBusCloudEvents.java b/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/MessageBusCloudEvents.java index 47e438bd7..71e3e8830 100644 --- a/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/MessageBusCloudEvents.java +++ b/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/MessageBusCloudEvents.java @@ -23,6 +23,9 @@ import com.fasterxml.jackson.databind.SerializationFeature; import de.fraunhofer.iosb.ilt.faaast.service.ServiceContext; import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig; +import de.fraunhofer.iosb.ilt.faaast.service.endpoint.Endpoint; +import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.HttpEndpoint; +import de.fraunhofer.iosb.ilt.faaast.service.exception.ConfigurationInitializationException; import de.fraunhofer.iosb.ilt.faaast.service.exception.MessageBusException; import de.fraunhofer.iosb.ilt.faaast.service.messagebus.MessageBus; import de.fraunhofer.iosb.ilt.faaast.service.messagebus.cloudevents.mapper.CloudEventMapperConfig; @@ -39,9 +42,11 @@ import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.SubscriptionId; import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.SubscriptionInfo; import de.fraunhofer.iosb.ilt.faaast.service.util.EnvironmentHelper; +import de.fraunhofer.iosb.ilt.faaast.service.util.HostnameUtil; import io.cloudevents.CloudEvent; import io.cloudevents.jackson.JsonFormat; -import java.util.Optional; +import java.net.URI; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; @@ -58,7 +63,7 @@ public class MessageBusCloudEvents implements MessageBus { private static final String PUBLISH_ERROR_MSG = "{} publishing event via CloudEvents MQTT message bus for message type {}"; - private static final String DEFAULT_CALLBACK_ADDRESS = "localhost"; + private static final String NO_HTTP_ENDPOINT_FOUND_MESSAGE = "No HTTP Endpoint found but one is required for MessageBusCloudEvents"; private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusCloudEvents.class); @@ -121,7 +126,7 @@ public MessageBusCloudEventsConfig asConfig() { @Override - public void init(CoreConfig coreConfig, MessageBusCloudEventsConfig config, ServiceContext serviceContext) { + public void init(CoreConfig coreConfig, MessageBusCloudEventsConfig config, ServiceContext serviceContext) throws ConfigurationInitializationException { messageBusInternal.init(coreConfig, MessageBusInternalConfig.builder().build(), serviceContext); this.config = config; @@ -142,13 +147,9 @@ public void init(CoreConfig coreConfig, MessageBusCloudEventsConfig config, Serv } }; - String callbackAddress = Optional.ofNullable(coreConfig.getCallbackAddress()) - .orElseGet(() -> { - LOGGER.warn("No callbackAddress configured in the core configuration. Using '{}' as CloudEvent 'source' URL", DEFAULT_CALLBACK_ADDRESS); - return DEFAULT_CALLBACK_ADDRESS; - }); + String sourceUri = getSourceUri(serviceContext.getEndpoints()); - mapperRegistry = defaultRegistry(CloudEventMapperConfig.from(config, callbackAddress), referableSupplier); + mapperRegistry = defaultRegistry(CloudEventMapperConfig.from(config, sourceUri), referableSupplier); } @@ -160,6 +161,28 @@ public void publish(EventMessage message) throws MessageBusException { } + private String getSourceUri(List httpEndpoints) throws ConfigurationInitializationException { + List endpointUri = httpEndpoints.stream() + .filter(HttpEndpoint.class::isInstance) + .map(ep -> (HttpEndpoint) ep) + .map(HttpEndpoint::getEndpointUri) + .toList(); + + if (endpointUri.isEmpty()) { + throw new ConfigurationInitializationException(NO_HTTP_ENDPOINT_FOUND_MESSAGE); + } + + // Try to get a sourceUri which is not localhost/127.0.0.1, else take any one + return endpointUri.stream() + .filter(uri -> !uri.getHost().equals(HostnameUtil.getHostname())) + .findAny() + .map(URI::toString) + .orElse(endpointUri.stream() + .findAny().get() + .toString()); + } + + private void distributeCloudEvent(EventMessage message) { try { if (mapperRegistry.canHandle(message)) { diff --git a/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/CloudEventMapper.java b/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/CloudEventMapper.java index 291b240ae..b94f617fe 100644 --- a/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/CloudEventMapper.java +++ b/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/CloudEventMapper.java @@ -219,9 +219,9 @@ private String getSpecificElementName(Reference reference) { private URI getSourceUri(Reference reference) { // base - String uriString = config.callbackAddress().endsWith("/") - ? config.callbackAddress().substring(0, config.callbackAddress().length() - 1) - : config.callbackAddress(); + String uriString = config.sourceUri().endsWith("/") + ? config.sourceUri().substring(0, config.sourceUri().length() - 1) + : config.sourceUri(); Key root = ReferenceHelper.getRoot(reference); if (root == null || root.getValue() == null) { diff --git a/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/CloudEventMapperConfig.java b/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/CloudEventMapperConfig.java index d728e1175..db699817f 100644 --- a/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/CloudEventMapperConfig.java +++ b/messagebus/cloudevents/src/main/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/CloudEventMapperConfig.java @@ -20,22 +20,22 @@ /** * Configuration for a FA³ST event message to CloudEvent mapper. * - * @param callbackAddress Callback address for subscribers of the CloudEvents + * @param sourceUri URI to use in the "source" field of a CloudEvent * @param dataSchemaPrefix Data schema prefix used to assign semantic information to data elements * @param eventTypePrefix Prefix for event types * @param slimEvents If true, the data field in CloudEvents will be empty */ -public record CloudEventMapperConfig(String callbackAddress, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents) { +public record CloudEventMapperConfig(String sourceUri, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents) { /** * Build a CloudEventMapperConfig from MessageBusCloudEventsConfig and a referable supplier. * * @param messageBusConfig The MessageBusCloudEventsConfig - * @param callbackAddress The callback address for event subscribers. + * @param sourceUri URI to use in the "source" field of a CloudEvent. * @return The CloudEventMapperConfig */ - public static CloudEventMapperConfig from(MessageBusCloudEventsConfig messageBusConfig, String callbackAddress) { - return new CloudEventMapperConfig(callbackAddress, + public static CloudEventMapperConfig from(MessageBusCloudEventsConfig messageBusConfig, String sourceUri) { + return new CloudEventMapperConfig(sourceUri, messageBusConfig.getDataSchemaPrefix(), messageBusConfig.getEventTypePrefix(), messageBusConfig.isSlimEvents()); diff --git a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/MessageBusCloudEventsTest.java b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/MessageBusCloudEventsTest.java index 1e9210512..0a2036597 100644 --- a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/MessageBusCloudEventsTest.java +++ b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/MessageBusCloudEventsTest.java @@ -15,9 +15,12 @@ package de.fraunhofer.iosb.ilt.faaast.service.messagebus.cloudevents; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import de.fraunhofer.iosb.ilt.faaast.service.ServiceContext; import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig; +import de.fraunhofer.iosb.ilt.faaast.service.endpoint.http.HttpEndpoint; +import de.fraunhofer.iosb.ilt.faaast.service.exception.ConfigurationInitializationException; import de.fraunhofer.iosb.ilt.faaast.service.exception.MessageBusException; import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.EventMessage; import de.fraunhofer.iosb.ilt.faaast.service.model.messagebus.SubscriptionInfo; @@ -32,8 +35,10 @@ import io.moquette.broker.config.IConfig; import io.moquette.broker.config.MemoryConfig; import java.io.IOException; +import java.net.URI; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -98,9 +103,9 @@ public static void cleanUp() { @Test - public void testExactTypeSubscription() throws InterruptedException, MessageBusException { + public void testExactTypeSubscription() throws InterruptedException, MessageBusException, ConfigurationInitializationException { MessageBusCloudEvents messageBus = new MessageBusCloudEvents(); - messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mock(ServiceContext.class)); + messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mockServiceContext()); messageBus.start(); CountDownLatch condition = new CountDownLatch(1); final AtomicReference response = new AtomicReference<>(); @@ -118,9 +123,9 @@ public void testExactTypeSubscription() throws InterruptedException, MessageBusE @Test - public void testSuperTypeSubscription() throws InterruptedException, MessageBusException { + public void testSuperTypeSubscription() throws InterruptedException, MessageBusException, ConfigurationInitializationException { MessageBusCloudEvents messageBus = new MessageBusCloudEvents(); - messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mock(ServiceContext.class)); + messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mockServiceContext()); messageBus.start(); Set messages = Set.of(valueChangeMessage, errorMessage); Set responses = Collections.synchronizedSet(new HashSet<>()); @@ -146,9 +151,9 @@ public void testSuperTypeSubscription() throws InterruptedException, MessageBusE @Test - public void testDistinctTypesSubscription() throws InterruptedException, MessageBusException { + public void testDistinctTypesSubscription() throws InterruptedException, MessageBusException, ConfigurationInitializationException { MessageBusCloudEvents messageBus = new MessageBusCloudEvents(); - messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mock(ServiceContext.class)); + messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mockServiceContext()); messageBus.start(); Map, Set> messages = Map.of( ChangeEventMessage.class, Set.of(valueChangeMessage), @@ -179,9 +184,9 @@ public void testDistinctTypesSubscription() throws InterruptedException, Message @Test - public void testNotMatchingSubscription() throws InterruptedException, MessageBusException { + public void testNotMatchingSubscription() throws InterruptedException, MessageBusException, ConfigurationInitializationException { MessageBusCloudEvents messageBus = new MessageBusCloudEvents(); - messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mock(ServiceContext.class)); + messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mockServiceContext()); messageBus.start(); CountDownLatch condition = new CountDownLatch(1); messageBus.subscribe(SubscriptionInfo.create( @@ -202,9 +207,9 @@ public void testNotMatchingSubscription() throws InterruptedException, MessageBu @Test - public void testSubscribeUnsubscribe() throws InterruptedException, MessageBusException { + public void testSubscribeUnsubscribe() throws InterruptedException, MessageBusException, ConfigurationInitializationException { MessageBusCloudEvents messageBus = new MessageBusCloudEvents(); - messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mock(ServiceContext.class)); + messageBus.init(CoreConfig.DEFAULT, MessageBusCloudEventsConfig.builder().host(String.format("tcp://localhost:%d", BROKER_PORT)).build(), mockServiceContext()); messageBus.start(); CountDownLatch condition = new CountDownLatch(1); final AtomicReference response = new AtomicReference<>(); @@ -223,4 +228,14 @@ public void testSubscribeUnsubscribe() throws InterruptedException, MessageBusEx Assert.assertFalse(condition.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)); messageBus.stop(); } + + + private ServiceContext mockServiceContext() { + ServiceContext mock = mock(ServiceContext.class); + HttpEndpoint mockEndpoint = mock(HttpEndpoint.class); + when(mockEndpoint.getEndpointUri()).thenReturn(URI.create("http://invalid.local/path/api/v3.1")); + when(mock.getEndpoints()).thenReturn(List.of(mockEndpoint)); + + return mock; + } } diff --git a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/AbstractCloudEventMapperTest.java b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/AbstractCloudEventMapperTest.java index 0420ea490..d4eb3323f 100644 --- a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/AbstractCloudEventMapperTest.java +++ b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/AbstractCloudEventMapperTest.java @@ -36,7 +36,7 @@ public abstract class AbstractCloudEventMapperTest { private final String eventTypePrefix = "my.prefix.test."; - private final String callbackAddress = "https://localhost:12345/api/v3.0"; + private final String sourceUri = "https://localhost:12345/api/v3.0"; private final String dataSchemaPrefix = "https://my-data-schema-prefix/path#"; protected final ObjectMapper mapper = new ObjectMapper() .enable(SerializationFeature.INDENT_OUTPUT) @@ -50,17 +50,17 @@ protected CloudEventMapper getCloudEventMapper(Function re protected CloudEventMapper getCloudEventMapper(boolean slimEvents, Function referableSupplier) { - return getCloudEventMapper(callbackAddress, dataSchemaPrefix, eventTypePrefix, slimEvents, referableSupplier); + return getCloudEventMapper(sourceUri, dataSchemaPrefix, eventTypePrefix, slimEvents, referableSupplier); } - protected abstract CloudEventMapper getCloudEventMapper(String callbackAddress, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents, + protected abstract CloudEventMapper getCloudEventMapper(String sourceUri, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents, Function referableSupplier); protected CloudEvent expectedFrom(String identifiableId, Referable referable, String dataSchemaSuffix, String eventTypeSuffix) throws JsonProcessingException { CloudEventBuilder builder = CloudEventBuilder.v1() - .withSource(URI.create(String.format("%s/submodels/%s/submodel-elements/%s", callbackAddress, EncodingHelper.base64UrlEncode(identifiableId), + .withSource(URI.create(String.format("%s/submodels/%s/submodel-elements/%s", sourceUri, EncodingHelper.base64UrlEncode(identifiableId), referable.getIdShort()))) .withId(UUID.randomUUID().toString()) .withType(eventTypePrefix + eventTypeSuffix) diff --git a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/DefaultCloudEventMapperTest.java b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/DefaultCloudEventMapperTest.java index adfb22f45..68d29f63e 100644 --- a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/DefaultCloudEventMapperTest.java +++ b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/DefaultCloudEventMapperTest.java @@ -36,9 +36,9 @@ public class DefaultCloudEventMapperTest extends AbstractCloudEventMapperTest { private final String updated = "updated"; - protected CloudEventMapper getCloudEventMapper(String callbackAddress, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents, + protected CloudEventMapper getCloudEventMapper(String sourceUri, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents, Function referableSupplier) { - return new DefaultCloudEventMapper(new CloudEventMapperConfig(callbackAddress, dataSchemaPrefix, eventTypePrefix, slimEvents)); + return new DefaultCloudEventMapper(new CloudEventMapperConfig(sourceUri, dataSchemaPrefix, eventTypePrefix, slimEvents)); } diff --git a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/ElementDeletedCloudEventMapperTest.java b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/ElementDeletedCloudEventMapperTest.java index 5ffcf0769..a182e8872 100644 --- a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/ElementDeletedCloudEventMapperTest.java +++ b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/ElementDeletedCloudEventMapperTest.java @@ -62,9 +62,9 @@ public void testElementDeletedMappingValid() throws Exception { @Override - protected CloudEventMapper getCloudEventMapper(String callbackAddress, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents, + protected CloudEventMapper getCloudEventMapper(String sourceUri, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents, Function referableSupplier) { - return new ElementDeletedCloudEventMapper(new CloudEventMapperConfig(callbackAddress, dataSchemaPrefix, eventTypePrefix, slimEvents)); + return new ElementDeletedCloudEventMapper(new CloudEventMapperConfig(sourceUri, dataSchemaPrefix, eventTypePrefix, slimEvents)); } diff --git a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/ValueChangeCloudEventMapperTest.java b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/ValueChangeCloudEventMapperTest.java index 9b8bb8d93..e319ba777 100644 --- a/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/ValueChangeCloudEventMapperTest.java +++ b/messagebus/cloudevents/src/test/java/de/fraunhofer/iosb/ilt/faaast/service/messagebus/cloudevents/mapper/impl/ValueChangeCloudEventMapperTest.java @@ -35,9 +35,9 @@ public class ValueChangeCloudEventMapperTest extends AbstractCloudEventMapperTes private final String valueChanged = "valueChanged"; @Override - protected CloudEventMapper getCloudEventMapper(String callbackAddress, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents, + protected CloudEventMapper getCloudEventMapper(String sourceUri, String dataSchemaPrefix, String eventTypePrefix, boolean slimEvents, Function referableSupplier) { - return new ValueChangedCloudEventMapper(new CloudEventMapperConfig(callbackAddress, dataSchemaPrefix, eventTypePrefix, slimEvents), referableSupplier); + return new ValueChangedCloudEventMapper(new CloudEventMapperConfig(sourceUri, dataSchemaPrefix, eventTypePrefix, slimEvents), referableSupplier); }