Skip to content

Commit 852ed0b

Browse files
refactor: replace CleanupListener.closed flag with chain.isDone()
VOperationImpl.start needs to detect whether chain.start synchronously delivered a terminal onClose, so it can skip registering the gRPC cancellation listener (which would otherwise leak onto grpcContext). Previously this was tracked via a 'closed' flag on CleanupListener — a piggyback bookkeeping field on the listener wrapper that exists only for VOperationImpl's coordination. Add isDone() to the VRpc interface and ask the chain directly. The chain is the natural source of truth for its own terminal state. CleanupListener shrinks back to its single concern: relay events and unhook the gRPC cancellation listener on close. Implementations: RetryingVRpc delegates to currentState.isDone(); VRpcImpl reports state==CLOSED; ForwardingVRpc forwards; PendingVRpc defers to realCall once handed off, otherwise reports isCancelled. Test fakes (DelayedVRpc, FakeVRpc, anonymous VOperationImplTest chains) implement the new method. Drive-by: drop the defensive handling of tracer.onOperationStart throws from RetryingVRpc.start, and the symmetric `!started` early-return in Done.onStart that paired with it. CompositeVRpcTracer catches throws from every child tracer, so the only way tracer.onOperationStart reaches RetryingVRpc with a real throw is a test that bypasses Composite. Dead code in production; relying on the existing chain.cancel cascade is simpler than maintaining a separate short-circuit path. Also: the already-started error in RetryingVRpc.start now dispatches listener.onClose through ctx.getExecutor() rather than invoking it synchronously on the caller, matching the dispatch convention used everywhere else in the chain.
1 parent 6eec32c commit 852ed0b

9 files changed

Lines changed: 71 additions & 25 deletions

File tree

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/ForwardingVRpc.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
3636
delegate.cancel(message, cause);
3737
}
3838

39+
@Override
40+
public boolean isDone() {
41+
return delegate.isDone();
42+
}
43+
3944
@Override
4045
public void requestNext() {
4146
delegate.requestNext();

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/RetryingVRpc.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ public RetryingVRpc(Supplier<VRpc<ReqT, RespT>> supplier, BigtableTimer timer) {
6969
@Override
7070
public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
7171
if (started) {
72-
listener.onClose(
72+
VRpcResult alreadyStarted =
7373
VRpcResult.createRejectedError(
74-
Status.FAILED_PRECONDITION.withDescription("operation is already started")));
74+
Status.FAILED_PRECONDITION.withDescription("operation is already started"));
75+
ctx.getExecutor().execute(() -> listener.onClose(alreadyStarted));
7576
return;
7677
}
7778

@@ -117,6 +118,11 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
117118
Status.CANCELLED.withDescription(message).withCause(finalCause))));
118119
}
119120

