Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,17 @@
import com.predic8.membrane.core.http.*;
import com.predic8.membrane.core.interceptor.*;
import com.predic8.membrane.core.interceptor.Interceptor.*;
import com.predic8.membrane.core.interceptor.balancer.ExchangeNodeStatusTracker;
import com.predic8.membrane.core.interceptor.balancer.*;
import com.predic8.membrane.core.model.*;
import com.predic8.membrane.core.proxies.Proxy;
import com.predic8.membrane.core.proxies.*;
import com.predic8.membrane.core.proxies.Proxy;
import org.slf4j.*;

import java.io.*;
import java.net.*;
import java.text.*;
import java.util.*;

import static com.predic8.membrane.core.exchange.Exchange.TRACK_NODE_STATUS;
import static com.predic8.membrane.core.exchange.ExchangeState.*;
import static java.lang.Boolean.TRUE;

public abstract class AbstractExchange {
private static final Logger log = LoggerFactory.getLogger(AbstractExchange.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ public class Exchange extends AbstractExchange {
* compatibility (i.e. for Java's internal HTTP client).
*/
public static final String ALLOW_H2 = "membrane.use.h2";
public static final String TRACK_NODE_STATUS = "membrane.track.node.status";
public static final String SSL_CONTEXT = "membrane.ssl.context";
public static final String OAUTH2 = "membrane.oauth2";
public static final String SNI_SERVER_NAME = "membrane.sni.server.name";
public static final String WS_ORIGINAL_EXCHANGE = "membrane.ws.original.exchange";
public static final String SECURITY_SCHEMES = "membrane.security.schemes";

private static final Logger log = LoggerFactory.getLogger(Exchange.class.getName());

private AbstractHttpHandler handler;

private String originalHostHeader = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@

package com.predic8.membrane.core.interceptor.balancer;

import static com.predic8.membrane.core.exchange.Exchange.TRACK_NODE_STATUS;
import static java.lang.Boolean.TRUE;

/**
* Used by the {@link LoadBalancingInterceptor} to track the status of nodes during load balancing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public class Connection implements Closeable, MessageObserver, NonRelevantBodyOb
private Exchange exchange;
private boolean keepAttachedToExchange;

/**
* If the connection is tunneled through a proxy and the connection to the real backend uses TLS
*/
private boolean tunneled;

public static Connection open(String host, int port, String localHost, SSLProvider sslProvider, int connectTimeout) throws IOException {
return open(host, port, localHost, sslProvider, null, connectTimeout);
}
Expand Down Expand Up @@ -120,18 +125,18 @@ public static Connection open(String host, int port, String localHost, SSLProvid

log.debug("Opened connection on localPort: {}", con.socket.getLocalPort());

setupStreams(con);
con.setupStreams();
return con;
}

private static void setupStreams(Connection con) throws IOException {
private void setupStreams() throws IOException {
if (ByteStreamLogging.isLoggingEnabled()) {
String connectionName = chooseNewConnectionName();
con.out = new BufferedOutputStream(wrapConnectionOutputStream(con.socket.getOutputStream(), connectionName + " out"), BUFFER_SIZE);
con.in = new BufferedInputStream(wrapConnectionInputStream(con.socket.getInputStream(), connectionName + " in"), BUFFER_SIZE);
out = new BufferedOutputStream(wrapConnectionOutputStream(socket.getOutputStream(), connectionName + " out"), BUFFER_SIZE);
in = new BufferedInputStream(wrapConnectionInputStream(socket.getInputStream(), connectionName + " in"), BUFFER_SIZE);
} else {
con.out = new BufferedOutputStream(con.socket.getOutputStream(), BUFFER_SIZE);
con.in = new BufferedInputStream(con.socket.getInputStream(), BUFFER_SIZE);
out = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
in = new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE);
}
}

Expand Down Expand Up @@ -417,15 +422,17 @@ private void doTunnelHandshake(ProxyConfiguration proxy, Socket tunnel, String h
throw new UnableToTunnelException("Unable to tunnel through %s:%d. Proxy returns '%s'".formatted(proxy.getHost(), proxy.getPort(), replyStr));
}

/* tunneling Handshake was successful! */
/* tunneling Handshake was successful! */
tunneled = true;
}

private static @NotNull String getHostString(ProxyConfiguration proxy) {
return proxy.getHost() + ((proxy.isAuthentication()) ? " authenticated" : "");
}

private static byte @NotNull [] createConnectMessage(ProxyConfiguration proxy, String host, int port) {
var msg = "CONNECT %s:%d HTTP/1.0\r\nUser-Agent: %s\r\n%s\r\n".formatted(host, port, USERAGENT, getProxyAuthenticationHeader(proxy));
var msg = "CONNECT %s:%d HTTP/1.0\r\nUser-Agent: %s\r\n%s\r\n"
.formatted(host, port, USERAGENT, getProxyAuthenticationHeader(proxy));
byte[] b;
try {
/*
Expand All @@ -450,4 +457,11 @@ private void doTunnelHandshake(ProxyConfiguration proxy, Socket tunnel, String h
public SSLProvider getSslProvider() {
return sslProvider;
}

/**
* @return true If the connection is tunneled through a proxy and the connection to the real backend uses TLS
*/
public boolean isTunneled() {
return tunneled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,21 @@
limitations under the License. */
package com.predic8.membrane.core.transport.http;

import org.jetbrains.annotations.*;

import java.net.*;
import java.util.regex.*;

public record HostColonPort(boolean useSSL, String host, int port) {

private static final Pattern pattern = Pattern.compile("^(https?)://([^:/#]+)(?::(\\d+))?.*");

/**
*
* @param useSSL To compute default ports
* @param host Hostname, e.g. api.predic8.de, 127.0.0.1, [3:34:55]
* @param port port number
*/
public HostColonPort(boolean useSSL, String host, int port) {
this.useSSL = useSSL;
this.host = host;
Expand All @@ -30,6 +38,11 @@ public HostColonPort(boolean useSSL, String host, int port) {
}
}

/**
*
* @param useSSL To compute default ports
* @param hostAndPort e.g. api.predic8.de:443
*/
public HostColonPort(boolean useSSL, String hostAndPort) {
this(useSSL, hostPart(hostAndPort), portPart(hostAndPort, useSSL ? 443 : 80));
}
Expand All @@ -52,17 +65,17 @@ public URI toURI() throws URISyntaxException {
}

@Override
public String toString() {
public @NotNull String toString() {
return host + ":" + port;
}

private static String hostPart(String addr) {
var colon = addr.indexOf(":");
var colon = addr.lastIndexOf(":");
return (colon > -1 ? addr.substring(0, colon) : addr);
}

private static int portPart(String addr, int defaultValue) {
var colon = addr.indexOf(":");
var colon = addr.lastIndexOf(":");
return (colon > -1 ? Integer.parseInt(addr.substring(colon + 1)) : defaultValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import com.predic8.membrane.core.transport.http.client.protocol.*;
import com.predic8.membrane.core.transport.http.streampump.*;
import com.predic8.membrane.core.util.*;
import org.jetbrains.annotations.*;
import org.slf4j.*;

import javax.annotation.*;
import javax.annotation.Nullable;
import java.io.*;
import java.net.*;

import static com.predic8.membrane.core.exchange.Exchange.*;
import static com.predic8.membrane.core.transport.http.client.protocol.Http2ProtocolHandler.*;
import static com.predic8.membrane.core.util.HttpUtil.*;

Expand Down Expand Up @@ -67,16 +67,18 @@ public HttpClient(@Nullable HttpClientConfiguration clientConfiguration, @Nullab
}

public void call(Exchange exc) throws Exception {
ProtocolHandler ph = protocolHandlerFactory.getHandler(exc, exc.getRequest().getHeader().getUpgradeProtocol());
var ph = protocolHandlerFactory.getHandler(exc, exc.getRequest().getHeader().getUpgradeProtocol());
ph.checkUpgradeRequest(exc);
configuration.getRetryHandler().executeWithRetries(exc, this::dispatchCall);
ph.cleanup(exc);
}

private boolean dispatchCall(Exchange exc, String target, int attempt) throws Exception {
HostColonPort hcp = initializeRequest(exc, target);

OutgoingConnectionType outConType = connectionFactory.getConnection(exc, hcp, attempt);
setAuthorizationHeader(exc);
var hcp = getHostColonPort(exc, target);
adjustHostHeader(exc, hcp);
var outConType = connectionFactory.getConnection(exc, hcp, attempt);
setRequestURI(exc.getRequest(), target, outConType.con());

if (configuration.getProxy() != null && outConType.sslProvider() == null) {
// if we use a proxy for a plain HTTP (=non-HTTPS) request, attach the proxy credentials.
Expand All @@ -89,38 +91,49 @@ private boolean dispatchCall(Exchange exc, String target, int attempt) throws Ex
exc.getNodeStatusTracker().setNodeStatusCode(attempt, exc.getResponse().getStatusCode());

// Check for protocol upgrades
String upgradedProtocol = exc.getProperty(UPGRADED_PROTOCOL, String.class);
var upgradedProtocol = exc.getProperty(UPGRADED_PROTOCOL, String.class);
if (upgradedProtocol == null)
return false;

log.debug("Upgrading to {}",upgradedProtocol);
log.debug("Upgrading to {}", upgradedProtocol);

StreamPump.setupConnectionForwarding(exc, outConType.con(), upgradedProtocol, streamPumpStats);
outConType.con().setExchange(exc);
return true;
}

HostColonPort initializeRequest(Exchange exc, String dest) throws IOException {
setRequestURI(exc.getRequest(), dest);
if (configuration.getAuthentication() != null)
exc.getRequest().getHeader().setAuthorization(configuration.getAuthentication().getUsername(), configuration.getAuthentication().getPassword());

HostColonPort target = getTargetHostAndPort(exc.getRequest().isCONNECTRequest(), dest);
protected void adjustHostHeader(Exchange exc, HostColonPort target) {
if (configuration.isAdjustHostHeader() && (exc.getProxy() == null || exc.getProxy().isTargetAdjustHostHeader())) {
exc.getRequest().getHeader().setHost(target.toString());
}
return target;
}

private void setAuthorizationHeader(Exchange exc) {
if (configuration.getAuthentication() != null)
exc.getRequest().getHeader().setAuthorization(configuration.getAuthentication().getUsername(), configuration.getAuthentication().getPassword());
}

public void setStreamPumpStats(StreamPump.StreamPumpStats streamPumpStats) {
this.streamPumpStats = streamPumpStats;
/**
* @param exc Exchange
* @param dest URL for normal requests and host:port for CONNECT requests
* @return HostColonPort
*/
protected @NotNull HostColonPort getHostColonPort(Exchange exc, String dest) throws MalformedURLException {
if (exc.getRequest().isCONNECTRequest())
return new HostColonPort(false, dest);
Comment thread
predic8 marked this conversation as resolved.

try {
return HostColonPort.parse(dest);
} catch (MalformedURLException e) {
throw new MalformedURLException("""
The exchange's destination URI %s is not valid. Specify a 'target' within
the API configuration or make sure the exchanges destinations list contains a valid URI.
""".formatted(dest));
Comment thread
predic8 marked this conversation as resolved.
}
}

private static boolean trackNodeStatus(Exchange exc) {
if (exc.getProperty(TRACK_NODE_STATUS) instanceof Boolean status)
return status;
return false;
public void setStreamPumpStats(StreamPump.StreamPumpStats streamPumpStats) {
this.streamPumpStats = streamPumpStats;
}

@Override
Expand All @@ -134,30 +147,31 @@ public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}

void setRequestURI(Request req, String dest) throws MalformedURLException {
void setRequestURI(Request req, String dest, @NotNull Connection con) throws MalformedURLException {
// Only on none TLS connections
if (req.isCONNECTRequest()) {
req.setUri(getHostColonPort(dest));
return;
}

// Use complete URL with protocol and host. The proxy needs to know where to forward
if (configuration.getProxy() != null || req.isCONNECTRequest()) {
if (configuration.getProxy() != null && !con.isTunneled()) {
req.setUri(dest);
return;
}
if (!dest.startsWith("http")) {
throw new MalformedURLException("""
The exchange's destination URI %s does not start with 'http'. Specify a <target> within the API configuration or make sure the exchanges destinations list contains a valid URI.
The exchange's destination URI %s does not start with 'http'. Specify a 'target' within
the API configuration or make sure the exchanges destinations list contains a valid URI.
""".formatted(dest));
}
req.setUri(getPathAndQueryString(dest));
}

/**
* @param connect If true, do not use TLS even when the URL starts with https
* @param dest URL
* @return HostColonPort
* @throws MalformedURLException
*/
private HostColonPort getTargetHostAndPort(boolean connect, String dest) throws MalformedURLException {
if (connect)
return new HostColonPort(false, dest);
return HostColonPort.parse(dest);
protected static @NotNull String getHostColonPort(String dest) throws MalformedURLException {
return dest.startsWith("http")
? HostColonPort.parse(dest).toString()
: dest;
}

// TODO Rewrite all clients to use try with resources and then remove it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void executeWithRetries(Exchange exc, RetryableCall call) throws Exceptio
Exception exceptionInLastCall = null;
double currentDelay = delay;
for (int attempt = 0; attempt <= retries; attempt++) {
String dest = getDestination(exc, attempt);
var dest = getDestination(exc, attempt);
log.debug("Attempt #{} from #{} to {}", attempt, retries + 1, dest);
try {
if (call.execute(exc, dest, attempt)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/* Copyright 2026 predic8 GmbH, www.predic8.com

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

package com.predic8.membrane.core.interceptor.flow;

import com.predic8.membrane.core.interceptor.log.*;
Expand Down
Loading
Loading