Skip to content

Commit a9393cf

Browse files
committed
Add FFM (Foreign Function & Memory) hot-path support for S3 upload/download
Adds a useFFM flag to S3MetaRequestOptions that activates zero-copy callbacks on the two performance-critical paths: Download (s_on_s3_meta_request_body_callback): - JNI path: allocates a byte[] and copies every downloaded chunk into it - FFM path: passes the raw native pointer + length as jlong primitives; Java wraps it as a MemorySegment (zero-copy) via onResponseBodyFFM() Upload (s_aws_input_stream_read): - JNI path: creates a DirectByteBuffer wrapper object per part, reads back position() via a second JNI call - FFM path: passes raw pointer + capacity as jlong primitives; Java writes directly into native memory and returns bytes-written as int Java-side changes: - S3MetaRequestOptions: add withUseFFM(boolean) / getUseFFM() - S3Client: pass useFFM to native s3ClientMakeMetaRequest() - S3MetaRequestResponseHandler: add onResponseBody(MemorySegment,...) overload - S3MetaRequestResponseHandlerNativeAdapter: add onResponseBodyFFM(long,long,long) - HttpRequestBodyStream: add sendRequestBody(long address, long length) -> int Native-side changes: - java_class_ids.h/c: register send_outgoing_body_ffm (JJ)I and onResponseBodyFFM (JJJ)I method IDs - s3_client.c: add use_ffm to callback struct; branch in body callback - http_request_utils.h/c: add use_ffm to stream impl; branch in read fn; propagate use_ffm through aws_apply_java_http_request_changes_to_native_request
1 parent 70b555d commit a9393cf

10 files changed

Lines changed: 280 additions & 47 deletions

src/main/java/software/amazon/awssdk/crt/http/HttpRequestBodyStream.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package software.amazon.awssdk.crt.http;
77

8+
import java.lang.foreign.Arena;
9+
import java.lang.foreign.MemorySegment;
810
import java.nio.ByteBuffer;
911

1012
/**
@@ -29,6 +31,44 @@ default boolean sendRequestBody(ByteBuffer bodyBytesOut) {
2931
return true;
3032
}
3133

34+
/**
35+
* FFM variant of {@link #sendRequestBody(ByteBuffer)}.
36+
* <p>
37+
* Called from native when the meta request was created with
38+
* {@link software.amazon.awssdk.crt.s3.S3MetaRequestOptions#withUseFFM(boolean)
39+
* useFFM=true}. The native layer passes the destination buffer as a raw
40+
* pointer ({@code address}) and its capacity ({@code length}) as {@code long}
41+
* primitives — no {@code DirectByteBuffer} wrapper object is allocated.
42+
* <p>
43+
* Implementations should write up to {@code length} bytes of request body
44+
* data into the native buffer starting at {@code address} and return the
45+
* number of bytes actually written. Returning {@code 0} signals that the
46+
* body is complete (end-of-stream).
47+
* <p>
48+
* The default implementation bridges to the {@link #sendRequestBody(ByteBuffer)}
49+
* overload via a {@link MemorySegment} → {@link ByteBuffer} view, so existing
50+
* implementations that only override the {@code ByteBuffer} version continue
51+
* to work correctly (at the cost of the {@code ByteBuffer} wrapper allocation
52+
* that FFM mode is trying to avoid).
53+
*
54+
* @param address Raw native pointer to the start of the destination buffer.
55+
* @param length Capacity of the destination buffer in bytes.
56+
* @return Number of bytes written into the buffer, or {@code 0} when the
57+
* body is fully consumed (end-of-stream).
58+
*/
59+
default int sendRequestBody(long address, long length) {
60+
// Default: wrap the native buffer as a ByteBuffer and delegate to the
61+
// existing overload so that implementations that only override the
62+
// ByteBuffer version still work.
63+
MemorySegment seg = MemorySegment.ofAddress(address)
64+
.reinterpret(length, Arena.ofAuto(), null);
65+
ByteBuffer buf = seg.asByteBuffer();
66+
boolean done = sendRequestBody(buf);
67+
// Return how many bytes were written (ByteBuffer.position() tracks this).
68+
// If done==true and nothing was written, return 0 to signal end-of-stream.
69+
return buf.position();
70+
}
71+
3272
/**
3373
* Called from native when the processing needs the stream to rewind itself back to its beginning.
3474
* If the stream does not support rewinding or the rewind fails, false should be returned

src/main/java/software/amazon/awssdk/crt/s3/S3Client.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,8 @@ public S3MetaRequest makeMetaRequest(S3MetaRequestOptions options) {
226226
fioOptionsSet,
227227
shouldStream,
228228
diskThroughputGbps,
229-
directIo);
229+
directIo,
230+
options.getUseFFM());
230231

231232
metaRequest.setMetaRequestNativeHandle(metaRequestNativeHandle);
232233

@@ -305,5 +306,6 @@ private static native long s3ClientMakeMetaRequest(long clientId, S3MetaRequest
305306
boolean fioOptionsSet,
306307
boolean shouldStream,
307308
double diskThroughputGbps,
308-
boolean directIo);
309+
boolean directIo,
310+
boolean useFFM);
309311
}

src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestOptions.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,13 @@ private static Map<Integer, MetaRequestType> buildEnumMapping() {
9696
private ResumeToken resumeToken;
9797
private Long objectSizeHint;
9898
private FileIoOptions fileIoOptions;
99+
/**
100+
* When true, the native layer uses FFM (Foreign Function & Memory) style
101+
* callbacks that pass raw native pointers as {@code long} values instead of
102+
* allocating JNI wrapper objects (byte[] for downloads, DirectByteBuffer for
103+
* uploads). Requires Java 22+ and {@code --enable-native-access=ALL-UNNAMED}.
104+
*/
105+
private boolean useFFM = false;
99106

