Skip to content

Commit dad07a5

Browse files
committed
Add discard() and optimal writeTo
Discard is used for ensuring a datastream is drained if needed, but a no-op when no drain is needed. Added an optimal writeTo for MultiByteBufferDataStream.
1 parent af66736 commit dad07a5

9 files changed

Lines changed: 319 additions & 6 deletions

File tree

io/src/main/java/software/amazon/smithy/java/io/datastream/DataStream.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,20 @@ default void writeTo(OutputStream out) throws IOException {
101101
}
102102
}
103103

104+
/**
105+
* Drain and release the underlying source without producing the bytes.
106+
*
107+
* <p>For in-memory backings this is a no-op. The data already arrived and lives in heap memory, so there's
108+
* nothing to release. For sources backed by an {@link InputStream} or reactive
109+
* {@link java.util.concurrent.Flow.Publisher}, this method consumes (or cancels and closes) the source so any
110+
* underlying transport (socket, file descriptor, subscriber registration) can be released.
111+
*
112+
* @throws IOException if releasing the underlying source fails.
113+
*/
114+
default void discard() throws IOException {
115+
// no-op for in-memory backings
116+
}
117+
104118
/**
105119
* Read the contents of the stream into a ByteBuffer by reading all bytes from {@link #asInputStream()}.
106120
*

io/src/main/java/software/amazon/smithy/java/io/datastream/InputStreamDataStream.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,21 @@ public void writeTo(OutputStream out) throws IOException {
3838
asInputStream().transferTo(out);
3939
}
4040

41+
@Override
42+
public void discard() throws IOException {
43+
if (consumed || closed) {
44+
return;
45+
}
46+
47+
consumed = true;
48+
closed = true;
49+
try (var is = inputStream) {
50+
// skipNBytes is allowed to refuse and return early on some streams; transferTo to a
51+
// null sink is the contract-correct way to drain to EOF without holding the bytes.
52+
is.transferTo(OutputStream.nullOutputStream());
53+
}
54+
}
55+
4156
@Override
4257
public boolean isReplayable() {
4358
return false;

io/src/main/java/software/amazon/smithy/java/io/datastream/MultiByteBufferDataStream.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
*/
3636
final class MultiByteBufferDataStream implements DataStream {
3737

38+
private static final int COPY_BUF_SIZE = 16384;
39+
3840
private final List<ByteBuffer> buffers;
3941
private final long contentLength;
4042
private final String contentType;
@@ -124,14 +126,20 @@ public InputStream nextElement() {
124126

125127
@Override
126128
public void writeTo(OutputStream out) throws IOException {
129+
byte[] scratch = null;
127130
for (var buf : buffers) {
128-
var dup = buf.duplicate();
129-
if (dup.hasArray()) {
130-
out.write(dup.array(), dup.arrayOffset() + dup.position(), dup.remaining());
131+
if (buf.hasArray()) {
132+
out.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
131133
} else {
132-
var tmp = new byte[dup.remaining()];
133-
dup.get(tmp);
134-
out.write(tmp);
134+
if (scratch == null) {
135+
scratch = new byte[COPY_BUF_SIZE];
136+
}
137+
var dup = buf.duplicate();
138+
while (dup.hasRemaining()) {
139+
int n = Math.min(scratch.length, dup.remaining());
140+
dup.get(scratch, 0, n);
141+
out.write(scratch, 0, n);
142+
}
135143
}
136144
}
137145
}

io/src/main/java/software/amazon/smithy/java/io/datastream/PublisherDataStream.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,54 @@ public void onComplete() {
143143
}
144144
}
145145

146+
@Override
147+
public void discard() throws IOException {
148+
if (consumed) {
149+
return;
150+
}
151+
152+
consumed = true;
153+
var error = new AtomicReference<Throwable>();
154+
var done = new CountDownLatch(1);
155+
156+
publisher.subscribe(new Flow.Subscriber<>() {
157+
@Override
158+
public void onSubscribe(Flow.Subscription s) {
159+
s.request(Long.MAX_VALUE);
160+
}
161+
162+
@Override
163+
public void onNext(ByteBuffer buf) {
164+
// drop on the floor
165+
}
166+
167+
@Override
168+
public void onError(Throwable t) {
169+
error.set(t);
170+
done.countDown();
171+
}
172+
173+
@Override
174+
public void onComplete() {
175+
done.countDown();
176+
}
177+
});
178+
179+
try {
180+
done.await();
181+
} catch (InterruptedException e) {
182+
Thread.currentThread().interrupt();
183+
throw new IOException("Interrupted while discarding publisher data", e);
184+
}
185+
186+
Throwable t = error.get();
187+
if (t instanceof IOException ioe) {
188+
throw ioe;
189+
} else if (t != null) {
190+
throw new IOException("Publisher error", t);
191+
}
192+
}
193+
146194
private void innerSubscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
147195
consumed = true;
148196
publisher.subscribe(subscriber);

io/src/main/java/software/amazon/smithy/java/io/datastream/WrappedDataStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ public void writeTo(OutputStream out) throws IOException {
4545
delegate.writeTo(out);
4646
}
4747

48+
@Override
49+
public void discard() throws IOException {
50+
delegate.discard();
51+
}
52+
4853
@Override
4954
public long contentLength() {
5055
return contentLength;

io/src/test/java/software/amazon/smithy/java/io/datastream/InputStreamDataStreamTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,42 @@ public void writeToNotReplayable() throws IOException {
111111

112112
assertThrows(IllegalStateException.class, () -> ds.writeTo(new ByteArrayOutputStream()));
113113
}
114+
115+
@Test
116+
public void discardDrainsAndClosesUnderlyingStream() throws IOException {
117+
// Track that the underlying stream got closed and drained without copying out the bytes.
118+
var data = new byte[] {1, 2, 3, 4, 5};
119+
var closed = new boolean[] {false};
120+
var src = new ByteArrayInputStream(data) {
121+
@Override
122+
public void close() throws IOException {
123+
closed[0] = true;
124+
super.close();
125+
}
126+
};
127+
var ds = DataStream.ofInputStream(src);
128+
129+
ds.discard();
130+
131+
assertThat(closed[0], is(true));
132+
assertThat(ds.isAvailable(), is(false));
133+
// Drained to EOF.
134+
assertThat(src.available(), equalTo(0));
135+
}
136+
137+
@Test
138+
public void discardIsIdempotent() throws IOException {
139+
var ds = DataStream.ofInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));
140+
141+
ds.discard();
142+
ds.discard(); // Second discard must not throw
143+
}
144+
145+
@Test
146+
public void discardAfterConsumptionIsNoOp() throws IOException {
147+
var ds = DataStream.ofInputStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));
148+
ds.asInputStream();
149+
150+
ds.discard(); // Don't reopen the stream just to discard it
151+
}
114152
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.smithy.java.io.datastream;
7+
8+
import static org.hamcrest.MatcherAssert.assertThat;
9+
import static org.hamcrest.Matchers.equalTo;
10+
import static org.hamcrest.Matchers.is;
11+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
12+
13+
import java.io.ByteArrayOutputStream;
14+
import java.io.IOException;
15+
import java.nio.ByteBuffer;
16+
import java.nio.charset.StandardCharsets;
17+
import java.util.List;
18+
import org.junit.jupiter.api.Test;
19+
20+
public class MultiByteBufferDataStreamTest {
21+
22+
@Test
23+
public void writeToHeapBuffers() throws IOException {
24+
// hasArray() == true for heap buffers, exercising the array-fast path.
25+
var b1 = ByteBuffer.wrap("hello ".getBytes(StandardCharsets.UTF_8));
26+
var b2 = ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8));
27+
var ds = DataStream.ofByteBuffers(List.of(b1, b2), 11, "text/plain");
28+
var out = new ByteArrayOutputStream();
29+
30+
ds.writeTo(out);
31+
32+
assertArrayEquals("hello world".getBytes(StandardCharsets.UTF_8), out.toByteArray());
33+
}
34+
35+
@Test
36+
public void writeToReadOnlyBuffersUsesScratch() throws IOException {
37+
// asReadOnlyBuffer().hasArray() returns false (HeapByteBufferR) — exercises the scratch
38+
// copy path. This is the case that motivated the optimization: the JDK HttpClient produces
39+
// read-only heap buffers via ZeroCopyBodySubscriber, and writeTo previously allocated a new
40+
// byte[] per chunk. Verify the bytes still come out correctly.
41+
var b1 = ByteBuffer.wrap("read-only ".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
42+
var b2 = ByteBuffer.wrap("path".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
43+
assertThat(b1.hasArray(), is(false));
44+
var ds = DataStream.ofByteBuffers(List.of(b1, b2), 14, "text/plain");
45+
var out = new ByteArrayOutputStream();
46+
47+
ds.writeTo(out);
48+
49+
assertArrayEquals("read-only path".getBytes(StandardCharsets.UTF_8), out.toByteArray());
50+
}
51+
52+
@Test
53+
public void writeToReadOnlyBuffersDoesNotMutatePosition() throws IOException {
54+
// Defends the contract: replayable means writeTo is safe to call again. The previous
55+
// implementation called dup.get(tmp) which advanced the duplicate's position, but the
56+
// current implementation must use a duplicate so the original buffer's position survives.
57+
var b1 = ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
58+
var b2 = ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
59+
var ds = DataStream.ofByteBuffers(List.of(b1, b2), 2, "text/plain");
60+
61+
var out1 = new ByteArrayOutputStream();
62+
ds.writeTo(out1);
63+
var out2 = new ByteArrayOutputStream();
64+
ds.writeTo(out2);
65+
66+
assertArrayEquals("ab".getBytes(StandardCharsets.UTF_8), out1.toByteArray());
67+
assertArrayEquals("ab".getBytes(StandardCharsets.UTF_8), out2.toByteArray());
68+
}
69+
70+
@Test
71+
public void writeToBufferLargerThanScratch() throws IOException {
72+
// Drive the inner copy loop past one scratch fill (scratch is 16 KiB internally).
73+
byte[] payload = new byte[40_000];
74+
for (int i = 0; i < payload.length; i++) {
75+
payload[i] = (byte) (i & 0xff);
76+
}
77+
var b1 = ByteBuffer.wrap(payload, 0, 25_000).asReadOnlyBuffer();
78+
var b2 = ByteBuffer.wrap(payload, 25_000, 15_000).asReadOnlyBuffer();
79+
var ds = DataStream.ofByteBuffers(List.of(b1, b2), payload.length, "application/octet-stream");
80+
var out = new ByteArrayOutputStream();
81+
82+
ds.writeTo(out);
83+
84+
assertArrayEquals(payload, out.toByteArray());
85+
}
86+
87+
@Test
88+
public void writeToMixedBuffers() throws IOException {
89+
// First buffer hits the heap-array fast path, second buffer hits the scratch copy path.
90+
// Validates the scratch is allocated lazily on the first non-array buffer.
91+
var b1 = ByteBuffer.wrap("heap ".getBytes(StandardCharsets.UTF_8));
92+
var b2 = ByteBuffer.wrap("readonly".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
93+
var ds = DataStream.ofByteBuffers(List.of(b1, b2), 13, "text/plain");
94+
var out = new ByteArrayOutputStream();
95+
96+
ds.writeTo(out);
97+
98+
assertArrayEquals("heap readonly".getBytes(StandardCharsets.UTF_8), out.toByteArray());
99+
}
100+
101+
@Test
102+
public void asByteBufferStitchesAcrossChunks() {
103+
var b1 = ByteBuffer.wrap("aa".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
104+
var b2 = ByteBuffer.wrap("bb".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
105+
var ds = DataStream.ofByteBuffers(List.of(b1, b2), 4, "text/plain");
106+
107+
assertThat(ds.asByteBuffer(), equalTo(ByteBuffer.wrap("aabb".getBytes(StandardCharsets.UTF_8))));
108+
}
109+
}

io/src/test/java/software/amazon/smithy/java/io/datastream/PublisherDataStreamTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,59 @@ void writeToEmpty() throws IOException {
7878

7979
assertEquals(0, out.size());
8080
}
81+
82+
@Test
83+
void discardDrainsPublisherToCompletion() throws IOException {
84+
var publisher = new SubmissionPublisher<ByteBuffer>();
85+
var ds = DataStream.ofPublisher(publisher, null, -1);
86+
87+
var discardThread = Thread.startVirtualThread(() -> {
88+
try {
89+
ds.discard();
90+
} catch (IOException e) {
91+
throw new RuntimeException(e);
92+
}
93+
});
94+
95+
// Wait for discard's subscriber to register before submitting items.
96+
while (publisher.getNumberOfSubscribers() < 1) {
97+
Thread.onSpinWait();
98+
}
99+
publisher.submit(ByteBuffer.wrap("dropped".getBytes(StandardCharsets.UTF_8)));
100+
publisher.close();
101+
102+
try {
103+
discardThread.join();
104+
} catch (InterruptedException e) {
105+
throw new RuntimeException(e);
106+
}
107+
108+
// After discard, the stream is consumed and no longer available for replay.
109+
assertEquals(false, ds.isAvailable());
110+
}
111+
112+
@Test
113+
void discardIsIdempotent() throws IOException {
114+
var publisher = new SubmissionPublisher<ByteBuffer>();
115+
var ds = DataStream.ofPublisher(publisher, null, -1);
116+
117+
Thread.startVirtualThread(publisher::close);
118+
ds.discard();
119+
120+
// Second call exits immediately without re-subscribing.
121+
ds.discard();
122+
}
123+
124+
@Test
125+
void discardPropagatesPublisherError() {
126+
var publisher = new SubmissionPublisher<ByteBuffer>();
127+
var ds = DataStream.ofPublisher(publisher, null, -1);
128+
129+
var ex = new RuntimeException("boom");
130+
Thread.startVirtualThread(() -> publisher.closeExceptionally(ex));
131+
132+
var thrown = assertThrows(IOException.class, ds::discard);
133+
assertEquals("Publisher error", thrown.getMessage());
134+
assertEquals(ex, thrown.getCause());
135+
}
81136
}

io/src/test/java/software/amazon/smithy/java/io/datastream/WrappedDataStreamTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,25 @@ public void writeToDelegates() throws IOException {
4848

4949
assertArrayEquals(data, out.toByteArray());
5050
}
51+
52+
@Test
53+
public void discardDelegates() throws IOException {
54+
// The wrapped stream's discard must reach through to the underlying source so InputStream
55+
// / Publisher resources actually get released. Verify via observable side effect on
56+
// InputStreamDataStream (drain-to-EOF + close).
57+
var closed = new boolean[] {false};
58+
var src = new ByteArrayInputStream("payload".getBytes(StandardCharsets.UTF_8)) {
59+
@Override
60+
public void close() throws IOException {
61+
closed[0] = true;
62+
super.close();
63+
}
64+
};
65+
var inner = DataStream.ofInputStream(src);
66+
var ds = DataStream.withMetadata(inner, "text/plain", 7L, false);
67+
68+
ds.discard();
69+
70+
assertThat(closed[0], is(true));
71+
}
5172
}

0 commit comments

Comments
 (0)