Skip to content

Commit 9ca2eb8

Browse files
committed
Fix double-release of buffers.
Guard spsc buffer in ReactorNettyClient against multiple concurrent release calls to avoid double releasing. This has only became visible using Netty 4.2 AdaptivePoolingAllocator. [#717]
1 parent 5a0bd8e commit 9ca2eb8

3 files changed

Lines changed: 56 additions & 9 deletions

File tree

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,9 +1032,19 @@ public void close(Supplier<? extends Throwable> supplier) {
10321032
receiver.onError(supplier.get());
10331033
}
10341034

1035-
while (!this.buffer.isEmpty()) {
1036-
ReferenceCountUtil.release(this.buffer.poll());
1037-
}
1035+
// Drain the buffer under the single-consumer guard. The buffer is an Spsc queue to avoid double-releasing it (IllegalReferenceCountException).
1036+
do {
1037+
if (this.drain.compareAndSet(false, true)) {
1038+
try {
1039+
BackendMessage message;
1040+
while ((message = this.buffer.poll()) != null) {
1041+
ReferenceCountUtil.release(message);
1042+
}
1043+
} finally {
1044+
this.drain.compareAndSet(true, false);
1045+
}
1046+
}
1047+
} while (!this.buffer.isEmpty());
10381048
}
10391049

10401050
}

src/main/java/io/r2dbc/postgresql/codec/BlobCodec.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.nio.ByteBuffer;
3131
import java.util.Collections;
32+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3233

3334
import static io.r2dbc.postgresql.codec.PostgresqlObjectId.BYTEA;
3435
import static io.r2dbc.postgresql.message.Format.FORMAT_TEXT;
@@ -87,22 +88,22 @@ public Iterable<PostgresTypeIdentifier> getDataTypes() {
8788

8889
private static final class ByteABlob implements Blob {
8990

91+
private static final AtomicIntegerFieldUpdater<ByteABlob> RELEASED = AtomicIntegerFieldUpdater.newUpdater(ByteABlob.class, "released");
92+
9093
private final ByteBuf byteBuf;
9194

9295
private final Format format;
9396

97+
private volatile int released;
98+
9499
private ByteABlob(ByteBuf byteBuf, Format format) {
95100
this.byteBuf = byteBuf.retain();
96101
this.format = format;
97102
}
98103

99104
@Override
100105
public Mono<Void> discard() {
101-
return Mono.fromRunnable(() -> {
102-
if (this.byteBuf.refCnt() > 0) {
103-
this.byteBuf.release();
104-
}
105-
});
106+
return Mono.fromRunnable(this::release);
106107
}
107108

108109
@Override
@@ -113,7 +114,16 @@ public Mono<ByteBuffer> stream() {
113114
}
114115

115116
return ByteBuffer.wrap(AbstractBinaryCodec.decodeFromHex(this.byteBuf));
116-
}).doAfterTerminate(this.byteBuf::release);
117+
}).doAfterTerminate(this::release);
118+
}
119+
120+
/**
121+
* Release the retained buffer exactly once.
122+
*/
123+
private void release() {
124+
if (RELEASED.compareAndSet(this, 0, 1)) {
125+
this.byteBuf.release();
126+
}
117127
}
118128

119129
}

src/test/java/io/r2dbc/postgresql/codec/BlobCodecUnitTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,33 @@ void decodeNoByteBuf() {
6565
assertThat(new BlobCodec(TEST).decode(null, dataType, FORMAT_TEXT, Blob.class)).isNull();
6666
}
6767

68+
@Test
69+
void streamThenDiscardMustNotOverReleaseSharedBuffer() {
70+
71+
// The decoded column is a retained slice that shares its refCnt with the frame decoder's cumulation
72+
// buffer. Consuming the Blob via stream() and then discard() (as Flux.usingWhen does) must not release
73+
// the shared buffer twice, which would corrupt the still-alive cumulation (IllegalReferenceCountException).
74+
io.netty.buffer.ByteBuf cumulation = TEST.buffer();
75+
cumulation.writeBytes("\\x746573742d76616c7565".getBytes());
76+
io.netty.buffer.ByteBuf column = cumulation.readRetainedSlice(cumulation.readableBytes());
77+
assertThat(cumulation.refCnt()).isEqualTo(2);
78+
79+
Blob blob = new BlobCodec(TEST).doDecode(column, BYTEA, FORMAT_TEXT, Blob.class);
80+
column.release(); // DataRow.deallocate() releases the column once
81+
82+
Flux.from(blob.stream()).reduce(TEST.compositeBuffer(), (a, b) -> a.addComponent(true, Unpooled.wrappedBuffer(b)))
83+
.as(StepVerifier::create)
84+
.expectNextCount(1)
85+
.verifyComplete();
86+
87+
reactor.core.publisher.Mono.from(blob.discard()).block();
88+
89+
assertThat(cumulation.refCnt())
90+
.describedAs("shared cumulation refCnt after stream()+discard()")
91+
.isOne();
92+
cumulation.release();
93+
}
94+
6895
@Test
6996
void doCanDecode() {
7097
BlobCodec codec = new BlobCodec(TEST);

0 commit comments

Comments
 (0)