100107
public S3MetaRequestOptions withMetaRequestType(MetaRequestType metaRequestType) {
101108
this.metaRequestType = metaRequestType;
@@ -472,4 +479,39 @@ public S3MetaRequestOptions withFileIoOptions(FileIoOptions fileIoOptions) {
472479
public FileIoOptions getFileIoOptions() {
473480
return fileIoOptions;
474481
}
482+
483+
/**
484+
* Enable FFM (Foreign Function &amp; Memory) mode for this meta request.
485+
* <p>
486+
* When {@code true}, the native layer passes raw native memory pointers as
487+
* {@code long} values to Java callbacks instead of allocating JNI wrapper
488+
* objects:
489+
* <ul>
490+
* <li><b>Downloads:</b> {@code onResponseBody} receives a
491+
* {@code MemorySegment} view of native memory (zero-copy) instead of
492+
* a heap-allocated {@code byte[]}.</li>
493+
* <li><b>Uploads:</b> {@code sendRequestBody} receives a raw pointer and
494+
* length as {@code long} values instead of a {@code DirectByteBuffer}
495+
* wrapper object.</li>
496+
* </ul>
497+
* Requires Java 22+ and the JVM flag
498+
* {@code --enable-native-access=ALL-UNNAMED}.
499+
*
500+
* @param useFFM {@code true} to use FFM callbacks, {@code false} (default)
501+
* to use the standard JNI callbacks.
502+
* @return this
503+
*/
504+
public S3MetaRequestOptions withUseFFM(boolean useFFM) {
505+
this.useFFM = useFFM;
506+
return this;
507+
}
508+
509+
/**
510+
* Returns whether FFM (Foreign Function &amp; Memory) mode is enabled.
511+
*
512+
* @return {@code true} if FFM mode is enabled, {@code false} otherwise.
513+
*/
514+
public boolean getUseFFM() {
515+
return useFFM;
516+
}
475517
}

src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandler.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55
package software.amazon.awssdk.crt.s3;
66

7+
import java.lang.foreign.MemorySegment;
78
import java.nio.ByteBuffer;
89
import software.amazon.awssdk.crt.http.HttpHeader;
910

@@ -50,6 +51,38 @@ default int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long o
5051
return 0;
5152
}
5253