121+
@Override
122+
public boolean isDone() {
123+
return currentState.isDone();
124+
}
125+
120126
@Override
121127
public void requestNext() {
122128
// Assert the op-executor affinity even though the body is dead today — when streaming lands
@@ -389,10 +395,6 @@ class Done extends State {
389395

390396
@Override
391397
public void onStart() {
392-
if (!started) {
393-
LOG.fine("operation is not started yet.");
394-
return;
395-
}
396398
// Per-attempt tracer pairing is owned by Active.onExit; Done just runs the user listener
397399
// and the per-operation tracer finish.
398400
Stopwatch appTimer = Stopwatch.createStarted();

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/VOperationImpl.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,21 +95,21 @@ public void start(ReqT req, VRpcListener<RespT> listener) {
9595
// Register AFTER chain.start so a context-cancel that fires immediately is sequenced behind
9696
// start. Matches ClientCallImpl's ordering (grpc-java issue #1343).
9797
//
98-
// Queueing the registration onto the op executor is what makes the closed-check sound: any
98+
// Queueing the registration onto the op executor is what makes the isDone-check sound: any
9999
// onClose that chain.start enqueued during runInline drains FIRST (FIFO), so by the time this
100-
// task runs wrapped.closed reflects whether onClose has already fired. If it has, we skip
101-
// addListener — otherwise the listener would pin the chain on grpcContext for its lifetime
102-
// (CleanupListener.onClose already called removeListener as a no-op pre-registration). If
103-
// grpcContext gets cancelled between start() returning and this task running, the
104-
// directExecutor below fires the listener immediately on addListener, so cancel still
105-
// propagates correctly.
100+
// task runs chain.isDone() reflects whether the chain has already reached terminal. If it
101+
// has, we skip addListener — otherwise the listener would pin the chain on grpcContext for
102+
// its lifetime (CleanupListener.onClose already called removeListener as a no-op
103+
// pre-registration). If grpcContext gets cancelled between start() returning and this task
104+
// running, the directExecutor below fires the listener immediately on addListener, so cancel
105+
// still propagates correctly.
106106
//
107107
// runInline is the right verb here: when chain.start enqueued nothing (common path), the
108108
// executor is idle and the body runs inline on this thread — no extra context switch. When
109109
// chain.start did enqueue an onClose, runInline takes the queue branch and FIFO drains both.
110110
exec.runInline(
111111
() -> {
112-
if (!wrapped.closed) {
112+
if (!chain.isDone()) {
113113
grpcContext.addListener(cancellationListener, MoreExecutors.directExecutor());
114114
}
115115
});
@@ -123,10 +123,6 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
123123
private static class CleanupListener<RespT> extends ForwardListener<RespT> {
124124
private final Context grpcContext;
125125
private final Context.CancellationListener cancellationListener;
126-
// Read by VOperationImpl.start on the caller thread after runInline returns. runInline runs
127-
// chain.start synchronously, so any sync onClose has completed (and this flag been set) by
128-
// the time start() reads it on the same thread — no synchronization needed.
129-
volatile boolean closed = false;
130126

131127
CleanupListener(
132128
VRpcListener<RespT> delegate,
@@ -139,7 +135,6 @@ private static class CleanupListener<RespT> extends ForwardListener<RespT> {
139135

140136
@Override
141137
public void onClose(VRpcResult result) {
142-
closed = true;
143138
grpcContext.removeListener(cancellationListener);
144139
super.onClose(result);
145140
}

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/VRpc.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ public interface VRpc<ReqT, RespT> {
5959
/** Cancel a started RPC. This will be done by best effort. */
6060
void cancel(@Nullable String message, @Nullable Throwable cause);
6161

62+
/**
63+
* True once a terminal result has been (or is about to be) delivered to the listener; future
64+
* events on this VRpc are no-ops. Callers use this to detect a synchronous terminal during
65+
* {@link #start} — e.g. VOperationImpl checks this to avoid registering a gRPC cancellation
66+
* listener that would never be removed because the chain already finished.
67+
*/
68+
boolean isDone();
69+
6270
/**
6371
* TBD - server streaming rpcs. This will be used to request more data. Unlike gRPC's request(n),
6472
* starting a call will implicitly request the first message.

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,13 @@ void cancelWithResult(VRpcResult result) {
775775
});
776776
}
777777

778+
@Override
779+
public boolean isDone() {
780+
// realCall set in drainTo's lambda; once we hand off, it's the source of truth.
781+
// Pre-handoff, isCancelled tracks our own terminal state.
782+
return realCall != null ? realCall.isDone() : isCancelled;
783+
}
784+
778785
@Override
779786
public void requestNext() {
780787
if (realCall != null) {

java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
196196
session.cancelRpc(rpcId, message, cause);
197197
}
198198

199+
@Override
200+
public boolean isDone() {
201+
return state.get() == State.CLOSED;
202+
}
203+
199204
@Override
200205
public void requestNext() {
201206
throw new UnsupportedOperationException("streamed RPCs are not supported yet");

java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/api/TableBaseTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,11 @@ public void start(Object req, VRpcCallContext ctx, VRpcListener ignored) {
219219
@Override
220220
public void cancel(@Nullable String message, @Nullable Throwable cause) {}
221221

222+
@Override
223+
public boolean isDone() {
224+
return false;
225+
}
226+
222227
@Override
223228
public void requestNext() {}
224229
}

java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/VRpcTracerTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,11 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
510510
throw new UnsupportedOperationException();
511511
}
512512

513+
@Override
514+
public boolean isDone() {
515+
return false;
516+
}
517+
513518
@Override
514519
public void requestNext() {
515520
throw new UnsupportedOperationException();

java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/middleware/VOperationImplTest.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ public void cancel(@Nullable String msg, @Nullable Throwable cause) {
5757
cancelLatch.countDown();
5858
}
5959

60+
@Override
61+
public boolean isDone() {
62+
return false;
63+
}
64+
6065
@Override
6166
public void requestNext() {}
6267
};
@@ -88,9 +93,9 @@ public void onClose(VRpcResult result) {}
8893

8994
@Test
9095
void asyncOnCloseFromChainDoesNotPropagateLaterContextCancel() throws InterruptedException {
91-
// Regression for the wrapped.closed TOCTOU. When chain.start asynchronously queues an
96+
// Regression for the chain-already-done TOCTOU. When chain.start asynchronously queues an
9297
// onClose via the op executor, the addListener task (also queued through the op executor
93-
// by VOperationImpl.start) drains FIFO after the onClose and observes wrapped.closed=true,
98+
// by VOperationImpl.start) drains FIFO after the onClose and observes chain.isDone()=true,
9499
// so the cancellationListener is NOT registered. A later grpcContext.cancel therefore has
95100
// no path to reach chain.cancel — which is correct because the chain has already terminated.
96101
Context.CancellableContext grpcContext = Context.current().withCancellation();
@@ -100,22 +105,31 @@ void asyncOnCloseFromChainDoesNotPropagateLaterContextCancel() throws Interrupte
100105

101106
VRpc<String, String> chain =
102107
new VRpc<String, String>() {
108+
private volatile boolean done = false;
109+
103110
@Override
104111
public void start(String req, VRpcCallContext ctx, VRpcListener<String> listener) {
105112
// Simulate PendingVRpc pool-closed branch / VRpcImpl deadline short-circuit.
106113
ctx.getExecutor()
107114
.execute(
108-
() ->
109-
listener.onClose(
110-
VRpcResult.createUncommitedError(
111-
Status.UNAVAILABLE.withDescription("fast-fail"))));
115+
() -> {
116+
done = true;
117+
listener.onClose(
118+
VRpcResult.createUncommitedError(
119+
Status.UNAVAILABLE.withDescription("fast-fail")));
120+
});
112121
}
113122

114123
@Override
115124
public void cancel(@Nullable String msg, @Nullable Throwable cause) {
116125
chainCancelCount.incrementAndGet();
117126
}
118127

128+
@Override
129+
public boolean isDone() {
130+
return done;
131+
}
132+
119133
@Override
120134
public void requestNext() {}
121135
};

0 commit comments

Comments
 (0)