Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeJobSibling;
import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
Expand All @@ -50,6 +53,7 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTask;
import org.apache.ignite.internal.processors.security.AbstractSecurityTest;
import org.apache.ignite.internal.processors.security.AbstractTestSecurityPluginProvider;
import org.apache.ignite.internal.processors.security.OperationSecurityContext;
import org.apache.ignite.internal.processors.security.PublicAccessJob;
import org.apache.ignite.internal.processors.security.SecurityContext;
Expand All @@ -71,6 +75,8 @@
import org.apache.ignite.plugin.security.SecurityException;
import org.apache.ignite.plugin.security.SecurityPermissionSet;
import org.apache.ignite.plugin.security.SecurityPermissionSetBuilder;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
Expand All @@ -92,6 +98,7 @@
import static org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.NO_PERMISSIONS;
import static org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.create;
import static org.apache.ignite.plugin.security.SecuritySubjectType.REMOTE_CLIENT;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;

Expand All @@ -111,10 +118,10 @@
private static final ComputeTask SYSTEM_TASK = new VerifyBackupPartitionsTask();

/** */
private static final AtomicInteger EXECUTED_TASK_CNTR = new AtomicInteger();
private static final AtomicInteger EXECUTED_JOB_CNT = new AtomicInteger();

/** */
private static final AtomicInteger CANCELLED_TASK_CNTR = new AtomicInteger();
private static final AtomicInteger CANCELLED_JOB_CNT = new AtomicInteger();

/** */
private static final String CACHE = DEFAULT_CACHE_NAME;
Expand All @@ -128,6 +135,9 @@
/** */
public static CountDownLatch taskUnblockedLatch;

/** */
private static ListeningTestLogger listeningLog;

/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
Expand All @@ -138,6 +148,8 @@
PublicAccessSystemJob.class
);

listeningLog = new ListeningTestLogger(log);

for (int idx = 0; idx < SRV_NODES_CNT; idx++)
startGrid(idx, false);

Expand All @@ -151,6 +163,15 @@
grid(0).createCache(CACHE);
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(
String instanceName,
AbstractTestSecurityPluginProvider pluginProv
) throws Exception {
return super.getConfiguration(instanceName, pluginProv)
.setGridLogger(listeningLog);
}

/** */
private IgniteEx startGrid(int idx, boolean isClient) throws Exception {
String login = getTestIgniteInstanceName(idx);
Expand Down Expand Up @@ -373,6 +394,82 @@
checkCallable(c -> executorService(initiator, executor).invokeAny(singletonList(c), getTestTimeout(), MILLISECONDS));
}

/** */
@Test
public void testJobCancelAuthorizationSucceeded() throws Exception {
taskStartedLatch = new CountDownLatch(SRV_NODES_CNT);
taskUnblockedLatch = new CountDownLatch(1);

CANCELLED_JOB_CNT.set(0);
EXECUTED_JOB_CNT.set(0);

ComputeTaskFuture<Object> fut = grid(0).compute().executeAsync(CancelAllowedTask.class, null);

taskStartedLatch.await(getTestTimeout(), MILLISECONDS);

for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings())
sibling.cancel();

fut.get(getTestTimeout());

assertTrue(waitForCondition(() -> SRV_NODES_CNT == CANCELLED_JOB_CNT.get(), getTestTimeout()));
assertEquals(0, EXECUTED_JOB_CNT.get());
}

/** */
@Test
public void testJobCancelAuthorizationFailed() throws Exception {
taskStartedLatch = new CountDownLatch(SRV_NODES_CNT);
taskUnblockedLatch = new CountDownLatch(1);

CANCELLED_JOB_CNT.set(0);
EXECUTED_JOB_CNT.set(0);

ComputeTaskFuture<Object> fut = grid(0).compute().executeAsync(CancelForbiddenTask.class, null);

taskStartedLatch.await(getTestTimeout(), MILLISECONDS);

for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings()) {
LogListener logLsnr = LogListener.matches("Failed to cancel Ignite Compute Task Job" +
" [sesId=" + fut.getTaskSession().getId() +
", jobId=" + sibling.getJobId() + ']'
).build();

listeningLog.registerListener(logLsnr);

// TODO https://issues.apache.org/jira/browse/IGNITE-27195 Authorization errors during Compute Job

Check warning on line 440 in modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZrE7huyI3xKPb_0NMpA&open=AZrE7huyI3xKPb_0NMpA&pullRequest=12542
// cancellation do not propagate from remote nodes back to the one that initiated cancellation.
if (grid(0).context().job().activeJob(sibling.getJobId()) != null) {
assertThrowsAnyCause(
log,
() -> {
sibling.cancel();

return null;
},
SecurityException.class,
"Authorization failed"
);
}
else
sibling.cancel();

logLsnr.check(getTestTimeout());
}

assertEquals(0, EXECUTED_JOB_CNT.get());
assertEquals(0, CANCELLED_JOB_CNT.get());

assertFalse(fut.isDone());

taskUnblockedLatch.countDown();

fut.get(getTestTimeout());

assertTrue(waitForCondition(() -> SRV_NODES_CNT == EXECUTED_JOB_CNT.get(), getTestTimeout()));
assertEquals(0, CANCELLED_JOB_CNT.get());
}