54+
/**
55+
* FFM variant of {@link #onResponseBody(ByteBuffer, long, long)}.
56+
* <p>
57+
* Invoked instead of the {@code ByteBuffer} overload when the meta request was
58+
* created with {@link S3MetaRequestOptions#withUseFFM(boolean) useFFM=true}.
59+
* The body data is delivered as a {@link MemorySegment} that is a zero-copy
60+
* view directly into native (off-heap) memory — no {@code byte[]} is allocated
61+
* and no data is copied.
62+
* <p>
63+
* <b>Do NOT</b> retain a reference to {@code bodyBytesIn} beyond the lifetime
64+
* of this method call. The underlying native memory is only guaranteed to be
65+
* valid for the duration of the callback.
66+
* <p>
67+
* The default implementation falls back to the {@code ByteBuffer} overload by
68+
* copying the data, so existing implementations that only override the
69+
* {@code ByteBuffer} version continue to work correctly.
70+
*
71+
* @param bodyBytesIn A zero-copy view of the native response body chunk.
72+
* @param objectRangeStart Byte offset of the first byte in this chunk within
73+
* the full S3 object.
74+
* @param objectRangeEnd Past-the-end byte offset (i.e.
75+
* {@code objectRangeStart + chunk length}).
76+
* @return The number of bytes to increment the flow-control window by.
77+
* Ignored when backpressure is disabled.
78+
*/
79+
default int onResponseBody(MemorySegment bodyBytesIn, long objectRangeStart, long objectRangeEnd) {
80+
// Default: copy into a heap ByteBuffer and delegate to the ByteBuffer overload
81+
// so that implementations that only override the ByteBuffer version still work.
82+
ByteBuffer buf = ByteBuffer.wrap(bodyBytesIn.toArray(java.lang.foreign.ValueLayout.JAVA_BYTE));
83+
return onResponseBody(buf, objectRangeStart, objectRangeEnd);
84+
}
85+
5386
/**
5487
* Invoked when the entire meta request execution is complete.
5588
* @param context a wrapper object containing the following fields

src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandlerNativeAdapter.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,54 @@
44
*/
55
package software.amazon.awssdk.crt.s3;
66

7-
import software.amazon.awssdk.crt.http.HttpHeader;
8-
7+
import java.lang.foreign.Arena;
8+
import java.lang.foreign.MemorySegment;
9+
import java.lang.foreign.ValueLayout;
910
import java.nio.ByteBuffer;
1011

