Skip to content

Commit fbc3a16

Browse files
merlimatejona86
authored andcommitted
core: Close InputStream in NoopClientStream.writeMessage() to prevent resource leaks
NoopClientStream.writeMessage() silently discards the InputStream without closing it. When a Marshaller.stream() returns an InputStream backed by a ref-counted ByteBuf (e.g. Netty's PooledByteBufAllocator), this causes a direct memory leak. This affects any code path where writeMessage() is called on a NoopClientStream or its subclass FailingClientStream: - Context cancelled before stream start (ClientCallImpl line 197) - Compressor not found (ClientCallImpl line 219) - Deadline already exceeded (ClientCallImpl line 262, via FailingClientStream) - DelayedStream draining buffered messages after cancellation sets realStream to NoopClientStream.INSTANCE The fix calls GrpcUtil.closeQuietly(message) to ensure the InputStream is always closed, matching the contract in AbstractStream.writeMessage().
1 parent 6cabaf4 commit fbc3a16

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

core/src/main/java/io/grpc/internal/NoopClientStream.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ public Attributes getAttributes() {
4545
public void request(int numMessages) {}
4646

4747
@Override
48-
public void writeMessage(InputStream message) {}
48+
public void writeMessage(InputStream message) {
49+
GrpcUtil.closeQuietly(message);
50+
}
4951

5052
@Override
5153
public void flush() {}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2025 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.internal;
18+
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.verify;
21+
22+
import java.io.InputStream;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import org.junit.runners.JUnit4;
26+
27+
/**
28+
* Unit tests for {@link NoopClientStream}.
29+
*/
30+
@RunWith(JUnit4.class)
31+
public class NoopClientStreamTest {
32+
33+
@Test
34+
public void writeMessageShouldCloseInputStream() throws Exception {
35+
// NoopClientStream.writeMessage() is called when a stream is cancelled or failed
36+
// before the real transport stream is established (e.g. via DelayedStream draining
37+
// buffered messages to NoopClientStream on cancellation, or FailingClientStream
38+
// which extends NoopClientStream). The InputStream must be closed to avoid leaking
39+
// resources such as ref-counted ByteBufs.
40+
InputStream message = mock(InputStream.class);
41+
NoopClientStream.INSTANCE.writeMessage(message);
42+
verify(message).close();
43+
}
44+
}

0 commit comments

Comments
 (0)