Skip to content

Commit e49fea0

Browse files
committed
GH-62: throw Statement cancelled on queued streams
1 parent d0b862c commit e49fea0

1 file changed

Lines changed: 20 additions & 8 deletions

File tree

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import org.apache.arrow.driver.jdbc.client.CloseableEndpointStreamPair;
2828
import org.apache.arrow.driver.jdbc.utils.FlightEndpointDataQueue;
2929
import org.apache.arrow.driver.jdbc.utils.VectorSchemaRootTransformer;
30+
import org.apache.arrow.flight.CallStatus;
3031
import org.apache.arrow.flight.FlightInfo;
32+
import org.apache.arrow.flight.FlightRuntimeException;
3133
import org.apache.arrow.flight.FlightStream;
3234
import org.apache.arrow.util.AutoCloseables;
3335
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -193,18 +195,28 @@ public boolean next() throws SQLException {
193195
return true;
194196
}
195197

196-
if (currentEndpointData != null) {
197-
currentEndpointData.getStream().getRoot().clear();
198-
if (currentEndpointData.getStream().next()) {
199-
populateDataForCurrentFlightStream();
200-
continue;
198+
try {
199+
if (currentEndpointData != null) {
200+
currentEndpointData.getStream().getRoot().clear();
201+
if (currentEndpointData.getStream().next()) {
202+
populateDataForCurrentFlightStream();
203+
continue;
204+
}
205+
206+
flightEndpointDataQueue.enqueue(currentEndpointData);
201207
}
202208

203-
flightEndpointDataQueue.enqueue(currentEndpointData);
209+
currentEndpointData = getNextEndpointStream(false);
210+
} catch (final FlightRuntimeException e) {
211+
// A concurrent statement.cancel() (or close) cancels in-flight FlightStreams,
212+
// which surface here as CANCELLED. Normalize to Avatica's "Statement canceled".
213+
if (flightEndpointDataQueue.isClosed()
214+
&& e.status().code() == CallStatus.CANCELLED.code()) {
215+
throw AvaticaConnection.HELPER.createException("Statement canceled");
216+
}
217+
throw e;
204218
}
205219

206-
currentEndpointData = getNextEndpointStream(false);
207-
208220
if (currentEndpointData != null) {
209221
populateDataForCurrentFlightStream();
210222
continue;

0 commit comments

Comments
 (0)