Skip to content

Commit 2c4db47

Browse files
authored
RATIS-2433. Cancel transaction in case of failure to append (#1382)
1 parent df2a302 commit 2c4db47

4 files changed

Lines changed: 98 additions & 38 deletions

File tree

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 63 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -768,17 +768,18 @@ private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest re
768768
} catch (GroupMismatchException e) {
769769
return JavaUtils.completeExceptionally(e);
770770
}
771-
return checkLeaderState(request, null);
771+
return checkLeaderState(request, null, null);
772772
}
773773

774774
/**
775775
* @return null if the server is in leader state.
776776
*/
777-
private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
777+
private CompletableFuture<RaftClientReply> checkLeaderState(
778+
RaftClientRequest request, CacheEntry entry, TransactionContextImpl context) {
778779
if (!getInfo().isLeader()) {
779780
NotLeaderException exception = generateNotLeaderException();
780781
final RaftClientReply reply = newExceptionReply(request, exception);
781-
return RetryCacheImpl.failWithReply(reply, entry);
782+
return failWithReply(reply, entry, context);
782783
}
783784
if (!getInfo().isLeaderReady()) {
784785
final CacheEntry cacheEntry = retryCache.getIfPresent(ClientInvocationId.valueOf(request));
@@ -787,13 +788,13 @@ private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest re
787788
}
788789
final LeaderNotReadyException lnre = new LeaderNotReadyException(getMemberId());
789790
final RaftClientReply reply = newExceptionReply(request, lnre);
790-
return RetryCacheImpl.failWithReply(reply, entry);
791+
return failWithReply(reply, entry, context);
791792
}
792793

793794
if (!request.isReadOnly() && isSteppingDown()) {
794795
final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down");
795796
final RaftClientReply reply = newExceptionReply(request, lsde);
796-
return RetryCacheImpl.failWithReply(reply, entry);
797+
return failWithReply(reply, entry, context);
797798
}
798799

799800
return null;
@@ -819,11 +820,44 @@ void assertLifeCycleState(Set<LifeCycle.State> expected) throws ServerNotReadyEx
819820
getMemberId() + " is not in " + expected + ": current state is " + c), expected);
820821
}
821822

822-
private CompletableFuture<RaftClientReply> getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) {
823-
return entry.failWithException(new ResourceUnavailableException(
824-
getMemberId() + ": Failed to acquire a pending write request for " + request));
823+
private CompletableFuture<RaftClientReply> getResourceUnavailableReply(String op,
824+
RaftClientRequest request, CacheEntry entry, TransactionContextImpl context) {
825+
final ResourceUnavailableException e = new ResourceUnavailableException(getMemberId()
826+
+ ": Failed to " + op + " for " + request);
827+
cancelTransaction(context, e);
828+
return entry.failWithException(e);
829+
}
830+
831+
private CompletableFuture<RaftClientReply> failWithReply(
832+
RaftClientReply reply, CacheEntry entry, TransactionContextImpl context) {
833+
if (context != null) {
834+
cancelTransaction(context, reply.getException());
835+
}
836+
837+
if (entry == null) {
838+
return CompletableFuture.completedFuture(reply);
839+
}
840+
entry.failWithReply(reply);
841+
return entry.getReplyFuture();
842+
}
843+
844+
/** Cancel a transaction and notify the state machine. Set exception if provided to the transaction context. */
845+
private void cancelTransaction(TransactionContextImpl context, Exception exception) {
846+
if (context == null) {
847+
return;
825848
}
826849

850+
if (exception != null) {
851+
context.setException(exception);
852+
}
853+
854+
try {
855+
context.cancelTransaction();
856+
} catch (IOException ioe) {
857+
LOG.warn("{}: Failed to cancel transaction {}", getMemberId(), context, ioe);
858+
}
859+
}
860+
827861
/**
828862
* Handle a normal update request from client.
829863
*/
@@ -836,27 +870,28 @@ private CompletableFuture<RaftClientReply> appendTransaction(
836870

837871
final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null);
838872
if (unsyncedLeaderState == null) {
839-
final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException());
840-
return RetryCacheImpl.failWithReply(reply, cacheEntry);
873+
final NotLeaderException nle = generateNotLeaderException();
874+
final RaftClientReply reply = newExceptionReply(request, nle);
875+
return failWithReply(reply, cacheEntry, context);
841876
}
842877
final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage());
843878
if (unsyncedPermit == null) {
844-
return getResourceUnavailableReply(request, cacheEntry);
879+
return getResourceUnavailableReply("acquire a pending write request", request, cacheEntry, context);
845880
}
846881

847882
final LeaderStateImpl leaderState;
848883
final PendingRequest pending;
849884
synchronized (this) {
850-
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry);
885+
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry, context);
851886
if (reply != null) {
852887
return reply;
853888
}
854889

855890
leaderState = role.getLeaderStateNonNull();
856-
final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit
891+
final PendingRequests.Permit permit = leaderState == unsyncedLeaderState ? unsyncedPermit
857892
: leaderState.tryAcquirePendingRequest(request.getMessage());
858893
if (permit == null) {
859-
return getResourceUnavailableReply(request, cacheEntry);
894+
return getResourceUnavailableReply("acquire a pending write request", request, cacheEntry, context);
860895
}
861896

