Skip to content

Commit 150385c

Browse files
authored
Merge pull request #4 from danube-messaging/security_update
replace api key with jwt token for secure connection
2 parents 821e67c + 731c280 commit 150385c

8 files changed

Lines changed: 106 additions & 102 deletions

File tree

danube-client-proto/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.danube-messaging</groupId>
99
<artifactId>danube-java</artifactId>
10-
<version>0.3.0</version>
10+
<version>0.4.0</version>
1111
<relativePath>../pom.xml</relativePath>
1212
</parent>
1313

danube-client-proto/src/main/proto/DanubeApi.proto

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -209,20 +209,4 @@ message HealthCheckResponse {
209209
CLOSE = 1;
210210
}
211211
ClientStatus status = 1;
212-
}
213-
214-
// ============================================================================================
215-
216-
service AuthService {
217-
rpc Authenticate (AuthRequest) returns (AuthResponse);
218-
}
219-
220-
message AuthRequest {
221-
string api_key = 1;
222-
}
223-
224-
message AuthResponse {
225-
string token = 1;
226-
}
227-
228-
// ============================================================================================
212+
}

danube-client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.danube-messaging</groupId>
99
<artifactId>danube-java</artifactId>
10-
<version>0.3.0</version>
10+
<version>0.4.0</version>
1111
<relativePath>../pom.xml</relativePath>
1212
</parent>
1313

danube-client/src/main/java/com/danubemessaging/client/DanubeClientBuilder.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.nio.file.Path;
1212
import java.util.Optional;
1313
import java.util.concurrent.Executors;
14+
import java.util.function.Supplier;
1415

1516
/**
1617
* Builder for {@link DanubeClient}.
@@ -28,7 +29,8 @@ public final class DanubeClientBuilder {
2829
private Path caCertPath;
2930
private Path clientCertPath;
3031
private Path clientKeyPath;
31-
private String apiKey;
32+
private String token;
33+
private Supplier<String> tokenSupplier;
3234

3335
DanubeClientBuilder() {
3436
}
@@ -71,15 +73,42 @@ public DanubeClientBuilder withMutualTls(Path caCertPath, Path clientCertPath, P
7173
}
7274

7375
/**
74-
* Enables JWT authentication using an API key.
75-
* The client exchanges the API key for a bearer token on first use and caches it
76-
* with automatic renewal (default token lifetime: 1 hour).
77-
* Calling this method also enables TLS automatically.
76+
* Sets the authentication token (JWT) for the client.
7877
*
79-
* @param apiKey the API key issued by the Danube broker
78+
* <p>Use {@code danube-admin security tokens create} to generate a token.
79+
* Automatically enables TLS. If no TLS config has been set via
80+
* {@link #withTls} or {@link #withMutualTls}, a default TLS config using
81+
* system root certificates is applied.
82+
*
83+
* <p>For tokens that expire, consider {@link #withTokenSupplier} instead,
84+
* which allows runtime token refresh.
85+
*
86+
* @param token the JWT token
8087
*/
81-
public DanubeClientBuilder withApiKey(String apiKey) {
82-
this.apiKey = apiKey;
88+
public DanubeClientBuilder withToken(String token) {
89+
this.token = token;
90+
this.useTls = true;
91+
return this;
92+
}
93+
94+
/**
95+
* Sets a dynamic token supplier for the client.
96+
*
97+
* <p>The supplier is called on <b>every gRPC request</b> to obtain the
98+
* current token, enabling runtime token refresh without restarting the
99+
* client. Useful for:
100+
* <ul>
101+
* <li>File-based tokens updated by infrastructure (K8s projected volumes)</li>
102+
* <li>Environment-based tokens</li>
103+
* <li>Custom refresh logic</li>
104+
* </ul>
105+
*
106+
* <p>Automatically enables TLS (same as {@link #withToken}).
107+
*
108+
* @param supplier a function that returns the current JWT token
109+
*/
110+
public DanubeClientBuilder withTokenSupplier(Supplier<String> supplier) {
111+
this.tokenSupplier = supplier;
83112
this.useTls = true;
84113
return this;
85114
}
@@ -91,14 +120,11 @@ public DanubeClient build() {
91120
Optional.ofNullable(caCertPath),
92121
Optional.ofNullable(clientCertPath),
93122
Optional.ofNullable(clientKeyPath),
94-
Optional.ofNullable(apiKey));
123+
Optional.ofNullable(token),
124+
Optional.ofNullable(tokenSupplier));
95125

