@@ -113,12 +113,17 @@ public void addClientMiddleware(FlightClientMiddleware.Factory factory) {
113113 * @param root VectorSchemaRoot the root containing data
114114 * @param metadataListener A handler for metadata messages from the server. This will be passed buffers that will be
115115 * freed after {@link StreamListener#onNext(Object)} is called!
116+ * @param maxRequestsInFlight the max in-flight requests in the stream
116117 * @param options RPC-layer hints for this call.
117118 * @return ClientStreamListener an interface to control uploading data
118119 */
119120 public ClientStreamListener startPut (
120- FlightDescriptor descriptor , VectorSchemaRoot root , PutListener metadataListener , CallOption ... options ) {
121- return startPut (descriptor , root , new MapDictionaryProvider (), metadataListener , options );
121+ FlightDescriptor descriptor ,
122+ VectorSchemaRoot root ,
123+ PutListener metadataListener ,
124+ long maxRequestsInFlight ,
125+ CallOption ... options ) {
126+ return startPut (descriptor , root , new MapDictionaryProvider (), metadataListener , maxRequestsInFlight , options );
122127 }
123128
124129 /**
@@ -128,6 +133,7 @@ public ClientStreamListener startPut(
128133 * @param root VectorSchemaRoot the root containing data
129134 * @param provider A dictionary provider for the root.
130135 * @param metadataListener A handler for metadata messages from the server.
136+ * @param maxRequestsInFlight the max in-flight requests in the stream
131137 * @param options RPC-layer hints for this call.
132138 * @return ClientStreamListener an interface to control uploading data.
133139 * {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will already have been called.
@@ -137,10 +143,11 @@ public ClientStreamListener startPut(
137143 VectorSchemaRoot root ,
138144 DictionaryProvider provider ,
139145 PutListener metadataListener ,
146+ long maxRequestsInFlight ,
140147 CallOption ... options ) {
141148 Preconditions .checkNotNull (root , "root must not be null" );
142149 Preconditions .checkNotNull (provider , "provider must not be null" );
143- ClientStreamListener writer = startPut (descriptor , metadataListener , options );
150+ ClientStreamListener writer = startPut (descriptor , metadataListener , maxRequestsInFlight , options );
144151 writer .start (root , provider );
145152 return writer ;
146153 }
@@ -150,18 +157,22 @@ public ClientStreamListener startPut(
150157 *
151158 * @param descriptor FlightDescriptor the descriptor for the data
152159 * @param metadataListener A handler for metadata messages from the server.
160+ * @param maxRequestsInFlight the max in-flight requests in the stream
153161 * @param options RPC-layer hints for this call.
154162 * @return ClientStreamListener an interface to control uploading data.
155163 * {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will NOT already have been called.
156164 */
157165 public ClientStreamListener startPut (
158- FlightDescriptor descriptor , PutListener metadataListener , CallOption ... options ) {
166+ FlightDescriptor descriptor ,
167+ PutListener metadataListener ,
168+ long maxRequestsInFlight ,
169+ CallOption ... options ) {
159170 Preconditions .checkNotNull (descriptor , "descriptor must not be null" );
160171 Preconditions .checkNotNull (metadataListener , "metadataListener must not be null" );
161172
162173 try {
163174 ClientCall <ArrowMessage , Flight .PutResult > call = asyncStubNewCall (this .doPutDescriptor , options );
164- OnStreamReadyHandler onStreamReadyHandler = new OnStreamReadyHandler ();
175+ OnStreamReadyHandler onStreamReadyHandler = new OnStreamReadyHandler (( int ) maxRequestsInFlight );
165176 SetStreamObserver resultObserver =
166177 new SetStreamObserver (this .allocator , metadataListener , onStreamReadyHandler );
167178 ClientCallStreamObserver <ArrowMessage > observer =
@@ -184,11 +195,17 @@ public DictionaryProvider newDefaultDictionaryProvider() {
184195 }
185196
186197 private static class OnStreamReadyHandler implements Runnable {
187- private final Semaphore semaphore = new Semaphore (0 );
198+ private final int maxRequestsInFlight ;
199+ private final Semaphore semaphore ;
200+
201+ OnStreamReadyHandler (int maxRequestsInFlight ) {
202+ this .maxRequestsInFlight = maxRequestsInFlight ;
203+ this .semaphore = new Semaphore (maxRequestsInFlight );
204+ }
188205
189206 @ Override
190207 public void run () {
191- this .semaphore .release ();
208+ this .semaphore .release (this . maxRequestsInFlight );
192209 }
193210
194211 /**
@@ -338,7 +355,10 @@ protected void waitUntilStreamReady() {
338355 // If the stream is not ready, wait for a short time to avoid busy waiting
339356 // This helps reduce CPU usage while still being responsive
340357 try {
341- this .onStreamReadyHandler .await (10 , TimeUnit .MILLISECONDS );
358+ if (this .onStreamReadyHandler .await (10 , TimeUnit .MILLISECONDS )) {
359+ // Allow some in-flight requests to be sent
360+ break ;
361+ }
342362 } catch (InterruptedException e ) {
343363 Thread .currentThread ().interrupt ();
344364 throw new RuntimeException ("Interrupted while waiting for stream to be ready" , e );
0 commit comments