Skip to content

Commit 0f5f8b1

Browse files
avoid double producer (#1122) (#1126)
Co-authored-by: Laura Trotta <153528055+l-trotta@users.noreply.github.com>
1 parent a452dfd commit 0f5f8b1

2 files changed

Lines changed: 22 additions & 4 deletions

File tree

java-client/src/main/java/co/elastic/clients/transport/rest5_client/MultiBufferEntity.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import co.elastic.clients.util.NoCopyByteArrayOutputStream;
2323
import org.apache.hc.core5.http.ContentType;
2424
import org.apache.hc.core5.http.io.entity.AbstractHttpEntity;
25-
import org.apache.hc.core5.http.nio.AsyncDataProducer;
25+
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
2626
import org.apache.hc.core5.http.nio.DataStreamChannel;
2727

2828
import java.io.IOException;
@@ -32,20 +32,24 @@
3232
import java.nio.channels.Channels;
3333
import java.nio.channels.WritableByteChannel;
3434
import java.util.Iterator;
35+
import java.util.concurrent.atomic.AtomicReference;
3536

3637
/**
3738
* An HTTP entity based on a sequence of byte buffers.
3839
*/
39-
class MultiBufferEntity extends AbstractHttpEntity implements AsyncDataProducer {
40+
class MultiBufferEntity extends AbstractHttpEntity implements AsyncEntityProducer {
4041

4142
private final Iterable<ByteBuffer> buffers;
4243

4344
private Iterator<ByteBuffer> iterator;
4445
private volatile ByteBuffer currentBuffer;
4546

47+
private final AtomicReference<Exception> exceptionRef;
48+
4649
MultiBufferEntity(Iterable<ByteBuffer> buffers, ContentType contentType) {
4750
super(contentType,null,true);
4851
this.buffers = buffers;
52+
this.exceptionRef = new AtomicReference<>();
4953
init();
5054
}
5155

@@ -69,6 +73,13 @@ public boolean isRepeatable() {
6973
return true;
7074
}
7175

76+
@Override
77+
public void failed(Exception cause) {
78+
if (exceptionRef.compareAndSet(null, cause)) {
79+
releaseResources();
80+
}
81+
}
82+
7283
@Override
7384
public long getContentLength() {
7485
// Use chunked encoding

rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.hc.core5.http.HttpRequest;
4646
import org.apache.hc.core5.http.ProtocolException;
4747
import org.apache.hc.core5.http.message.RequestLine;
48+
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
4849
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
4950
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
5051
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
@@ -409,7 +410,7 @@ private void performRequestAsync(
409410
final RequestContext context;
410411
context = request.createContextForNextAttempt(nodes.next());
411412
Future<ClassicHttpResponse> futureRef = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context,
412-
new FutureCallback<ClassicHttpResponse>() {
413+
new FutureCallback<>() {
413414
@Override
414415
public void completed(ClassicHttpResponse httpResponse) {
415416
try {
@@ -877,7 +878,13 @@ private static class RequestContext {
877878
.setHeaders(request.httpRequest.getHeaders());
878879

879880
if (request.httpRequest.getEntity() != null) {
880-
builder.setEntity(new BasicAsyncEntityProducer(request.httpRequest.getEntity()));
881+
HttpEntity entity = request.httpRequest.getEntity();
882+
if (entity instanceof AsyncEntityProducer) {
883+
builder.setEntity((AsyncEntityProducer) request.httpRequest.getEntity());
884+
}
885+
else {
886+
builder.setEntity(new BasicAsyncEntityProducer(entity));
887+
}
881888
}
882889

883890
this.requestProducer = builder.build();

0 commit comments

Comments
 (0)