96126
ConnectionManager connectionManager = new ConnectionManager(options);
97-
AuthService authService = new AuthService(connectionManager, options);
98-
99-
if (apiKey != null && !apiKey.isBlank()) {
100-
authService.authenticateClient(uri, apiKey);
101-
}
127+
AuthService authService = new AuthService(options);
102128

103129
LookupService lookupService = new LookupService(connectionManager, authService);
104130
RetryManager retryManager = new RetryManager(0, 0, 0);
Lines changed: 15 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,44 @@
11
package com.danubemessaging.client.internal.auth;
22

3-
import com.danubemessaging.client.errors.DanubeClientException;
4-
import com.danubemessaging.client.internal.connection.ConnectionManager;
53
import com.danubemessaging.client.internal.connection.ConnectionOptions;
6-
import danube.AuthServiceGrpc;
7-
import danube.DanubeApi;
84
import io.grpc.Metadata;
95
import io.grpc.stub.AbstractStub;
106
import io.grpc.stub.MetadataUtils;
117
import java.net.URI;
12-
import java.time.Duration;
13-
import java.time.Instant;
14-
import java.util.concurrent.locks.ReentrantLock;
158

169
/**
17-
* Handles API-key authentication and bearer token caching.
10+
* Handles JWT token insertion into gRPC request metadata.
11+
*
12+
* <p>With JWT-first authentication, the client uses a pre-generated JWT token
13+
* (from {@code danube-admin security tokens create}) that is sent as
14+
* {@code Authorization: Bearer <token>} on every gRPC request.
1815
*/
1916
public final class AuthService {
20-
private static final Duration TOKEN_EXPIRY = Duration.ofHours(1);
2117
private static final Metadata.Key<String> AUTHORIZATION = Metadata.Key.of("authorization",
2218
Metadata.ASCII_STRING_MARSHALLER);
2319

24-
private final ConnectionManager connectionManager;
2520
private final ConnectionOptions connectionOptions;
26-
private final ReentrantLock tokenLock = new ReentrantLock();
2721

28-
private volatile String token;
29-
private volatile Instant tokenExpiry;
30-
31-
public AuthService(ConnectionManager connectionManager, ConnectionOptions connectionOptions) {
32-
this.connectionManager = connectionManager;
22+
public AuthService(ConnectionOptions connectionOptions) {
3323
this.connectionOptions = connectionOptions;
3424
}
3525

36-
public String authenticateClient(URI address, String apiKey) {
37-
var grpcConnection = connectionManager.getConnection(address, address);
38-
var client = AuthServiceGrpc.newBlockingStub(grpcConnection.grpcChannel());
39-
var request = DanubeApi.AuthRequest.newBuilder().setApiKey(apiKey).build();
40-
41-
try {
42-
var response = client.authenticate(request);
43-
cacheToken(response.getToken());
44-
return response.getToken();
45-
} catch (Exception e) {
46-
throw new DanubeClientException("Authentication failed", e);
47-
}
48-
}
49-
50-
public String getValidToken(URI address, String apiKey) {
51-
String currentToken = token;
52-
Instant expiry = tokenExpiry;
53-
if (currentToken != null && expiry != null && Instant.now().isBefore(expiry)) {
54-
return currentToken;
55-
}
56-
57-
tokenLock.lock();
58-
try {
59-
currentToken = token;
60-
expiry = tokenExpiry;
61-
if (currentToken != null && expiry != null && Instant.now().isBefore(expiry)) {
62-
return currentToken;
63-
}
64-
return authenticateClient(address, apiKey);
65-
} finally {
66-
tokenLock.unlock();
67-
}
68-
}
69-
26+
/**
27+
* Inserts the Bearer token header into the given metadata if a token is
28+
* configured (static or via supplier).
29+
*/
7030
public void insertTokenIfNeeded(Metadata metadata, URI address) {
7131
connectionOptions
72-
.apiKey()
73-
.ifPresent(apiKey -> metadata.put(AUTHORIZATION, "Bearer " + getValidToken(address, apiKey)));
32+
.resolveToken()
33+
.ifPresent(token -> metadata.put(AUTHORIZATION, "Bearer " + token));
7434
}
7535

36+
/**
37+
* Returns the stub with auth headers attached if a token is configured.
38+
*/
7639
public <T extends AbstractStub<T>> T attachAuthIfNeeded(T stub, URI address) {
7740
Metadata metadata = new Metadata();
7841
insertTokenIfNeeded(metadata, address);
7942
return stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata));
8043
}
81-
82-
private void cacheToken(String tokenValue) {
83-
token = tokenValue;
84-
tokenExpiry = Instant.now().plus(TOKEN_EXPIRY);
85-
}
8644
}

