Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@ public static class Transactions {

/** Operation failed because the transaction is already finished due to an error. */
public static final int TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR = TX_ERR_GROUP.registerErrorCode((short) 19);

/** Operation failed because the transaction is aborted due to a recovery. */
public static final int TX_ABORTED_DUE_TO_RECOVERY_ERR = TX_ERR_GROUP.registerErrorCode((short) 20);
}

/** Replicator error group. */
Expand Down
1 change: 1 addition & 0 deletions modules/platforms/cpp/ignite/common/error_codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ enum class code : underlying_t {
TX_DELAYED_ACK = 0x70011,
TX_KILLED = 0x70012,
TX_ALREADY_FINISHED_WITH_EXCEPTION = 0x70013,
TX_ABORTED_DUE_TO_RECOVERY = 0x70014,

// Replicator group. Group code: 8
REPLICA_COMMON = 0x80001,
Expand Down
1 change: 1 addition & 0 deletions modules/platforms/cpp/ignite/odbc/common_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::TX_DELAYED_ACK:
case error::code::TX_KILLED:
case error::code::TX_ALREADY_FINISHED_WITH_EXCEPTION:
case error::code::TX_ABORTED_DUE_TO_RECOVERY:
return sql_state::S25000_INVALID_TRANSACTION_STATE;

// Replicator group. Group code: 8
Expand Down
3 changes: 3 additions & 0 deletions modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ public static class Transactions

/// <summary> TxAlreadyFinishedWithException error. </summary>
public const int TxAlreadyFinishedWithException = (GroupCode << 16) | (19 & 0xFFFF);

/// <summary> TxAbortedDueToRecovery error. </summary>
public const int TxAbortedDueToRecovery = (GroupCode << 16) | (20 & 0xFFFF);
}

/// <summary> Replicator errors. </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
Expand All @@ -37,6 +39,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import java.time.Instant;
import java.time.ZoneId;
Expand All @@ -51,17 +54,23 @@
import java.util.stream.IntStream;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.ColumnMetadataImpl;
import org.apache.ignite.internal.sql.ColumnMetadataImpl.ColumnOriginImpl;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.exec.fsm.QueryInfo;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
Expand All @@ -83,7 +92,9 @@
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.Statement.StatementBuilder;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -740,6 +751,175 @@ public void runtimeErrorInQueryCausesTransactionToFail(String query) {
"Transaction is already finished due to an error");
}

@Test
public void runtimeErrorReturnsSameTransactionErrorBeforeAndAfterRollbackCompletion() throws Exception {
sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");

IgniteSql sql = igniteSql();

Transaction tx = igniteTx().begin();

// Enlist enough operations to make rollback non-trivial.
for (int i = 0; i < 100; i++) {
execute(tx, sql, "INSERT INTO tst VALUES (?, ?)", i, i);
}

UUID txId = txId(tx);

assertThrowsSqlException(
Sql.RUNTIME_ERR,
"Division by zero",
() -> execute(tx, sql, "SELECT val / 0 FROM tst WHERE id = ?", 0)
);

IgniteException[] immediateExceptions = new IgniteException[5];
for (int i = 0; i < immediateExceptions.length; i++) {
immediateExceptions[i] = (IgniteException) assertThrowsWithCause(
() -> executeForRead(sql, tx, "SELECT * FROM tst WHERE id = ?", 1),
IgniteException.class
);
}

if (tx instanceof InternalTransaction) {
assertNotNull(txId, "Expected transaction id for test transaction implementation");

Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.until(() -> {
TxStateMeta meta = txManager().stateMeta(txId);

return meta != null && TxState.isFinalState(meta.txState());
});
}

IgniteException abortedStateException = (IgniteException) assertThrowsWithCause(
() -> executeForRead(sql, tx, "SELECT * FROM tst WHERE id = ?", 1),
IgniteException.class
);

assertEquals(Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR, abortedStateException.code());
assertTrue(abortedStateException.getMessage().contains("Transaction is already finished due to an error"));

for (IgniteException immediateException : immediateExceptions) {
assertEquals(abortedStateException.code(), immediateException.code());
assertTrue(immediateException.getMessage().contains("Transaction is already finished due to an error"));
}
}

@Test
public void secondRequestDuringRollbackReturnsFinishedWithExceptionAndPreservesOriginalCause() {
sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
sql("INSERT INTO tst VALUES (0, 1)");

IgniteSql sql = igniteSql();

Transaction tx = igniteTx().begin();

List<IgniteImpl> clusterNodes = CLUSTER.runningNodes()
.map(node -> unwrapIgniteImpl(node))
.collect(toList());

CompletableFuture<Void> failingRequestStarted = new CompletableFuture<>();
CompletableFuture<Void> finishRequestBlocked = new CompletableFuture<>();
CompletableFuture<Void> releaseFinishRequest = new CompletableFuture<>();

for (IgniteImpl clusterNode : clusterNodes) {
// Install predicates in cluster
clusterNode.dropMessages((recipientConsistentId, msg) -> {
if (!failingRequestStarted.isDone()) {
return false;
}

if (msg.groupType() != TxMessageGroup.GROUP_TYPE
|| msg.messageType() != TxMessageGroup.TX_FINISH_REQUEST) {
return false;
}

finishRequestBlocked.complete(null);

return !releaseFinishRequest.isDone();
});
}

try {
CompletableFuture<IgniteException> failingRequestFut = IgniteTestUtils.runAsync(() -> {
failingRequestStarted.complete(null);

IgniteException ex = assertInstanceOf(
IgniteException.class,
assertThrowsWithCause(
() -> execute(tx, sql, "SELECT val / 0 FROM tst WHERE id = ?", 0),
IgniteException.class
)
);

assertTrue(hasCause(ex, "Division by zero", Throwable.class));
assertTrue(
ex.code() == Sql.RUNTIME_ERR || ex.code() == Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR,
"Unexpected code for a request that triggers rollback [code=" + ex.code() + ']'
);

return ex;
});

Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.until(finishRequestBlocked::isDone);

IgniteException parallelRequestException = assertInstanceOf(
IgniteException.class,
assertThrowsWithCause(
() -> executeForRead(sql, tx, "SELECT * FROM tst WHERE id = ?", 0),
IgniteException.class
)
);

assertEquals(Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR, parallelRequestException.code());
assertTrue(parallelRequestException.getMessage().contains("Transaction is already finished due to an error"));
assertTrue(
hasCause(parallelRequestException, "Division by zero", Throwable.class),
"Expected original rollback cause in user-visible exception chain"
);

releaseFinishRequest.complete(null);

IgniteException firstRequestException = await(failingRequestFut);

assertTrue(hasCause(firstRequestException, "Division by zero", Throwable.class));
} finally {
clusterNodes.forEach(IgniteImpl::stopDroppingMessages);
}
}