862897
// append the message to its local log
@@ -866,20 +901,18 @@ private CompletableFuture<RaftClientReply> appendTransaction(
866901
} catch (StateMachineException e) {
867902
// the StateMachineException is thrown by the SM in the preAppend stage.
868903
// Return the exception in a RaftClientReply.
869-
RaftClientReply exceptionReply = newExceptionReply(request, e);
870-
cacheEntry.failWithReply(exceptionReply);
904+
final RaftClientReply exceptionReply = newExceptionReply(request, e);
871905
// leader will step down here
872906
if (e.leaderShouldStepDown() && getInfo().isLeader()) {
873907
leaderState.submitStepDownEvent(StepDownReason.STATE_MACHINE_EXCEPTION);
874908
}
875-
return CompletableFuture.completedFuture(exceptionReply);
909+
return failWithReply(exceptionReply, cacheEntry, null);
876910
}
877911

878912
// put the request into the pending queue
879913
pending = leaderState.addPendingRequest(permit, request, context);
880914
if (pending == null) {
881-
return cacheEntry.failWithException(new ResourceUnavailableException(
882-
getMemberId() + ": Failed to add a pending write request for " + request));
915+
return getResourceUnavailableReply("add a pending write request", request, cacheEntry, context);
883916
}
884917
}
885918
leaderState.notifySenders();
@@ -1011,19 +1044,23 @@ private CompletableFuture<RaftClientReply> writeAsyncImpl(RaftClientRequest requ
10111044
// return the cached future.
10121045
return cacheEntry.getReplyFuture();
10131046
}
1014-
// TODO: this client request will not be added to pending requests until
1015-
// later which means that any failure in between will leave partial state in
1016-
// the state machine. We should call cancelTransaction() for failed requests
1047+
// This request will be added to pending requests later in appendTransaction.
1048+
// Any failure in between must invoke cancelTransaction.
10171049
final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction(
10181050
filterDataStreamRaftClientRequest(request));
10191051
if (context.getException() != null) {
1020-
final StateMachineException e = new StateMachineException(getMemberId(), context.getException());
1052+
final Exception exception = context.getException();
1053+
final StateMachineException e = new StateMachineException(getMemberId(), exception);
10211054
final RaftClientReply exceptionReply = newExceptionReply(request, e);
1022-
cacheEntry.failWithReply(exceptionReply);
1023-
return CompletableFuture.completedFuture(exceptionReply);
1055+
return failWithReply(exceptionReply, cacheEntry, context);
10241056
}
10251057

1026-
return appendTransaction(request, context, cacheEntry);
1058+
try {
1059+
return appendTransaction(request, context, cacheEntry);
1060+
} catch (Exception e) {
1061+
cancelTransaction(context, e);
1062+
throw e;
1063+
}
10271064
}
10281065

10291066
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {

ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -257,14 +257,4 @@ public synchronized void close() {
257257
cache.invalidateAll();
258258
statistics.set(null);
259259
}
260-
261-
static CompletableFuture<RaftClientReply> failWithReply(
262-
RaftClientReply reply, CacheEntry entry) {
263-
if (entry != null) {
264-
entry.failWithReply(reply);
265-
return entry.getReplyFuture();
266-
} else {
267-
return CompletableFuture.completedFuture(reply);
268-
}
269-
}
270260
}

ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,6 @@ public TransactionContext preAppendTransaction() throws IOException {
191191

192192
@Override
193193
public TransactionContext cancelTransaction() throws IOException {
194-
// TODO: This is not called from Raft server / log yet. When an IOException happens, we should
195-
// call this to let the SM know that Transaction cannot be synced
196194
return stateMachine.cancelTransaction(this);
197195
}
198196
}

ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.io.IOException;
4141
import java.util.Objects;
4242
import java.util.concurrent.CompletableFuture;
43+
import java.util.concurrent.atomic.AtomicInteger;
4344

4445
import static org.junit.jupiter.api.Assertions.fail;
4546

@@ -52,6 +53,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
5253
}
5354

5455
private static volatile boolean failPreAppend = false;
56+
private static final AtomicInteger numCancelTransaction = new AtomicInteger();
5557

5658
protected static class StateMachineWithException extends
5759
SimpleStateMachine4Testing {
@@ -72,6 +74,12 @@ public TransactionContext preAppendTransaction(TransactionContext trx)
7274
return trx;
7375
}
7476
}
77+
78+
@Override
79+
public TransactionContext cancelTransaction(TransactionContext trx) throws IOException {
80+
numCancelTransaction.incrementAndGet();
81+
return super.cancelTransaction(trx);
82+
}
7583
}
7684

7785
{
@@ -179,4 +187,31 @@ private void runTestRetryOnExceptionDuringReplication(CLUSTER cluster) throws Ex
179187
failPreAppend = false;
180188
}
181189
}
190+
191+
@Test
192+
public void testNoCancelTransactionOnPreAppendFailure() throws Exception {
193+
runWithNewCluster(3, this::runTestNoCancelTransactionOnPreAppendFailure);
194+
}
195+
196+
private void runTestNoCancelTransactionOnPreAppendFailure(CLUSTER cluster) throws Exception {
197+
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
198+
failPreAppend = true;
199+
numCancelTransaction.set(0);
200+
try (final RaftClient client = cluster.createClient(leaderId)) {
201+
try {
202+
client.io().send(new SimpleMessage("cancel-transaction"));
203+
fail("Exception expected");
204+
} catch (StateMachineException e) {
205+
Assertions.assertTrue(e.getCause().getMessage().contains("Fake Exception in preAppend"));
206+
}
207+
208+
JavaUtils.attemptRepeatedly(() -> {
209+
Assertions.assertEquals(0, numCancelTransaction.get(),
210+
() -> "Expected cancelTransaction() not to be called but got " + numCancelTransaction.get());
211+
return null;
212+
}, 10, ONE_SECOND, "wait for cancelTransaction", LOG);
213+
} finally {
214+
failPreAppend = false;
215+
}
216+
}
182217
}

0 commit comments

Comments
 (0)