Skip to content

Commit bac9623

Browse files
committed
fix(tracing): use ApiFutures chaining to guarantee synchronous callback execution before future resolution
1 parent 232aeb8 commit bac9623

File tree

2 files changed

+50
-4
lines changed

2 files changed

+50
-4
lines changed

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/tracing/TracedBatchingCallable.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
*/
3030
package com.google.api.gax.tracing;
3131

32+
import com.google.api.core.ApiAsyncFunction;
33+
import com.google.api.core.ApiFunction;
3234
import com.google.api.core.ApiFuture;
3335
import com.google.api.core.ApiFutures;
3436
import com.google.api.core.BetaApi;
@@ -100,9 +102,30 @@ public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context)
100102

101103
context = context.withTracer(tracer);
102104
ApiFuture<ResponseT> future = innerCallable.futureCall(request, context);
103-
ApiFutures.addCallback(future, finisher, MoreExecutors.directExecutor());
104105

105-
return future;
106+
ApiFuture<ResponseT> onSuccessFuture =
107+
ApiFutures.transform(
108+
future,
109+
new ApiFunction<ResponseT, ResponseT>() {
110+
@Override
111+
public ResponseT apply(ResponseT response) {
112+
finisher.onSuccess(response);
113+
return response;
114+
}
115+
},
116+
MoreExecutors.directExecutor());
117+
118+
return ApiFutures.catchingAsync(
119+
onSuccessFuture,
120+
Throwable.class,
121+
new ApiAsyncFunction<Throwable, ResponseT>() {
122+
@Override
123+
public ApiFuture<ResponseT> apply(Throwable t) {
124+
finisher.onFailure(t);
125+
return ApiFutures.immediateFailedFuture(t);
126+
}
127+
},
128+
MoreExecutors.directExecutor());
106129
} catch (RuntimeException e) {
107130
finisher.onFailure(e);
108131
throw e;

sdk-platform-java/gax-java/gax/src/main/java/com/google/api/gax/tracing/TracedUnaryCallable.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
*/
3030
package com.google.api.gax.tracing;
3131

32+
import com.google.api.core.ApiAsyncFunction;
33+
import com.google.api.core.ApiFunction;
3234
import com.google.api.core.ApiFuture;
3335
import com.google.api.core.ApiFutures;
3436
import com.google.api.core.BetaApi;
@@ -100,9 +102,30 @@ public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context)
100102
try {
101103
context = context.withTracer(tracer);
102104
ApiFuture<ResponseT> future = innerCallable.futureCall(request, context);
103-
ApiFutures.addCallback(future, finisher, MoreExecutors.directExecutor());
104105

105-
return future;
106+
ApiFuture<ResponseT> onSuccessFuture =
107+
ApiFutures.transform(
108+
future,
109+
new ApiFunction<ResponseT, ResponseT>() {
110+
@Override
111+
public ResponseT apply(ResponseT response) {
112+
finisher.onSuccess(response);
113+
return response;
114+
}
115+
},
116+
MoreExecutors.directExecutor());
117+
118+
return ApiFutures.catchingAsync(
119+
onSuccessFuture,
120+
Throwable.class,
121+
new ApiAsyncFunction<Throwable, ResponseT>() {
122+
@Override
123+
public ApiFuture<ResponseT> apply(Throwable t) {
124+
finisher.onFailure(t);
125+
return ApiFutures.immediateFailedFuture(t);
126+
}
127+
},
128+
MoreExecutors.directExecutor());
106129
} catch (RuntimeException e) {
107130
finisher.onFailure(e);
108131
throw e;

0 commit comments

Comments
 (0)