@Test
public void rollbackWithExceptionCauseIsPropagatedToSubsequentSqlRequest() {
sql("CREATE TABLE tst(id INTEGER PRIMARY KEY, val INTEGER)");
sql("INSERT INTO tst VALUES (?, ?)", 1, 1);

Transaction tx = igniteTx().begin();

assumeTrue(tx instanceof InternalTransaction, "InternalTransaction is required");

InternalTransaction internalTx = (InternalTransaction) tx;
String rollbackCauseMessage = "rollback-cause-primary-replica-changed";
TransactionException rollbackCause = new TransactionException(REPLICA_MISS_ERR, rollbackCauseMessage);

await(internalTx.rollbackWithExceptionAsync(rollbackCause));

IgniteException ex = assertInstanceOf(
IgniteException.class,
assertThrowsWithCause(
() -> executeForRead(igniteSql(), tx, "SELECT * FROM tst WHERE id = ?", 1),
IgniteException.class
)
);

assertEquals(Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR, ex.code());
assertTrue(ex.getMessage().contains("Transaction is already finished due to an error"));
assertTrue(hasCause(ex, TransactionException.class));
assertTrue(hasCause(ex, rollbackCauseMessage, Throwable.class), "Expected rollback cause message in user-visible exception chain");
}

@Test
public void testLockIsNotReleasedAfterTxRollback() {
IgniteSql sql = igniteSql();
Expand Down Expand Up @@ -1413,6 +1593,18 @@ protected ResultSet<SqlRow> executeForRead(IgniteSql sql, @Nullable Transaction

protected abstract ResultSet<SqlRow> executeForRead(IgniteSql sql, @Nullable Transaction tx, Statement statement, Object... args);

private static @Nullable UUID txId(Transaction tx) {
if (tx instanceof InternalTransaction) {
return ((InternalTransaction) tx).id();
}

if (tx instanceof ClientLazyTransaction) {
return ((ClientLazyTransaction) tx).startedTx().txId();
}

return null;
}

protected void checkSqlError(
int code,
String msg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine;

import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -34,6 +35,7 @@
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.sql.IgniteSqlCommitTransaction;
import org.apache.ignite.internal.sql.engine.sql.IgniteSqlStartTransaction;
Expand All @@ -45,8 +47,11 @@
import org.apache.ignite.internal.sql.engine.tx.ScriptTransactionContext;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.TxStateMetaFinishing;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -231,6 +236,48 @@ public void testScriptTransactionWrapperTxInflightsInteraction() {
assertEquals(1, inflights.size());
}

@Test
public void testInflightTrackerUsesFinishedWithErrorClassificationForFinishingTx() {
NoOpTransaction tx = NoOpTransaction.readWrite("test-rw", false);
IgniteInternalException failure = new IgniteInternalException(321, "boom");
TxStateMeta finishingMeta = new TxStateMetaFinishing(null, null, false, null, failure, failure.code());

when(transactionInflights.track(tx.id())).thenReturn(false);
when(txManager.stateMeta(tx.id())).thenReturn(finishingMeta);

TransactionException ex = assertThrowsExactly(
TransactionException.class,
() -> new InflightTransactionalOperationTracker(transactionInflights, txManager).registerOperationStart(tx)
);

assertEquals(TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR, ex.code());
assertEquals(failure, ex.getCause());
}

@Test
public void testExplicitSqlTransactionUsesFinishedWithErrorClassificationForFinishingTx() {
NoOpTransaction tx = NoOpTransaction.readWrite("test-rw", false);
IgniteInternalException failure = new IgniteInternalException(321, "boom");
TxStateMeta finishingMeta = new TxStateMetaFinishing(null, null, false, null, failure, failure.code());

when(txManager.stateMeta(tx.id())).thenReturn(finishingMeta);

QueryTransactionContext txCtx = new QueryTransactionContextImpl(
txManager,
observableTimeTracker,
tx,
new InflightTransactionalOperationTracker(transactionInflights, txManager)
);

TransactionException ex = assertThrowsExactly(
TransactionException.class,
() -> txCtx.getOrStartSqlManaged(false, false)
);

assertEquals(TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR, ex.code());
assertEquals(failure, ex.getCause());
}

private void prepareTransactionsMocks() {
when(txManager.beginExplicit(any(), anyBoolean(), any())).thenAnswer(
inv -> {
Expand Down
Loading