Skip to content

Commit 80534f9

Browse files
committed
Polishing contribution
This commit adds further fixes in the same area, since there were similar bugs in the WriteCompletionHandler: * databuffers were not always emitted when fully read in the onNext hook * on completion, the iterator was closed too early, before it was fully read * on completion, writing the next bytebuffers from the iterator would always reuse the first one and not update the attachment Closes gh-36714
1 parent d8bc54d commit 80534f9

2 files changed

Lines changed: 29 additions & 2 deletions

File tree

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,6 +1215,11 @@ protected void hookOnNext(DataBuffer dataBuffer) {
12151215
failed(ex, attachment);
12161216
}
12171217
}
1218+
else {
1219+
iterator.close();
1220+
this.sink.next(dataBuffer);
1221+
request(1);
1222+
}
12181223
}
12191224

12201225
@Override
@@ -1238,7 +1243,6 @@ protected void hookOnComplete() {
12381243
@Override
12391244
public void completed(Integer written, Attachment attachment) {
12401245
DataBuffer.ByteBufferIterator iterator = attachment.iterator();
1241-
iterator.close();
12421246

12431247
long pos = this.position.addAndGet(written);
12441248
ByteBuffer byteBuffer = attachment.byteBuffer();
@@ -1248,9 +1252,11 @@ public void completed(Integer written, Attachment attachment) {
12481252
}
12491253
else if (iterator.hasNext()) {
12501254
ByteBuffer next = iterator.next();
1251-
this.channel.write(next, pos, attachment, this);
1255+
Attachment nextAttachment = new Attachment(next, attachment.dataBuffer(), iterator);
1256+
this.channel.write(next, pos, nextAttachment, this);
12521257
}
12531258
else {
1259+
iterator.close();
12541260
this.sink.next(attachment.dataBuffer());
12551261
this.writing.set(false);
12561262

spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,27 @@ private void verifyWrittenData(Flux<DataBuffer> writeResult) throws IOException
466466
assertThat(result).isEqualTo("foobarbazqux");
467467
}
468468

469+
@ParameterizedDataBufferAllocatingTest
470+
void writeAsynchronousFileChannelWithJoinedBuffer(DataBufferFactory bufferFactory) throws Exception {
471+
super.bufferFactory = bufferFactory;
472+
473+
DataBuffer foo = stringBuffer("foo");
474+
DataBuffer bar = stringBuffer("bar");
475+
DataBuffer joined = bufferFactory.join(List.of(foo, bar));
476+
477+
AsynchronousFileChannel channel = AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE);
478+
479+
Flux<DataBuffer> writeResult = DataBufferUtils.write(Flux.just(joined), channel);
480+
StepVerifier.create(writeResult)
481+
.consumeNextWith(stringConsumer("foobar"))
482+
.verifyComplete();
483+
484+
String result = String.join("", Files.readAllLines(tempFile));
485+
486+
assertThat(result).isEqualTo("foobar");
487+
channel.close();
488+
}
489+
469490
@ParameterizedDataBufferAllocatingTest
470491
void writeAsynchronousFileChannelErrorInFlux(DataBufferFactory bufferFactory) throws Exception {
471492
super.bufferFactory = bufferFactory;

0 commit comments

Comments
 (0)