3030import com .google .protobuf .InvalidProtocolBufferException ;
3131import com .google .protobuf .Message ;
3232import com .google .protobuf .Struct ;
33+ import com .google .protobuf .UnsafeByteOperations ;
3334import com .google .protobuf .Value ;
3435import com .google .protobuf .util .Durations ;
3536import io .envoyproxy .envoy .config .core .v3 .GrpcService ;
5758import io .grpc .ClientCall ;
5859import io .grpc .ClientInterceptor ;
5960import io .grpc .Deadline ;
61+ import io .grpc .Detachable ;
6062import io .grpc .DoubleHistogramMetricInstrument ;
63+ import io .grpc .Drainable ;
6164import io .grpc .ForwardingClientCall .SimpleForwardingClientCall ;
6265import io .grpc .ForwardingClientCallListener .SimpleForwardingClientCallListener ;
66+ import io .grpc .HasByteBuffer ;
67+ import io .grpc .KnownLength ;
6368import io .grpc .Metadata ;
6469import io .grpc .MethodDescriptor ;
6570import io .grpc .MetricInstrumentRegistry ;
8893import io .grpc .xds .internal .headermutations .HeaderMutations ;
8994import io .grpc .xds .internal .headermutations .HeaderMutator ;
9095import io .grpc .xds .internal .headermutations .HeaderValueOption ;
91- import java .io .ByteArrayInputStream ;
9296import java .io .IOException ;
9397import java .io .InputStream ;
98+ import java .io .OutputStream ;
99+ import java .nio .ByteBuffer ;
94100import java .util .ArrayList ;
95101import java .util .List ;
96102import java .util .Locale ;
@@ -1387,16 +1393,16 @@ public void sendMessage(InputStream message) {
13871393
13881394 // Mode is GRPC
13891395 try {
1390- byte [] bodyBytes = ByteStreams . toByteArray (message );
1396+ ByteString bodyByteString = outboundStreamToByteString (message );
13911397 sendToExtProc (ProcessingRequest .newBuilder ()
13921398 .setRequestBody (HttpBody .newBuilder ()
1393- .setBody (ByteString . copyFrom ( bodyBytes ) )
1399+ .setBody (bodyByteString )
13941400 .setEndOfStream (false )
13951401 .build ())
13961402 .build ());
13971403
13981404 if (config .getObservabilityMode ()) {
1399- super .sendMessage (new ByteArrayInputStream ( bodyBytes ));
1405+ super .sendMessage (new OutboundZeroCopyInputStream ( bodyByteString ));
14001406 }
14011407 } catch (IOException e ) {
14021408 rawCall .cancel ("Failed to serialize message for External Processor" , e );
@@ -1459,7 +1465,7 @@ private void handleRequestBodyResponse(BodyResponse bodyResponse) {
14591465 if (mutation .hasStreamedResponse ()) {
14601466 StreamedBodyResponse streamed = mutation .getStreamedResponse ();
14611467 if (!streamed .getEndOfStreamWithoutMessage ()) {
1462- super .sendMessage (streamed .getBody (). newInput ( ));
1468+ super .sendMessage (new OutboundZeroCopyInputStream ( streamed .getBody ()));
14631469 }
14641470 if (streamed .getEndOfStream () || streamed .getEndOfStreamWithoutMessage ()) {
14651471 if (requestSideClosed .compareAndSet (false , true )) {
@@ -1633,11 +1639,11 @@ public void onMessage(InputStream message) {
16331639 }
16341640
16351641 try {
1636- byte [] bodyBytes = ByteStreams . toByteArray (message );
1637- sendResponseBodyToExtProc (bodyBytes , false );
1642+ ByteString bodyByteString = inboundStreamToByteString (message );
1643+ sendResponseBodyToExtProc (bodyByteString , false );
16381644
16391645 if (dataPlaneClientCall .config .getObservabilityMode ()) {
1640- delegate ().onMessage (new ByteArrayInputStream ( bodyBytes ));
1646+ delegate ().onMessage (new InboundZeroCopyInputStream ( bodyByteString ));
16411647 }
16421648 } catch (IOException e ) {
16431649 rawCall .cancel ("Failed to read server response" , e );
@@ -1739,7 +1745,7 @@ private void proceedWithClose(Status status, Metadata trailers) {
17391745 }
17401746
17411747 void onExternalBody (ByteString body ) {
1742- delegate ().onMessage (body . newInput ( ));
1748+ delegate ().onMessage (new InboundZeroCopyInputStream ( body ));
17431749 }
17441750
17451751 void unblockAfterStreamComplete () {
@@ -1793,7 +1799,8 @@ private void triggerCloseHandshake() {
17931799 }
17941800 }
17951801
1796- private void sendResponseBodyToExtProc (@ Nullable byte [] bodyBytes , boolean endOfStream ) {
1802+ private void sendResponseBodyToExtProc (
1803+ @ Nullable ByteString bodyByteString , boolean endOfStream ) {
17971804 if (dataPlaneClientCall .extProcStreamState .get ().isCompleted ()
17981805 || dataPlaneClientCall .currentProcessingMode .getResponseBodyMode ()
17991806 != ProcessingMode .BodySendMode .GRPC ) {
@@ -1802,8 +1809,8 @@ private void sendResponseBodyToExtProc(@Nullable byte[] bodyBytes, boolean endOf
18021809
18031810 HttpBody .Builder bodyBuilder =
18041811 HttpBody .newBuilder ();
1805- if (bodyBytes != null ) {
1806- bodyBuilder .setBody (ByteString . copyFrom ( bodyBytes ) );
1812+ if (bodyByteString != null ) {
1813+ bodyBuilder .setBody (bodyByteString );
18071814 }
18081815 bodyBuilder .setEndOfStream (endOfStream );
18091816
@@ -1813,4 +1820,140 @@ private void sendResponseBodyToExtProc(@Nullable byte[] bodyBytes, boolean endOf
18131820 }
18141821 }
18151822 }
1823+
1824+ private static ByteString inboundStreamToByteString (InputStream message ) throws IOException {
1825+ if (message == null ) {
1826+ return ByteString .EMPTY ;
1827+ }
1828+ if (message instanceof HasByteBuffer && ((HasByteBuffer ) message ).byteBufferSupported ()) {
1829+ ByteBuffer byteBuffer = ((HasByteBuffer ) message ).getByteBuffer ();
1830+ if (byteBuffer != null ) {
1831+ return UnsafeByteOperations .unsafeWrap (byteBuffer );
1832+ }
1833+ }
1834+ byte [] bodyBytes = ByteStreams .toByteArray (message );
1835+ return ByteString .copyFrom (bodyBytes );
1836+ }
1837+
1838+ private static ByteString outboundStreamToByteString (InputStream message ) throws IOException {
1839+ if (message == null ) {
1840+ return ByteString .EMPTY ;
1841+ }
1842+ if (message instanceof Drainable ) {
1843+ int size = message .available ();
1844+ ByteString .Output output =
1845+ size > 0 ? ByteString .newOutput (size ) : ByteString .newOutput ();
1846+ ((Drainable ) message ).drainTo (output );
1847+ return output .toByteString ();
1848+ }
1849+ byte [] bodyBytes = ByteStreams .toByteArray (message );
1850+ return ByteString .copyFrom (bodyBytes );
1851+ }
1852+
1853+ private static final class InboundZeroCopyInputStream extends InputStream
1854+ implements HasByteBuffer , Detachable , KnownLength {
1855+ private final ByteString byteString ;
1856+ private final InputStream delegate ;
1857+ private boolean detached = false ;
1858+
1859+ InboundZeroCopyInputStream (ByteString byteString ) {
1860+ this .byteString = byteString ;
1861+ this .delegate = byteString .newInput ();
1862+ }
1863+
1864+ @ Override
1865+ public int read () throws IOException {
1866+ return delegate .read ();
1867+ }
1868+
1869+ @ Override
1870+ public int read (byte [] b , int off , int len ) throws IOException {
1871+ return delegate .read (b , off , len );
1872+ }
1873+
1874+ @ Override
1875+ public int available () throws IOException {
1876+ return delegate .available ();
1877+ }
1878+
1879+ @ Override
1880+ public void close () throws IOException {
1881+ delegate .close ();
1882+ }
1883+
1884+ @ Override
1885+ public boolean markSupported () {
1886+ return delegate .markSupported ();
1887+ }
1888+
1889+ @ Override
1890+ public void mark (int readlimit ) {
1891+ delegate .mark (readlimit );
1892+ }
1893+
1894+ @ Override
1895+ public void reset () throws IOException {
1896+ delegate .reset ();
1897+ }
1898+
1899+ @ Override
1900+ public boolean byteBufferSupported () {
1901+ return !detached ;
1902+ }
1903+
1904+ @ Override
1905+ public ByteBuffer getByteBuffer () {
1906+ if (detached ) {
1907+ throw new IllegalStateException ("Stream has been detached" );
1908+ }
1909+ return byteString .asReadOnlyByteBuffer ();
1910+ }
1911+
1912+ @ Override
1913+ public InputStream detach () {
1914+ if (detached ) {
1915+ throw new IllegalStateException ("Stream already detached" );
1916+ }
1917+ detached = true ;
1918+ return delegate ;
1919+ }
1920+ }
1921+
1922+ private static final class OutboundZeroCopyInputStream extends InputStream
1923+ implements Drainable , KnownLength {
1924+ private final ByteString byteString ;
1925+ private final InputStream delegate ;
1926+
1927+ OutboundZeroCopyInputStream (ByteString byteString ) {
1928+ this .byteString = byteString ;
1929+ this .delegate = byteString .newInput ();
1930+ }
1931+
1932+ @ Override
1933+ public int read () throws IOException {
1934+ return delegate .read ();
1935+ }
1936+
1937+ @ Override
1938+ public int read (byte [] b , int off , int len ) throws IOException {
1939+ return delegate .read (b , off , len );
1940+ }
1941+
1942+ @ Override
1943+ public int available () throws IOException {
1944+ return delegate .available ();
1945+ }
1946+
1947+ @ Override
1948+ public void close () throws IOException {
1949+ delegate .close ();
1950+ }
1951+
1952+ @ Override
1953+ public int drainTo (OutputStream target ) throws IOException {
1954+ int size = byteString .size ();
1955+ byteString .writeTo (target );
1956+ return size ;
1957+ }
1958+ }
18161959}
0 commit comments