Skip to content

Commit 68694fc

Browse files
committed
Polishing.
Release CopyData on discard to avoid memory leaks. See #714
1 parent aaa7fce commit 68694fc

4 files changed

Lines changed: 52 additions & 0 deletions

File tree

src/main/java/io/r2dbc/postgresql/message/frontend/CopyData.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ public Publisher<ByteBuf> encode(ByteBufAllocator byteBufAllocator) {
6666
});
6767
}
6868

69+
@Override
70+
public void dispose() {
71+
if (refCnt() > 0) {
72+
release();
73+
}
74+
}
75+
6976
@Override
7077
public boolean equals(Object o) {
7178
if (this == o) {

src/test/java/io/r2dbc/postgresql/PostgresqlCopyInIntegrationTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.buffer.Unpooled;
2121
import io.r2dbc.postgresql.ExceptionFactory.PostgresqlBadGrammarException;
2222
import io.r2dbc.spi.R2dbcNonTransientResourceException;
23+
import org.awaitility.Awaitility;
2324
import org.junit.jupiter.api.AfterEach;
2425
import org.junit.jupiter.api.BeforeEach;
2526
import org.junit.jupiter.api.Test;
@@ -28,6 +29,7 @@
2829
import reactor.core.publisher.Mono;
2930
import reactor.test.StepVerifier;
3031

32+
import java.time.Duration;
3133
import java.util.Collections;
3234
import java.util.List;
3335

@@ -146,6 +148,24 @@ void shouldFailOnInvalidStatement() {
146148
verifyCopyInFailed(sql, data, "syntax error at or near \"command\"");
147149
}
148150

151+
@Test
152+
void shouldReleaseBufferWhenDataArrivesAfterStatementFailure() {
153+
String sql = "COPY invalid command";
154+
ByteBuf buffer = byteBuf("something,something-invalid\n");
155+
156+
// Delay the data so that the backend reports the statement failure (and the copy is stopped) before the
157+
// CopyData is produced. The CopyData is then dropped on the request side and must still be released.
158+
Flux<ByteBuf> data = Flux.just(buffer).delayElements(Duration.ofMillis(100));
159+
160+
this.connection.copyIn(sql, data)
161+
.as(StepVerifier::create)
162+
.consumeErrorWith(e -> assertThat(e).isInstanceOf(PostgresqlBadGrammarException.class))
163+
.verify();
164+
165+
Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> buffer.refCnt() == 0);
166+
assertThat(buffer.refCnt()).describedAs("CopyData buffer must be released").isZero();
167+
}
168+
149169
@Test
150170
void shouldFailOnInvalidDataType() {
151171
String sql = "COPY test (val, timestamp) FROM STDIN WITH DELIMITER ','";

src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static io.r2dbc.postgresql.message.backend.ReadyForQuery.TransactionStatus.IDLE;
4242
import static java.util.Collections.emptyList;
4343
import static java.util.Collections.emptySet;
44+
import static org.assertj.core.api.Assertions.assertThat;
4445

4546
/**
4647
* Unit tests for {@link PostgresqlCopyIn}.
@@ -77,6 +78,8 @@ void copyInErrorResponse() {
7778
.as(StepVerifier::create)
7879
.expectError(PostgresqlNonTransientResourceException.class)
7980
.verify();
81+
82+
assertThat(byteBuffer.refCnt()).describedAs("CopyData buffer must be released on failure").isZero();
8083
}
8184

8285
@Test

src/test/java/io/r2dbc/postgresql/message/frontend/CopyDataUnitTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.r2dbc.postgresql.message.frontend;
1818

19+
import io.netty.buffer.ByteBuf;
1920
import org.junit.jupiter.api.Test;
2021

2122
import static io.r2dbc.postgresql.message.frontend.FrontendMessageAssert.assertThat;
@@ -44,4 +45,25 @@ void encode() {
4445
.writeInt(100));
4546
}
4647

48+
@Test
49+
void disposeReleasesBuffer() {
50+
ByteBuf buffer = TEST.buffer(4).writeInt(100);
51+
CopyData data = new CopyData(buffer);
52+
53+
data.dispose();
54+
55+
org.assertj.core.api.Assertions.assertThat(buffer.refCnt()).isZero();
56+
}
57+
58+
@Test
59+
void disposeIsIdempotent() {
60+
ByteBuf buffer = TEST.buffer(4).writeInt(100);
61+
CopyData data = new CopyData(buffer);
62+
63+
data.dispose();
64+
data.dispose();
65+
66+
org.assertj.core.api.Assertions.assertThat(buffer.refCnt()).isZero();
67+
}
68+
4769
}

0 commit comments

Comments
 (0)