|
23 | 23 | import com.google.api.gax.rpc.ApiCallContext; |
24 | 24 | import com.google.api.gax.rpc.ResponseObserver; |
25 | 25 | import com.google.api.gax.rpc.ServerStream; |
26 | | -import com.google.api.gax.rpc.StateCheckingResponseObserver; |
27 | 26 | import com.google.api.gax.rpc.StreamController; |
28 | 27 | import com.google.cloud.bigtable.data.v2.BigtableDataClient; |
29 | 28 | import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; |
|
48 | 47 | import com.google.protobuf.ByteString; |
49 | 48 | import io.grpc.CallOptions; |
50 | 49 | import io.grpc.Deadline; |
51 | | -import io.grpc.stub.StreamObserver; |
52 | 50 | import java.util.ArrayDeque; |
53 | 51 | import java.util.ArrayList; |
54 | 52 | import java.util.Iterator; |
@@ -162,10 +160,8 @@ public ApiFuture<List<Result>> readRowsAsync(Query request) { |
162 | 160 | } |
163 | 161 |
|
164 | 162 | @Override |
165 | | - public void readRowsAsync(Query request, StreamObserver<Result> observer) { |
166 | | - delegate |
167 | | - .readRowsCallable(RESULT_ADAPTER) |
168 | | - .call(request, new StreamObserverAdapter<>(observer), createScanCallContext()); |
| 163 | + public void readRowsAsync(Query request, ResponseObserver<Result> observer) { |
| 164 | + delegate.readRowsCallable(RESULT_ADAPTER).call(request, observer, createScanCallContext()); |
169 | 165 | } |
170 | 166 |
|
171 | 167 | // Point reads are implemented using a streaming ReadRows RPC. So timeouts need to be managed |
@@ -218,29 +214,6 @@ public void close() { |
218 | 214 | delegate.close(); |
219 | 215 | } |
220 | 216 |
|
221 | | - /** wraps {@link StreamObserver} onto GCJ {@link com.google.api.gax.rpc.ResponseObserver}. */ |
222 | | - private static class StreamObserverAdapter<T> extends StateCheckingResponseObserver<T> { |
223 | | - private final StreamObserver<T> delegate; |
224 | | - |
225 | | - StreamObserverAdapter(StreamObserver<T> delegate) { |
226 | | - this.delegate = delegate; |
227 | | - } |
228 | | - |
229 | | - protected void onStartImpl(StreamController controller) {} |
230 | | - |
231 | | - protected void onResponseImpl(T response) { |
232 | | - this.delegate.onNext(response); |
233 | | - } |
234 | | - |
235 | | - protected void onErrorImpl(Throwable t) { |
236 | | - this.delegate.onError(t); |
237 | | - } |
238 | | - |
239 | | - protected void onCompleteImpl() { |
240 | | - this.delegate.onCompleted(); |
241 | | - } |
242 | | - } |
243 | | - |
244 | 217 | /** |
245 | 218 | * wraps {@link ServerStream} onto HBase {@link ResultScanner}. {@link PaginatedRowResultScanner} |
246 | 219 | * gets a paginator and a {@link Query.QueryPaginator} used to get a {@link ServerStream}<{@link |
|
0 commit comments