12+
import software.amazon.awssdk.crt.http.HttpHeader;
13+
1114
class S3MetaRequestResponseHandlerNativeAdapter {
1215
private S3MetaRequestResponseHandler responseHandler;
1316

1417
S3MetaRequestResponseHandlerNativeAdapter(S3MetaRequestResponseHandler responseHandler) {
1518
this.responseHandler = responseHandler;
1619
}
1720

21+
/**
22+
* Standard JNI path: native code allocates a {@code byte[]} containing a copy
23+
* of the response body chunk and passes it here. Used when {@code useFFM=false}.
24+
*/
1825
int onResponseBody(byte[] bodyBytesIn, long objectRangeStart, long objectRangeEnd) {
1926
return this.responseHandler.onResponseBody(ByteBuffer.wrap(bodyBytesIn), objectRangeStart, objectRangeEnd);
2027
}
2128

29+
/**
30+
* FFM path: native code passes the raw native pointer ({@code address}) and
31+
* the chunk length ({@code length}) as {@code long} primitives — no heap
32+
* allocation, no copy. Used when {@code useFFM=true}.
33+
* <p>
34+
* We wrap the native memory as a {@link MemorySegment} scoped to a
35+
* {@link Arena#ofAuto() auto arena} so that the segment is valid for the
36+
* duration of this call but cannot be retained beyond it (the native buffer
37+
* is only guaranteed live for the callback's lifetime).
38+
*
39+
* @param address Raw native pointer to the start of the body chunk.
40+
* @param length Number of bytes in the chunk.
41+
* @param objectRangeStart Byte offset of the first byte within the S3 object.
42+
* @return The number of bytes to increment the flow-control window by.
43+
*/
44+
int onResponseBodyFFM(long address, long length, long objectRangeStart) {
45+
// Wrap the native pointer as a MemorySegment — zero copy.
46+
// reinterpret() is required to give the segment a known size; the
47+
// Arena.ofAuto() cleanup action is null because the native side owns
48+
// the memory and will free it after the callback returns.
49+
MemorySegment segment = MemorySegment.ofAddress(address)
50+
.reinterpret(length, Arena.ofAuto(), null);
51+
long objectRangeEnd = objectRangeStart + length;
52+
return this.responseHandler.onResponseBody(segment, objectRangeStart, objectRangeEnd);
53+
}
54+
2255
void onFinished(int errorCode, int responseStatus, byte[] errorPayload, String errorOperationName, int checksumAlgorithm, boolean didValidateChecksum, Throwable cause, final ByteBuffer headersBlob) {
2356
HttpHeader[] errorHeaders = headersBlob == null ? null : HttpHeader.loadHeadersFromMarshalledHeadersBlob(headersBlob);
2457
S3FinishedResponseContext context = new S3FinishedResponseContext(errorCode, responseStatus, errorPayload, errorOperationName, ChecksumAlgorithm.getEnumValueFromInteger(checksumAlgorithm), didValidateChecksum, cause, errorHeaders);

src/native/http_request_utils.c

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ struct aws_http_request_body_stream_impl {
2424
jobject http_request_body_stream;
2525
bool body_done;
2626
bool is_valid;
27+
/* When true, call sendRequestBody(long address, long length) -> int
28+
* instead of sendRequestBody(ByteBuffer) -> boolean. */
29+
bool use_ffm;
2730
};
2831

2932
static int s_aws_input_stream_seek(struct aws_input_stream *stream, int64_t offset, enum aws_stream_seek_basis basis) {
@@ -94,21 +97,49 @@ static int s_aws_input_stream_read(struct aws_input_stream *stream, struct aws_b
9497
}
9598

9699
size_t out_remaining = dest->capacity - dest->len;
100+
int result = AWS_OP_SUCCESS;
97101

98-
jobject direct_buffer = aws_jni_direct_byte_buffer_from_raw_ptr(env, dest->buffer + dest->len, out_remaining);
102+
if (impl->use_ffm) {
103+
/*
104+
* FFM path: pass the raw native pointer and capacity as jlong primitives.
105+
* Java writes directly into the native buffer and returns the number of
106+
* bytes written as an int. Returning 0 signals end-of-stream.
107+
* No DirectByteBuffer wrapper object is allocated.
108+
*/
109+
jlong ptr = (jlong)(uintptr_t)(dest->buffer + dest->len);
110+
jlong len = (jlong)out_remaining;
99111

100-
impl->body_done = (*env)->CallBooleanMethod(
101-
env, impl->http_request_body_stream, http_request_body_stream_properties.send_outgoing_body, direct_buffer);
112+
jint bytes_written = (*env)->CallIntMethod(
113+
env, impl->http_request_body_stream, http_request_body_stream_properties.send_outgoing_body_ffm, ptr, len);
102114

103-
int result = AWS_OP_SUCCESS;
104-
if (aws_jni_check_and_clear_exception(env)) {
105-
result = aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
115+
if (aws_jni_check_and_clear_exception(env)) {
116+
result = aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
117+
} else {
118+
dest->len += (size_t)bytes_written;
119+
/* 0 bytes written signals end-of-stream */
120+
if (bytes_written == 0) {
121+
impl->body_done = true;
122+
}
123+
}
106124
} else {
107-
size_t amt_written = aws_jni_byte_buffer_get_position(env, direct_buffer);
108-
dest->len += amt_written;
109-
}
125+
/*
126+
* JNI path (unchanged): wrap the native buffer as a DirectByteBuffer,
127+
* call sendRequestBody(ByteBuffer), then read back position().
128+
*/
129+
jobject direct_buffer = aws_jni_direct_byte_buffer_from_raw_ptr(env, dest->buffer + dest->len, out_remaining);
110130

111-
(*env)->DeleteLocalRef(env, direct_buffer);
131+
impl->body_done = (*env)->CallBooleanMethod(
132+
env, impl->http_request_body_stream, http_request_body_stream_properties.send_outgoing_body, direct_buffer);
133+
134+
if (aws_jni_check_and_clear_exception(env)) {
135+
result = aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
136+
} else {
137+
size_t amt_written = aws_jni_byte_buffer_get_position(env, direct_buffer);
138+
dest->len += amt_written;
139+
}
140+
141+
(*env)->DeleteLocalRef(env, direct_buffer);
142+
}
112143

113144
aws_jni_release_thread_env(impl->jvm, &jvm_env_context);
114145
/********** JNI ENV RELEASE **********/
@@ -188,7 +219,8 @@ static struct aws_input_stream_vtable s_aws_input_stream_vtable = {
188219
struct aws_input_stream *aws_input_stream_new_from_java_http_request_body_stream(
189220
struct aws_allocator *allocator,
190221
JNIEnv *env,
191-
jobject http_request_body_stream) {
222+
jobject http_request_body_stream,
223+
bool use_ffm) {
192224
struct aws_http_request_body_stream_impl *impl =
193225
aws_mem_calloc(allocator, 1, sizeof(struct aws_http_request_body_stream_impl));
194226

@@ -200,6 +232,8 @@ struct aws_input_stream *aws_input_stream_new_from_java_http_request_body_stream
200232
AWS_FATAL_ASSERT(jvmresult == 0);
201233

202234
impl->is_valid = true;
235+
impl->use_ffm = use_ffm;
236+
203237
if (http_request_body_stream != NULL) {
204238
impl->http_request_body_stream = (*env)->NewGlobalRef(env, http_request_body_stream);
205239
if (impl->http_request_body_stream == NULL) {
@@ -370,7 +404,8 @@ int aws_apply_java_http_request_changes_to_native_request(
370404
JNIEnv *env,
371405
jbyteArray marshalled_request,
372406
jobject jni_body_stream,
373-
struct aws_http_message *message) {
407+
struct aws_http_message *message,
408+
bool use_ffm) {
374409

375410
/* come back to this when we decide we need to. */
376411
(void)jni_body_stream;
@@ -400,8 +435,10 @@ int aws_apply_java_http_request_changes_to_native_request(
400435
}
401436

402437
if (jni_body_stream) {
403-
struct aws_input_stream *body_stream =
404-
aws_input_stream_new_from_java_http_request_body_stream(aws_jni_get_allocator(), env, jni_body_stream);
438+
/* use_ffm=false here: this path is used by non-S3 HTTP requests that
439+
* don't go through the FFM meta-request path. */
440+
struct aws_input_stream *body_stream = aws_input_stream_new_from_java_http_request_body_stream(
441+
aws_jni_get_allocator(), env, jni_body_stream, false);
405442

406443
aws_http_message_set_body_stream(message, body_stream);
407444
/* request controls the lifetime of body stream fully */
@@ -440,8 +477,9 @@ struct aws_http_message *aws_http_request_new_from_java_http_request(
440477
}
441478

442479
if (jni_body_stream != NULL) {
443-
struct aws_input_stream *body_stream =
444-
aws_input_stream_new_from_java_http_request_body_stream(aws_jni_get_allocator(), env, jni_body_stream);
480+
/* use_ffm=false: this function is used by non-S3 HTTP requests */
481+
struct aws_input_stream *body_stream = aws_input_stream_new_from_java_http_request_body_stream(
482+
aws_jni_get_allocator(), env, jni_body_stream, false);
445483
if (body_stream == NULL) {
446484
exception_message = "aws_fill_out_request: Error building body stream";
447485
goto on_error;

src/native/http_request_utils.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,18 @@ struct aws_http_headers;
1616
struct aws_http_message;
1717
struct aws_input_stream;
1818

19+
/**
20+
* Create a native aws_input_stream that reads from a Java HttpRequestBodyStream.
21+
*
22+
* @param use_ffm When true, the stream will call the FFM-style
23+
* sendRequestBody(long, long) -> int method instead of the
24+
* JNI-style sendRequestBody(ByteBuffer) -> boolean method.
25+
*/
1926
struct aws_input_stream *aws_input_stream_new_from_java_http_request_body_stream(
2027
struct aws_allocator *allocator,
2128
JNIEnv *env,
22-
jobject http_request_body_stream);
29+
jobject http_request_body_stream,
30+
bool use_ffm);
2331

2432
struct aws_http_message *aws_http_request_new_from_java_http_request(
2533
JNIEnv *env,
@@ -40,7 +48,8 @@ int aws_apply_java_http_request_changes_to_native_request(
4048
JNIEnv *env,
4149
jbyteArray marshalled_request,
4250
jobject jni_body_stream,
43-
struct aws_http_message *message);
51+
struct aws_http_message *message,
52+
bool use_ffm);
4453

4554
/* if this fails a java exception has been set. */
4655
jobject aws_java_http_request_from_native(JNIEnv *env, struct aws_http_message *message, jobject request_body_stream);

0 commit comments

Comments
 (0)