Skip to content

Commit c0f2c9b

Browse files
committed
feat(extstore): add aws s3 driver.
1 parent f434a51 commit c0f2c9b

13 files changed

Lines changed: 1279 additions & 0 deletions

File tree

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
description = '''Temporal Java SDK External Storage Driver for AWS S3'''
2+
3+
ext {
4+
awsSdkVersion = '2.31.0'
5+
}
6+
7+
dependencies {
8+
compileOnly project(':temporal-serviceclient')
9+
compileOnly project(':temporal-sdk')
10+
11+
api platform("software.amazon.awssdk:bom:$awsSdkVersion")
12+
api "software.amazon.awssdk:s3"
13+
14+
testImplementation project(':temporal-serviceclient')
15+
testImplementation project(':temporal-sdk')
16+
testImplementation "junit:junit:${junitVersion}"
17+
testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.temporal.payload.storage.s3;
2+
3+
import io.temporal.api.common.v1.Payload;
4+
import io.temporal.common.Experimental;
5+
import io.temporal.payload.storage.StorageDriverStoreContext;
6+
import javax.annotation.Nonnull;
7+
8+
/**
9+
* Resolves the target S3 bucket for a payload. Use {@link
10+
* S3StorageDriver.Builder#setBucket(String)} for a fixed bucket, or supply a resolver via {@link
11+
* S3StorageDriver.Builder#setBucketResolver(BucketResolver)} to choose a bucket per payload.
12+
*/
13+
@Experimental
14+
@FunctionalInterface
15+
public interface BucketResolver {
16+
@Nonnull
17+
String resolveBucket(@Nonnull StorageDriverStoreContext context, @Nonnull Payload payload);
18+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.temporal.payload.storage.s3;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.CompletableFuture;
6+
7+
final class CompletableFutures {
8+
private CompletableFutures() {}
9+
10+
static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
11+
return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
12+
.thenApply(
13+
ignored -> {
14+
List<T> results = new ArrayList<>(futures.size());
15+
for (CompletableFuture<T> future : futures) {
16+
results.add(future.join());
17+
}
18+
return results;
19+
});
20+
}
21+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# AWS S3 Driver
2+
3+
Temporal's S3 Driver for External Storage. Uses the official [AWS S3 Java SDK](https://github.com/aws/aws-sdk-java-v2).
4+
5+
## Usage
6+
7+
Construct the S3 storage driver:
8+
9+
```java
10+
import io.temporal.payload.storage.s3.S3AsyncClientAdapter;
11+
import io.temporal.payload.storage.s3.S3StorageDriver;
12+
import software.amazon.awssdk.regions.Region;
13+
import software.amazon.awssdk.services.s3.S3AsyncClient;
14+
15+
S3AsyncClient s3Client =
16+
S3AsyncClient.builder().region(Region.US_EAST_1).build();
17+
18+
S3StorageDriver driver =
19+
S3StorageDriver.newBuilder()
20+
.setClient(new S3AsyncClientAdapter(s3Client))
21+
.setBucket("temporal-payloads")
22+
.build();
23+
```
24+
25+
Register the driver in external storage config:
26+
27+
```java
28+
import io.temporal.payload.storage.ExternalStorage;
29+
30+
ExternalStorage externalStorage =
31+
ExternalStorage.newBuilder()
32+
.setDriver(driver)
33+
.build();
34+
```
35+
36+
Use `setBucketResolver(...)` instead of `setBucket(...)` when bucket selection must vary per
37+
payload.
38+
39+
## S3 Storage Key Specification
40+
41+
All Temporal S3 drivers generate S3 keys in a consistent manner.
42+
43+
### Key format
44+
45+
Workflow key:
46+
```text
47+
v0/ns/{namespace}/wt/{workflow-type}/wi/{workflow-id}/ri/{run-id}/d/{hash-algorithm}/{hex-digest}
48+
```
49+
50+
Activity key:
51+
```text
52+
v0/ns/{namespace}/at/{activity-type}/ai/{activity-id}/ri/{run-id}/d/{hash-algorithm}/{hex-digest}
53+
```
54+
55+
Fallback key (unknown target):
56+
```text
57+
v0/d/{hash-algorithm}/{hex-digest}
58+
```
59+
60+
- If no namespace, workflow, or activity information is available, the fallback is used.
61+
- Dynamic path segments are percent-encoded (rules below).
62+
- Missing values (including a missing `run-id`) are encoded as `null`.
63+
- `hex-digest` is lower-case SHA-256 hex (64 characters).
64+
65+
### Percent-encoding rules
66+
67+
1. Treat each key path component as UTF-8 bytes.
68+
2. Leave ASCII letters and digits unescaped.
69+
3. Leave the following ASCII characters unescaped: `- _ . ~ $ & + : = @`
70+
4. Encode all other bytes as % followed by two uppercase hexadecimal digits.
71+
5. Empty or null values are encoded as the literal string `null`.
72+
6. This is path-segment escaping, not form encoding (`+` stays `+`).
73+
74+
### Examples
75+
76+
Workflow key example:
77+
78+
```text
79+
input:
80+
namespace=payments prod
81+
workflow-type=ChargeWorkflow
82+
workflow-id=order+123=abc
83+
run-id=3f1d6c7a-8b2e-4f7a-9d0a-87a6f95e4d31
84+
hash-algorithm=sha256
85+
hex-digest=9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08
86+
87+
output:
88+
v0/ns/payments%20prod/wt/ChargeWorkflow/wi/order+123=abc/ri/3f1d6c7a-8b2e-4f7a-9d0a-87a6f95e4d31/d/sha256/9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08
89+
```
90+
91+
Activity key example:
92+
93+
```text
94+
input:
95+
namespace=payments prod
96+
activity-type=Capture/Charge
97+
activity-id=activity id+42
98+
run-id=9e1d1fd9-2f8a-4c40-93e2-731f31b9268b
99+
hash-algorithm=sha256
100+
hex-digest=2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
101+
102+
output:
103+
v0/ns/payments%20prod/at/Capture%2FCharge/ai/activity%20id+42/ri/9e1d1fd9-2f8a-4c40-93e2-731f31b9268b/d/sha256/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
104+
```
105+
106+
Fallback key example:
107+
108+
```text
109+
input:
110+
hash-algorithm=sha256
111+
hex-digest=486ea46224d1bb4fb680f34f7c9ad96a8f24ec88be73ea8e5a6c65260e9cb8a7
112+
113+
output:
114+
v0/d/sha256/486ea46224d1bb4fb680f34f7c9ad96a8f24ec88be73ea8e5a6c65260e9cb8a7
115+
```
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package io.temporal.payload.storage.s3;
2+
3+
import io.temporal.common.Experimental;
4+
import java.util.Collections;
5+
import java.util.Map;
6+
import java.util.Objects;
7+
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.CompletionException;
9+
import javax.annotation.Nonnull;
10+
import software.amazon.awssdk.core.async.AsyncRequestBody;
11+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
12+
import software.amazon.awssdk.regions.Region;
13+
import software.amazon.awssdk.services.s3.S3AsyncClient;
14+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
15+
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
16+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
17+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
18+
import software.amazon.awssdk.services.s3.model.S3Exception;
19+
20+
/**
21+
* {@link S3Client} backed by the AWS SDK for Java v2 {@link S3AsyncClient}. The wrapped client must
22+
* be configured with credentials and a region by the caller.
23+
*/
24+
@Experimental
25+
public final class S3AsyncClientAdapter implements S3Client {
26+
private final S3AsyncClient client;
27+
28+
public S3AsyncClientAdapter(@Nonnull S3AsyncClient client) {
29+
this.client = Objects.requireNonNull(client, "client");
30+
}
31+
32+
@Nonnull
33+
@Override
34+
public CompletableFuture<Void> putObject(
35+
@Nonnull String bucket, @Nonnull String key, @Nonnull byte[] data) {
36+
// fromBytesUnsafe avoids a defensive copy of data; the driver never mutates it after this call.
37+
return client
38+
.putObject(
39+
PutObjectRequest.builder().bucket(bucket).key(key).build(),
40+
AsyncRequestBody.fromBytesUnsafe(data))
41+
.thenApply(response -> (Void) null);
42+
}
43+
44+
@Nonnull
45+
@Override
46+
public CompletableFuture<Boolean> objectExists(@Nonnull String bucket, @Nonnull String key) {
47+
return client
48+
.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build())
49+
.handle(
50+
(response, ex) -> {
51+
if (ex == null) {
52+
return true;
53+
}
54+
Throwable cause =
55+
(ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
56+
if (cause instanceof NoSuchKeyException) {
57+
return false;
58+
}
59+
if (cause instanceof S3Exception && ((S3Exception) cause).statusCode() == 404) {
60+
return false;
61+
}
62+
if (cause instanceof RuntimeException) {
63+
throw (RuntimeException) cause;
64+
}
65+
throw new RuntimeException(cause);
66+
});
67+
}
68+
69+
@Nonnull
70+
@Override
71+
public CompletableFuture<byte[]> getObject(@Nonnull String bucket, @Nonnull String key) {
72+
return client
73+
.getObject(
74+
GetObjectRequest.builder().bucket(bucket).key(key).build(),
75+
AsyncResponseTransformer.toBytes())
76+
// asByteArrayUnsafe avoids a copy; the driver only reads the bytes (hash + parse).
77+
.thenApply(response -> response.asByteArrayUnsafe());
78+
}
79+
80+
@Nonnull
81+
@Override
82+
public Map<String, String> describe() {
83+
Region region = client.serviceClientConfiguration().region();
84+
if (region == null) {
85+
return Collections.emptyMap();
86+
}
87+
return Collections.singletonMap("client_region", region.id());
88+
}
89+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.temporal.payload.storage.s3;
2+
3+
import io.temporal.common.Experimental;
4+
import java.util.Collections;
5+
import java.util.Map;
6+
import java.util.concurrent.CompletableFuture;
7+
import javax.annotation.Nonnull;
8+
9+
/** Interface for S3 {@link S3StorageDriver} operations: upload, existence check, and download. */
10+
@Experimental
11+
public interface S3Client {
12+
/**
13+
* Uploads {@code data} to the given {@code bucket} and {@code key}, overwriting any existing
14+
* object at that key. Implementations must be safe to call concurrently for different keys.
15+
*/
16+
@Nonnull
17+
CompletableFuture<Void> putObject(
18+
@Nonnull String bucket, @Nonnull String key, @Nonnull byte[] data);
19+
20+
/**
21+
* Reports whether an object exists at the given {@code bucket} and {@code key}. The future
22+
* completes with {@code false} when the object is absent, and completes exceptionally when
23+
* existence cannot be determined (e.g. a network or permission failure).
24+
*/
25+
@Nonnull
26+
CompletableFuture<Boolean> objectExists(@Nonnull String bucket, @Nonnull String key);
27+
28+
/**
29+
* Downloads the bytes stored at the given {@code bucket} and {@code key}. The future completes
30+
* exceptionally if the object does not exist.
31+
*/
32+
@Nonnull
33+
CompletableFuture<byte[]> getObject(@Nonnull String bucket, @Nonnull String key);
34+
35+
/**
36+
* Diagnostic metadata about the client configuration, such as {@code {"client_region":
37+
* "us-west-2"}}, that the driver appends to error messages. Returns an empty map by default.
38+
*/
39+
@Nonnull
40+
default Map<String, String> describe() {
41+
return Collections.emptyMap();
42+
}
43+
}

0 commit comments

Comments
 (0)