diff --git a/Dockerfile b/Dockerfile index 004661956..77761bea2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -111,7 +111,7 @@ ENV MAX_REQUEST_RETRIES=3 ENV IMPORT_KEEPALIVE= -ENV MAX_IMPORT_THREADS=10 +ENV MAX_IMPORT_THREADS=1 ENV SERVLET_NAME= diff --git a/docker-compose.yml b/docker-compose.yml index 8e8c6fd66..9c75d79e3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -34,7 +34,7 @@ services: - SSL_VERIFY_CLIENT=optional_no_ca - MAX_BODY_SIZE=2097152 volumes: - - ./platform/nginx.conf.template:/etc/nginx/nginx.conf.template:ro + - ./platform/nginx-frontend.conf.template:/etc/nginx/nginx.conf.template:ro - ./ssl/server:/etc/nginx/ssl:ro linkeddatahub: user: root # otherwise the ldh user does not have permissions to the mounted folder which is owner by root @@ -63,6 +63,7 @@ services: - SELF_SIGNED_CERT=true # only on localhost - SIGN_UP_CERT_VALIDITY=180 - MAX_CONTENT_LENGTH=2097152 + - MAX_IMPORT_THREADS=1 - NOTIFICATION_ADDRESS=LinkedDataHub - MAIL_SMTP_HOST=email-server - MAIL_SMTP_PORT=25 @@ -133,10 +134,11 @@ services: user: root # otherwise the varnish user does not have permissions to the mounted folder which is owner by root depends_on: - linkeddatahub + - nginx-admin tmpfs: /var/lib/varnish/varnishd:exec environment: - - BACKEND_HOST=fuseki-admin - - BACKEND_PORT=3030 + - BACKEND_HOST=nginx-admin + - BACKEND_PORT=8080 - CLIENT_HOST=linkeddatahub - VARNISH_SIZE=1G entrypoint: /bin/sh -c "cp /etc/varnish/default.vcl.template /etc/varnish/default.vcl && sed -i 's|$${BACKEND_HOST}|'"$$BACKEND_HOST"'|g' /etc/varnish/default.vcl && sed -i 's|$${BACKEND_PORT}|'"$$BACKEND_PORT"'|g' /etc/varnish/default.vcl && sed -i 's|$${CLIENT_HOST}|'"$$CLIENT_HOST"'|g' /etc/varnish/default.vcl && /usr/local/bin/docker-varnish-entrypoint \"$$0\" \"$$@\"" @@ -148,16 +150,39 @@ services: user: root # otherwise varnish user does not have permissions to the mounted folder which is owner by root depends_on: - linkeddatahub + - nginx-end-user tmpfs: /var/lib/varnish/varnishd:exec environment: - - BACKEND_HOST=fuseki-end-user - - BACKEND_PORT=3030 + - BACKEND_HOST=nginx-end-user + - BACKEND_PORT=8080 - CLIENT_HOST=linkeddatahub - VARNISH_SIZE=1G entrypoint: /bin/sh -c "cp /etc/varnish/default.vcl.template /etc/varnish/default.vcl && sed -i 's|$${BACKEND_HOST}|'"$$BACKEND_HOST"'|g' /etc/varnish/default.vcl && sed -i 's|$${BACKEND_PORT}|'"$$BACKEND_PORT"'|g' /etc/varnish/default.vcl && sed -i 's|$${CLIENT_HOST}|'"$$CLIENT_HOST"'|g' /etc/varnish/default.vcl && /usr/local/bin/docker-varnish-entrypoint \"$$0\" \"$$@\"" command: [ "-t", "86400", "-p", "timeout_idle=60s" ] # time to live volumes: - ./platform/varnish-backend.vcl.template:/etc/varnish/default.vcl.template:ro + nginx-admin: + image: nginx:1.23.3 + depends_on: + - fuseki-admin + command: /bin/sh -c "cp /etc/nginx/nginx.conf.template /etc/nginx/nginx.conf && sed -i 's|$${UPSTREAM_SERVER}|'"$$UPSTREAM_SERVER"'|g' /etc/nginx/nginx.conf && sed -i 's|$${UPSTREAM_HTTP_PORT}|'"$$UPSTREAM_HTTP_PORT"'|g' /etc/nginx/nginx.conf && sed -i 's|$${SERVER_HTTP_PORT}|'"$$SERVER_HTTP_PORT"'|g' /etc/nginx/nginx.conf && nginx -g 'daemon off;'" + environment: + - UPSTREAM_SERVER=fuseki-admin + - UPSTREAM_HTTP_PORT=3030 + - SERVER_HTTP_PORT=8080 # because of nginx-unprivileged + volumes: + - ./platform/nginx-backend.conf.template:/etc/nginx/nginx.conf.template:ro + nginx-end-user: + image: nginx:1.23.3 + depends_on: + - fuseki-end-user + command: /bin/sh -c "cp /etc/nginx/nginx.conf.template /etc/nginx/nginx.conf && sed -i 's|$${UPSTREAM_SERVER}|'"$$UPSTREAM_SERVER"'|g' /etc/nginx/nginx.conf && sed -i 's|$${UPSTREAM_HTTP_PORT}|'"$$UPSTREAM_HTTP_PORT"'|g' /etc/nginx/nginx.conf && sed -i 's|$${SERVER_HTTP_PORT}|'"$$SERVER_HTTP_PORT"'|g' /etc/nginx/nginx.conf && nginx -g 'daemon off;'" + environment: + - UPSTREAM_SERVER=fuseki-end-user + - UPSTREAM_HTTP_PORT=3030 + - SERVER_HTTP_PORT=8080 # because of nginx-unprivileged + volumes: + - ./platform/nginx-backend.conf.template:/etc/nginx/nginx.conf.template:ro email-server: image: namshi/smtp environment: diff --git a/platform/nginx-backend.conf.template b/platform/nginx-backend.conf.template new file mode 100644 index 000000000..7a3a768a7 --- /dev/null +++ b/platform/nginx-backend.conf.template @@ -0,0 +1,40 @@ +worker_processes 1; + +events { + worker_connections 1024; +} + +http { + include mime.types; + default_type application/octet-stream; + + # Rate limiting: 5 requests per second per IP + limit_req_zone $binary_remote_addr zone=api_ratelimit:10m rate=5r/s; + limit_req_status 429; + + upstream varnish_backend { + server ${UPSTREAM_SERVER}:${UPSTREAM_HTTP_PORT}; + } + + server { + listen ${SERVER_HTTP_PORT}; + + # Health check path + location = /healthz { + return 200 'ok'; + add_header Content-Type text/plain; + } + + location / { + # Apply rate limiting with burst buffer + limit_req zone=api_ratelimit burst=10; + # Optional: tell clients how long to wait (1s = 5r/s baseline) + add_header Retry-After 1 always; + + proxy_pass http://varnish_backend; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + } +} diff --git a/platform/nginx.conf.template b/platform/nginx-frontend.conf.template similarity index 100% rename from platform/nginx.conf.template rename to platform/nginx-frontend.conf.template diff --git a/platform/varnish-backend.vcl.template b/platform/varnish-backend.vcl.template index 40b46a10e..e80bf1292 100644 --- a/platform/varnish-backend.vcl.template +++ b/platform/varnish-backend.vcl.template @@ -54,6 +54,11 @@ sub vcl_recv { } sub vcl_backend_response { + if (beresp.status == 429) { + set beresp.uncacheable = true; + return (deliver); + } + /* purge URLs after updates */ if ((beresp.status == 200 || beresp.status == 201 || beresp.status == 204) && bereq.method ~ "POST|PUT|DELETE|PATCH") { set beresp.http.X-LinkedDataHub = "Banned"; diff --git a/src/main/java/com/atomgraph/linkeddatahub/client/GraphStoreClient.java b/src/main/java/com/atomgraph/linkeddatahub/client/GraphStoreClient.java new file mode 100644 index 000000000..ecf5fd647 --- /dev/null +++ b/src/main/java/com/atomgraph/linkeddatahub/client/GraphStoreClient.java @@ -0,0 +1,197 @@ +/** + * Copyright 2025 Martynas Jusevičius + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.atomgraph.linkeddatahub.client; + +import com.atomgraph.core.MediaTypes; +import com.atomgraph.linkeddatahub.client.util.RetryingInvocationBuilder; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.MultivaluedHashMap; +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.Response.Status; +import org.apache.jena.rdf.model.Model; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author Martynas.Jusevicius + */ +public class GraphStoreClient extends com.atomgraph.core.client.GraphStoreClient { + + private static final Logger log = LoggerFactory.getLogger(GraphStoreClient.class); + + private final long defaultDelayMillis; // = 5000L; + private final int maxRetryCount; // = 3; + + protected GraphStoreClient(MediaTypes mediaTypes, WebTarget endpoint, long defaultDelayMillis, int maxRetryCount) { + super(mediaTypes, endpoint); + this.defaultDelayMillis = defaultDelayMillis; + this.maxRetryCount = maxRetryCount; + } + + public static GraphStoreClient create(MediaTypes mediaTypes, WebTarget endpoint, long defaultDelayMillis, int maxRetryCount) { + return new GraphStoreClient(mediaTypes, endpoint, defaultDelayMillis, maxRetryCount); + } + + @Override + public boolean containsModel(String uri) { + MultivaluedMap params = new MultivaluedHashMap<>(); + params.putSingle(GRAPH_PARAM_NAME, uri); + + try (Response cr = new RetryingInvocationBuilder( + applyParams(params).request(getReadableMediaTypes(Model.class)), + defaultDelayMillis, + maxRetryCount + ).head()) { + return cr.getStatusInfo().getFamily().equals(Response.Status.Family.SUCCESSFUL); + } + } + + @Override + public Model getModel() { + MultivaluedMap params = new MultivaluedHashMap<>(); + params.putSingle(DEFAULT_PARAM_NAME, Boolean.TRUE.toString()); + + try (Response cr = new RetryingInvocationBuilder( + applyParams(params).request(getReadableMediaTypes(Model.class)), + defaultDelayMillis, + maxRetryCount + ).get()) { + if (cr.getStatus() == Status.NOT_FOUND.getStatusCode()) { + throw new NotFoundException(); + } + return cr.readEntity(Model.class); + } + } + + @Override + public Model getModel(String uri) { + MultivaluedMap params = new MultivaluedHashMap<>(); + params.putSingle(GRAPH_PARAM_NAME, uri); + + try (Response cr = new RetryingInvocationBuilder( + applyParams(params).request(getReadableMediaTypes(Model.class)), + defaultDelayMillis, + maxRetryCount + ).get()) { + if (cr.getStatus() == Status.NOT_FOUND.getStatusCode()) { + throw new NotFoundException(); + } + return cr.readEntity(Model.class); + } + } + + @Override + public void add(Model model) { + MultivaluedMap params = new MultivaluedHashMap<>(); + params.putSingle(DEFAULT_PARAM_NAME, Boolean.TRUE.toString()); + + try (Response cr = new RetryingInvocationBuilder( + applyParams(params).request(), + defaultDelayMillis, + maxRetryCount + ).post(Entity.entity(model, getDefaultMediaType()))) { + if (cr.getStatus() == Status.NOT_FOUND.getStatusCode()) { + throw new NotFoundException(); + } + } + } + + @Override + public void add(String uri, Model model) { + MultivaluedMap params = new MultivaluedHashMap<>(); + params.putSingle(GRAPH_PARAM_NAME, uri); + + try (Response cr = new RetryingInvocationBuilder( + applyParams(params).request(), + defaultDelayMillis, + maxRetryCount + ).post(Entity.entity(model, getDefaultMediaType()))) { + if (cr.getStatus() == Status.NOT_FOUND.getStatusCode()) { + throw new NotFoundException(); + } + } + } + + @Override + public void putModel(Model model) { + MultivaluedMap params = new MultivaluedHashMap<>(); + params.putSingle(DEFAULT_PARAM_NAME, Boolean.TRUE.toString()); + + try (Response cr = new RetryingInvocationBuilder( + applyParams(params).request(), + defaultDelayMillis, + maxRetryCount + ).put(Entity.entity(model, getDefaultMediaType()))) { + if (cr.getStatus() == Status.NOT_FOUND.getStatusCode()) { + throw new NotFoundException(); + } + } + } + + @Override + public void putModel(String uri, Model model) { + MultivaluedMap params = new MultivaluedHashMap<>(); + params.putSingle(GRAPH_PARAM_NAME, uri); + + try (Response cr = new RetryingInvocationBuilder( + applyParams(params).request(), + defaultDelayMillis, + maxRetryCount + ).put(Entity.entity(model, getDefaultMediaType()))) { + if (cr.getStatus() == Status.NOT_FOUND.getStatusCode()) { + throw new NotFoundException(); + } + } + } + + @Override + public void deleteDefault() { + MultivaluedMap params = new MultivaluedHashMap<>(); + params.putSingle(DEFAULT_PARAM_NAME, Boolean.TRUE.toString()); + + try (Response cr = new RetryingInvocationBuilder( + applyParams(params).request(), + defaultDelayMillis, + maxRetryCount + ).delete()) { + if (cr.getStatus() == Status.NOT_FOUND.getStatusCode()) { + throw new NotFoundException(); + } + } + } + + @Override + public void deleteModel(String uri) { + MultivaluedMap params = new MultivaluedHashMap<>(); + params.putSingle(GRAPH_PARAM_NAME, uri); + + try (Response cr = new RetryingInvocationBuilder( + applyParams(params).request(), + defaultDelayMillis, + maxRetryCount + ).delete()) { + if (cr.getStatus() == Status.NOT_FOUND.getStatusCode()) { + throw new NotFoundException(); + } + } + } + +} diff --git a/src/main/java/com/atomgraph/linkeddatahub/client/LinkedDataClient.java b/src/main/java/com/atomgraph/linkeddatahub/client/LinkedDataClient.java index 2138a5f30..21aab28ee 100644 --- a/src/main/java/com/atomgraph/linkeddatahub/client/LinkedDataClient.java +++ b/src/main/java/com/atomgraph/linkeddatahub/client/LinkedDataClient.java @@ -19,14 +19,13 @@ import com.atomgraph.core.MediaTypes; import com.atomgraph.linkeddatahub.client.filter.auth.IDTokenDelegationFilter; import com.atomgraph.linkeddatahub.client.filter.auth.WebIDDelegationFilter; -import com.atomgraph.linkeddatahub.client.util.RetryAfterHelper; +import com.atomgraph.linkeddatahub.client.util.RetryingInvocationBuilder; import com.atomgraph.linkeddatahub.server.security.AgentContext; import com.atomgraph.linkeddatahub.server.security.IDTokenSecurityContext; import com.atomgraph.linkeddatahub.server.security.WebIDSecurityContext; import java.net.URI; import jakarta.ws.rs.client.Client; import jakarta.ws.rs.client.Entity; -import static jakarta.ws.rs.client.Entity.entity; import jakarta.ws.rs.client.WebTarget; import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.MediaType; @@ -160,43 +159,62 @@ protected WebTarget getWebTarget(URI uri) public Response get(URI uri, jakarta.ws.rs.core.MediaType[] acceptedTypes) { WebTarget webTarget = getWebTarget(uri); - return new RetryAfterHelper(getDefaultDelayMillis(), getMaxRetryCount()).invokeWithRetry(() -> - webTarget.request(acceptedTypes) - .header(HttpHeaders.USER_AGENT, getUserAgentHeaderValue()) - .get()); + + return new RetryingInvocationBuilder( + webTarget. + request(acceptedTypes). + header(HttpHeaders.USER_AGENT, getUserAgentHeaderValue()), getDefaultDelayMillis(), getMaxRetryCount()). + get(); } @Override public Response post(URI uri, MediaType[] acceptedTypes, Entity entity) { WebTarget webTarget = getWebTarget(uri); - return new RetryAfterHelper(getDefaultDelayMillis(), getMaxRetryCount()).invokeWithRetry(() -> - webTarget.request(acceptedTypes).post(entity)); + + return new RetryingInvocationBuilder( + webTarget.request(acceptedTypes), + getDefaultDelayMillis(), + getMaxRetryCount()). + post(entity); } @Override public Response put(URI uri, MediaType[] acceptedTypes, Entity entity) { WebTarget webTarget = getWebTarget(uri); - return new RetryAfterHelper(getDefaultDelayMillis(), getMaxRetryCount()).invokeWithRetry(() -> - webTarget.request(acceptedTypes).put(entity)); + + return new RetryingInvocationBuilder( + webTarget.request(acceptedTypes), + getDefaultDelayMillis(), + getMaxRetryCount() + ). + put(entity); } public Response put(URI uri, Model model, MultivaluedMap headers) { WebTarget webTarget = getWebTarget(uri); - return new RetryAfterHelper(getDefaultDelayMillis(), getMaxRetryCount()).invokeWithRetry(() -> - webTarget.request(getReadableMediaTypes(Model.class)). - headers(headers). - put(Entity.entity(model, getDefaultMediaType()))); + + return new RetryingInvocationBuilder( + webTarget.request(getReadableMediaTypes(Model.class)).headers(headers), + getDefaultDelayMillis(), + getMaxRetryCount() + ). + put(Entity.entity(model, getDefaultMediaType())); } @Override public Response delete(URI uri) { WebTarget webTarget = getWebTarget(uri); - return new RetryAfterHelper(getDefaultDelayMillis(), getMaxRetryCount()).invokeWithRetry(() -> - webTarget.request().delete()); + + return new RetryingInvocationBuilder( + webTarget.request(), + getDefaultDelayMillis(), + getMaxRetryCount() + ). + delete(); } /** diff --git a/src/main/java/com/atomgraph/linkeddatahub/client/SPARQLClient.java b/src/main/java/com/atomgraph/linkeddatahub/client/SPARQLClient.java new file mode 100644 index 000000000..e9f1016ab --- /dev/null +++ b/src/main/java/com/atomgraph/linkeddatahub/client/SPARQLClient.java @@ -0,0 +1,106 @@ +/** + * Copyright 2025 Martynas Jusevičius + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.atomgraph.linkeddatahub.client; + +import com.atomgraph.core.MediaTypes; +import com.atomgraph.linkeddatahub.client.util.RetryingInvocationBuilder; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.Invocation; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.MultivaluedHashMap; +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.core.Response; +import org.apache.jena.query.Query; +import org.apache.jena.update.UpdateRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SPARQLClient extends com.atomgraph.core.client.SPARQLClient { + + private static final Logger log = LoggerFactory.getLogger(SPARQLClient.class); + + private final long defaultDelayMillis; // = 1000L; + private final int maxRetryCount; // = 3; + + protected SPARQLClient(MediaTypes mediaTypes, WebTarget endpoint, int maxGetRequestSize, long defaultDelayMillis, int maxRetryCount) { + super(mediaTypes, endpoint, maxGetRequestSize); + this.defaultDelayMillis = defaultDelayMillis; + this.maxRetryCount = maxRetryCount; + } + + protected SPARQLClient(MediaTypes mediaTypes, WebTarget endpoint, long defaultDelayMillis, int maxRetryCount) { + super(mediaTypes, endpoint); + this.defaultDelayMillis = defaultDelayMillis; + this.maxRetryCount = maxRetryCount; + } + + public static SPARQLClient create(MediaTypes mediaTypes, WebTarget endpoint, int maxGetRequestSize, long defaultDelayMillis, int maxRetryCount) { + return new SPARQLClient(mediaTypes, endpoint, maxGetRequestSize, defaultDelayMillis, maxRetryCount); + } + + public static SPARQLClient create(MediaTypes mediaTypes, WebTarget endpoint, long defaultDelayMillis, int maxRetryCount) { + return new SPARQLClient(mediaTypes, endpoint, defaultDelayMillis, maxRetryCount); + } + + @Override + public Response query(Query query, Class clazz, MultivaluedMap params, MultivaluedMap headers) { + if (params == null) { + throw new IllegalArgumentException("params cannot be null"); + } + if (headers == null) { + throw new IllegalArgumentException("headers cannot be null"); + } + + MultivaluedMap mergedParams = new MultivaluedHashMap<>(); + mergedParams.putAll(params); + mergedParams.putSingle(QUERY_PARAM_NAME, query.toString()); + + Invocation.Builder builder; + if (getQueryURLLength(params) > getMaxGetRequestSize()) { + builder = new RetryingInvocationBuilder( + applyHeaders(getEndpoint().request(getReadableMediaTypes(clazz)), headers), + defaultDelayMillis, + maxRetryCount + ); + return builder.post(Entity.form(mergedParams)); + } else { + builder = new RetryingInvocationBuilder( + applyHeaders(applyParams(mergedParams).request(getReadableMediaTypes(clazz)), headers), + defaultDelayMillis, + maxRetryCount + ); + return builder.get(); + } + } + + @Override + public void update(UpdateRequest updateRequest, MultivaluedMap params) { + MultivaluedMap formData = new MultivaluedHashMap<>(); + if (params != null) { + formData.putAll(params); + } + formData.putSingle(UPDATE_PARAM_NAME, updateRequest.toString()); + + new RetryingInvocationBuilder( + getEndpoint().request(), + defaultDelayMillis, + maxRetryCount + ).post(Entity.form(formData)).close(); + } + +} diff --git a/src/main/java/com/atomgraph/linkeddatahub/client/util/RetryAfterHelper.java b/src/main/java/com/atomgraph/linkeddatahub/client/util/RetryAfterHelper.java index d2340cd74..9912233b2 100644 --- a/src/main/java/com/atomgraph/linkeddatahub/client/util/RetryAfterHelper.java +++ b/src/main/java/com/atomgraph/linkeddatahub/client/util/RetryAfterHelper.java @@ -43,8 +43,10 @@ * * @author Martynas Jusevičius {@literal } */ +@Deprecated public class RetryAfterHelper { + private static final Logger log = LoggerFactory.getLogger(RetryAfterHelper.class); private final long defaultDelayMillis; diff --git a/src/main/java/com/atomgraph/linkeddatahub/client/util/RetryingInvocationBuilder.java b/src/main/java/com/atomgraph/linkeddatahub/client/util/RetryingInvocationBuilder.java new file mode 100644 index 000000000..fddc05891 --- /dev/null +++ b/src/main/java/com/atomgraph/linkeddatahub/client/util/RetryingInvocationBuilder.java @@ -0,0 +1,153 @@ +/** + * Copyright 2025 Martynas Jusevičius + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.atomgraph.linkeddatahub.client.util; + +import jakarta.ws.rs.ProcessingException; +import jakarta.ws.rs.client.*; +import jakarta.ws.rs.core.*; + +import java.util.Locale; +import java.util.concurrent.Callable; + +public class RetryingInvocationBuilder implements Invocation.Builder +{ + + private final Invocation.Builder delegate; + private final long defaultDelayMillis; + private final int maxRetryCount; + + public RetryingInvocationBuilder(Invocation.Builder delegate, long defaultDelayMillis, int maxRetryCount) + { + this.delegate = delegate; + this.defaultDelayMillis = defaultDelayMillis; + this.maxRetryCount = maxRetryCount; + } + + private T invokeWithRetry(Callable operation) + { + int retryCount = 0; + + while (true) + { + try + { + T result = operation.call(); + + if (result instanceof Response response) + { + if (response.getStatusInfo().equals(Response.Status.TOO_MANY_REQUESTS)) + { + long delay = defaultDelayMillis; + String retryAfter = response.getHeaderString(HttpHeaders.RETRY_AFTER); + if (retryAfter != null) + { + try + { + delay = Long.parseLong(retryAfter) * 1000; + } + catch (NumberFormatException ignore) {} + } + + response.close(); + + if (++retryCount > maxRetryCount) + throw new ProcessingException("Max retries exceeded"); + + try + { + Thread.sleep(delay); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new ProcessingException("Interrupted during retry", e); + } + + continue; + } + } + + return result; + } + catch (RuntimeException e) + { + throw e; + } + catch (Exception e) + { + throw new ProcessingException("Unexpected checked exception during retry", e); + } + } + } + + @Override public Response get() { return invokeWithRetry(delegate::get); } + @Override public T get(Class responseType) { return invokeWithRetry(() -> delegate.get(responseType)); } + @Override public T get(GenericType responseType) { return invokeWithRetry(() -> delegate.get(responseType)); } + + @Override public Response put(Entity entity) { return invokeWithRetry(() -> delegate.put(entity)); } + @Override public T put(Entity entity, Class responseType) { return invokeWithRetry(() -> delegate.put(entity, responseType)); } + @Override public T put(Entity entity, GenericType responseType) { return invokeWithRetry(() -> delegate.put(entity, responseType)); } + + @Override public Response post(Entity entity) { return invokeWithRetry(() -> delegate.post(entity)); } + @Override public T post(Entity entity, Class responseType) { return invokeWithRetry(() -> delegate.post(entity, responseType)); } + @Override public T post(Entity entity, GenericType responseType) { return invokeWithRetry(() -> delegate.post(entity, responseType)); } + + @Override public Response delete() { return invokeWithRetry(delegate::delete); } + @Override public T delete(Class responseType) { return invokeWithRetry(() -> delegate.delete(responseType)); } + @Override public T delete(GenericType responseType) { return invokeWithRetry(() -> delegate.delete(responseType)); } + + @Override public Response head() { return invokeWithRetry(delegate::head); } + @Override public Response options() { return invokeWithRetry(delegate::options); } + @Override public T options(Class responseType) { return invokeWithRetry(() -> delegate.options(responseType)); } + @Override public T options(GenericType responseType) { return invokeWithRetry(() -> delegate.options(responseType)); } + + @Override public Response trace() { return invokeWithRetry(delegate::trace); } + @Override public T trace(Class responseType) { return invokeWithRetry(() -> delegate.trace(responseType)); } + @Override public T trace(GenericType responseType) { return invokeWithRetry(() -> delegate.trace(responseType)); } + + @Override public Response method(String name) { return invokeWithRetry(() -> delegate.method(name)); } + @Override public T method(String name, Class responseType) { return invokeWithRetry(() -> delegate.method(name, responseType)); } + @Override public T method(String name, GenericType responseType) { return invokeWithRetry(() -> delegate.method(name, responseType)); } + @Override public Response method(String name, Entity entity) { return invokeWithRetry(() -> delegate.method(name, entity)); } + @Override public T method(String name, Entity entity, Class responseType) { return invokeWithRetry(() -> delegate.method(name, entity, responseType)); } + @Override public T method(String name, Entity entity, GenericType responseType) { return invokeWithRetry(() -> delegate.method(name, entity, responseType)); } + + // Delegate non-invocation methods directly + @Override public Invocation build(String method) { return delegate.build(method); } + @Override public Invocation build(String method, Entity entity) { return delegate.build(method, entity); } + @Override public Invocation buildGet() { return delegate.buildGet(); } + @Override public Invocation buildDelete() { return delegate.buildDelete(); } + @Override public Invocation buildPost(Entity entity) { return delegate.buildPost(entity); } + @Override public Invocation buildPut(Entity entity) { return delegate.buildPut(entity); } + + @Override public AsyncInvoker async() { return delegate.async(); } + @Override public CompletionStageRxInvoker rx() { return delegate.rx(); } + @Override public T rx(Class clazz) { return delegate.rx(clazz); } + + @Override public Invocation.Builder accept(String... mediaTypes) { delegate.accept(mediaTypes); return this; } + @Override public Invocation.Builder accept(MediaType... mediaTypes) { delegate.accept(mediaTypes); return this; } + @Override public Invocation.Builder acceptLanguage(Locale... locales) { delegate.acceptLanguage(locales); return this; } + @Override public Invocation.Builder acceptLanguage(String... locales) { delegate.acceptLanguage(locales); return this; } + @Override public Invocation.Builder acceptEncoding(String... encodings) { delegate.acceptEncoding(encodings); return this; } + @Override public Invocation.Builder cookie(Cookie cookie) { delegate.cookie(cookie); return this; } + @Override public Invocation.Builder cookie(String name, String value) { delegate.cookie(name, value); return this; } + @Override public Invocation.Builder cacheControl(CacheControl cacheControl) { delegate.cacheControl(cacheControl); return this; } + @Override public Invocation.Builder header(String name, Object value) { delegate.header(name, value); return this; } + @Override public Invocation.Builder headers(MultivaluedMap headers) { delegate.headers(headers); return this; } + @Override public Invocation.Builder property(String name, Object value) { delegate.property(name, value); return this; } + +} diff --git a/src/main/java/com/atomgraph/linkeddatahub/model/impl/ServiceImpl.java b/src/main/java/com/atomgraph/linkeddatahub/model/impl/ServiceImpl.java index c6c03f80e..77cc8585b 100644 --- a/src/main/java/com/atomgraph/linkeddatahub/model/impl/ServiceImpl.java +++ b/src/main/java/com/atomgraph/linkeddatahub/model/impl/ServiceImpl.java @@ -17,9 +17,7 @@ package com.atomgraph.linkeddatahub.model.impl; import com.atomgraph.core.MediaTypes; -import com.atomgraph.core.client.GraphStoreClient; import com.atomgraph.core.client.QuadStoreClient; -import com.atomgraph.core.client.SPARQLClient; import com.atomgraph.core.model.DatasetAccessor; import com.atomgraph.core.model.DatasetQuadAccessor; import com.atomgraph.core.model.EndpointAccessor; @@ -28,6 +26,8 @@ import com.atomgraph.core.model.impl.remote.EndpointAccessorImpl; import com.atomgraph.core.vocabulary.A; import com.atomgraph.core.vocabulary.SD; +import com.atomgraph.linkeddatahub.client.GraphStoreClient; +import com.atomgraph.linkeddatahub.client.SPARQLClient; import com.atomgraph.linkeddatahub.model.Service; import com.atomgraph.linkeddatahub.vocabulary.LAPP; import java.net.URI; @@ -131,11 +131,13 @@ public SPARQLClient getSPARQLClient() public SPARQLClient getSPARQLClient(WebTarget webTarget) { SPARQLClient sparqlClient; - + final long defaultDelayMillis = 1000L; + final int maxRetryCount = 3; + if (getMaxGetRequestSize() != null) - sparqlClient = SPARQLClient.create(getMediaTypes(), webTarget, getMaxGetRequestSize()); + sparqlClient = SPARQLClient.create(getMediaTypes(), webTarget, getMaxGetRequestSize(), defaultDelayMillis, maxRetryCount); else - sparqlClient = SPARQLClient.create(getMediaTypes(), webTarget); + sparqlClient = SPARQLClient.create(getMediaTypes(), webTarget, defaultDelayMillis, maxRetryCount); if (getAuthUser() != null && getAuthPwd() != null) { @@ -156,7 +158,7 @@ public EndpointAccessor getEndpointAccessor() } @Override - public GraphStoreClient getGraphStoreClient() + public com.atomgraph.core.client.GraphStoreClient getGraphStoreClient() { return getGraphStoreClient(getClient().target(getProxiedURI(URI.create(getGraphStore().getURI())))); } @@ -169,8 +171,10 @@ public GraphStoreClient getGraphStoreClient() */ public GraphStoreClient getGraphStoreClient(WebTarget webTarget) { - GraphStoreClient graphStoreClient = GraphStoreClient.create(webTarget); - + final long defaultDelayMillis = 1000L; + final int maxRetryCount = 3; + GraphStoreClient graphStoreClient = GraphStoreClient.create(getMediaTypes(), webTarget, defaultDelayMillis, maxRetryCount); + if (getAuthUser() != null && getAuthPwd() != null) { HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basicBuilder().