/** */
@Test
public void testSystemTaskCancel() throws Exception {
Expand Down Expand Up @@ -420,42 +517,37 @@
taskStartedLatch = new CountDownLatch(expTaskCnt);
taskUnblockedLatch = new CountDownLatch(1);

CANCELLED_TASK_CNTR.set(0);
EXECUTED_TASK_CNTR.set(0);
CANCELLED_JOB_CNT.set(0);
EXECUTED_JOB_CNT.set(0);

try {
Future<?> fut = taskStarter.get();
Future<?> fut = taskStarter.get();

assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS));
assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS));

try (
OperationSecurityContext ignored = initiator == null
? null
: grid(0).context().security().withContext(initiator)
) {
if (expE == null) {
fut.cancel(true);
try (
OperationSecurityContext ignored = initiator == null
? null
: grid(0).context().security().withContext(initiator)
) {
if (expE == null) {
fut.cancel(true);

assertTrue(fut.isCancelled());
assertTrue(fut.isCancelled());

assertTrue(waitForCondition(() -> expTaskCnt == CANCELLED_TASK_CNTR.get(), getTestTimeout()));
assertTrue(waitForCondition(() -> expTaskCnt == CANCELLED_JOB_CNT.get(), getTestTimeout()));

assertEquals(0, EXECUTED_TASK_CNTR.get());
}
else {
assertThrowsWithCause(() -> fut.cancel(true), expE);
assertEquals(0, EXECUTED_JOB_CNT.get());
}
else {
assertThrowsWithCause(() -> fut.cancel(true), expE);

assertFalse(fut.isCancelled());
assertFalse(fut.isCancelled());

taskUnblockedLatch.countDown();
taskUnblockedLatch.countDown();

assertTrue(waitForCondition(() -> expTaskCnt == EXECUTED_TASK_CNTR.get(), getTestTimeout()));
}
assertTrue(waitForCondition(() -> expTaskCnt == EXECUTED_JOB_CNT.get(), getTestTimeout()));
}
}
finally {
taskUnblockedLatch.countDown();
}
}

/** */
Expand Down Expand Up @@ -516,16 +608,16 @@

/** */
private void assertCompleted(RunnableX r, int expCnt) {
EXECUTED_TASK_CNTR.set(0);
EXECUTED_JOB_CNT.set(0);

r.run();

assertEquals(expCnt, EXECUTED_TASK_CNTR.get());
assertEquals(expCnt, EXECUTED_JOB_CNT.get());
}

/** */
private void assertFailed(RunnableX r) {
EXECUTED_TASK_CNTR.set(0);
EXECUTED_JOB_CNT.set(0);

try {
r.run();
Expand All @@ -547,7 +639,7 @@

fail();

assertEquals(0, EXECUTED_TASK_CNTR.get());
assertEquals(0, EXECUTED_JOB_CNT.get());
}

/** */
Expand Down Expand Up @@ -629,15 +721,15 @@
private abstract static class AbstractRunnable implements IgniteRunnable {
/** {@inheritDoc} */
@Override public void run() {
EXECUTED_TASK_CNTR.incrementAndGet();
EXECUTED_JOB_CNT.incrementAndGet();
}
}

/** */
private abstract static class AbstractCallable implements IgniteCallable<AtomicInteger> {
/** {@inheritDoc} */
@Override public AtomicInteger call() throws Exception {
EXECUTED_TASK_CNTR.incrementAndGet();
EXECUTED_JOB_CNT.incrementAndGet();

return new AtomicInteger(0);
}
Expand All @@ -647,7 +739,7 @@
private abstract static class AbstractClosure implements IgniteClosure<Boolean, Boolean> {
/** {@inheritDoc} */
@Override public Boolean apply(Boolean o) {
EXECUTED_TASK_CNTR.incrementAndGet();
EXECUTED_JOB_CNT.incrementAndGet();

return null;
}
Expand Down Expand Up @@ -681,9 +773,19 @@

/** {@inheritDoc} */
@Override public @Nullable Object reduce(List<ComputeJobResult> results) throws IgniteException {
for (ComputeJobResult res : results) {
if (!res.isCancelled() && res.getException() != null)
throw res.getException();
}

return null;
}

/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteException {
return ComputeJobResultPolicy.WAIT;
}

/** */
protected ComputeJob job() {
return new TestJob();
Expand All @@ -707,7 +809,7 @@

/** {@inheritDoc} */
@Override public Object execute() {
EXECUTED_TASK_CNTR.incrementAndGet();
EXECUTED_JOB_CNT.incrementAndGet();

return null;
}
Expand All @@ -725,14 +827,14 @@
taskUnblockedLatch.await(5_000, MILLISECONDS);
}
catch (InterruptedException e) {
CANCELLED_TASK_CNTR.incrementAndGet();
CANCELLED_JOB_CNT.incrementAndGet();

Thread.currentThread().interrupt();

throw new IgniteException(e);
}

EXECUTED_TASK_CNTR.incrementAndGet();
EXECUTED_JOB_CNT.incrementAndGet();

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public GridDeployment deployment() {
}

/** {@inheritDoc} */
@Override public GridTaskSessionInternal session() {
@Override public GridTaskSessionImpl session() {
return ses;
}

Expand Down
Loading
Loading