danube-client/src/main/java/com/danubemessaging/client/internal/connection/ConnectionOptions.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,54 @@
22

33
import java.nio.file.Path;
44
import java.util.Optional;
5+
import java.util.function.Supplier;
56

67
/**
78
* Connection settings used when opening gRPC channels.
9+
*
10+
* @param useTls whether to use TLS for the connection
11+
* @param caCertPath optional path to the CA certificate file (PEM)
12+
* @param clientCertPath optional path to the client certificate file (mTLS)
13+
* @param clientKeyPath optional path to the client private key file (mTLS)
14+
* @param token optional static JWT token for authentication
15+
* @param tokenSupplier optional dynamic token supplier called per-request
816
*/
917
public record ConnectionOptions(
1018
boolean useTls,
1119
Optional<Path> caCertPath,
1220
Optional<Path> clientCertPath,
1321
Optional<Path> clientKeyPath,
14-
Optional<String> apiKey) {
22+
Optional<String> token,
23+
Optional<Supplier<String>> tokenSupplier) {
1524

1625
public static ConnectionOptions plainText() {
1726
return new ConnectionOptions(
1827
false,
1928
Optional.empty(),
2029
Optional.empty(),
2130
Optional.empty(),
31+
Optional.empty(),
2232
Optional.empty());
2333
}
2434

2535
public boolean isMutualTls() {
2636
return clientCertPath.isPresent() && clientKeyPath.isPresent();
2737
}
38+
39+
/**
40+
* Resolves the current token. If a supplier is set, calls it to get a
41+
* fresh token (enabling runtime rotation). Otherwise falls back to the
42+
* static token.
43+
*
44+
* @return the resolved token, or empty if no authentication is configured
45+
*/
46+
public Optional<String> resolveToken() {
47+
if (tokenSupplier.isPresent()) {
48+
String supplied = tokenSupplier.get().get();
49+
if (supplied != null && !supplied.isBlank()) {
50+
return Optional.of(supplied);
51+
}
52+
}
53+
return token.filter(t -> !t.isBlank());
54+
}
2855
}

docker/danube_broker.yml

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,27 @@ bootstrap_namespaces:
2828
# Allow producers to auto-create topics when missing
2929
auto_create_topics: true
3030

31+
# Enable TLS on the admin API (default: false).
32+
# Set to true for remote cluster management over untrusted networks.
33+
# When true, uses the same cert/key from auth.tls below.
34+
# admin_tls: false
35+
3136
# Security Configuration
37+
# mode: none — no authentication, no encryption (development/testing only)
38+
# mode: tls — full security: TLS + JWT for clients, mTLS for inter-broker & Raft
39+
# For production with TLS enabled, see config/danube_broker_secure.yml
3240
auth:
33-
mode: none # Options: none, tls, tlswithjwt
34-
# tls:
35-
# cert_file: "./cert/server-cert.pem"
36-
# key_file: "./cert/server-key.pem"
37-
# ca_file: "./cert/ca-cert.pem"
38-
# verify_client: false
39-
# jwt:
40-
# secret_key: "your-secret-key"
41-
# issuer: "danube-auth"
42-
# expiration_time: 3600 # in seconds
41+
mode: none
42+
# tls:
43+
# cert_file: "./cert/server-cert.pem"
44+
# key_file: "./cert/server-key.pem"
45+
# ca_file: "./cert/ca-cert.pem"
46+
# jwt:
47+
# secret_key: "your-secret-key"
48+
# issuer: "danube-auth"
49+
# expiration_time: 3600
50+
# super_admins:
51+
# - "admin"
4352

4453
# Load Manager Configuration (Automated Proactive Rebalancing)
4554
load_manager:

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.danube-messaging</groupId>
88
<artifactId>danube-java</artifactId>
9-
<version>0.3.0</version>
9+
<version>0.4.0</version>
1010
<packaging>pom</packaging>
1111
<name>Danube Java</name>
1212
<description>Danube Java client multi-module build</description>

0 commit comments

Comments
 (0)