From 88c76797c8c9e4d21d1ad90574ddaca66ffcea08 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Thu, 27 Nov 2025 13:26:16 +0300 Subject: [PATCH 1/3] IGNITE-27196 Added authorization of Compute Job cancellation --- .../common/ComputeTaskPermissionsTest.java | 151 ++++++++++++++++-- .../ignite/internal/GridJobSessionImpl.java | 2 +- .../processors/job/GridJobProcessor.java | 139 +++++++++++----- .../processors/task/GridTaskProcessor.java | 2 +- .../processors/task/GridTaskWorker.java | 13 +- 5 files changed, 236 insertions(+), 71 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java index 390478bef794b..e2385a01694fe 100644 --- a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java @@ -40,8 +40,11 @@ import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeJobResultPolicy; +import org.apache.ignite.compute.ComputeJobSibling; import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTaskAdapter; +import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.ClientConnectorConfiguration; import org.apache.ignite.configuration.ConnectorConfiguration; @@ -50,6 +53,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.management.cache.VerifyBackupPartitionsTask; import org.apache.ignite.internal.processors.security.AbstractSecurityTest; +import org.apache.ignite.internal.processors.security.AbstractTestSecurityPluginProvider; import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.PublicAccessJob; import org.apache.ignite.internal.processors.security.SecurityContext; @@ -71,6 +75,8 @@ import org.apache.ignite.plugin.security.SecurityException; import org.apache.ignite.plugin.security.SecurityPermissionSet; import org.apache.ignite.plugin.security.SecurityPermissionSetBuilder; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.Test; @@ -92,6 +98,7 @@ import static org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.NO_PERMISSIONS; import static org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.create; import static org.apache.ignite.plugin.security.SecuritySubjectType.REMOTE_CLIENT; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -111,10 +118,10 @@ public class ComputeTaskPermissionsTest extends AbstractSecurityTest { private static final ComputeTask SYSTEM_TASK = new VerifyBackupPartitionsTask(); /** */ - private static final AtomicInteger EXECUTED_TASK_CNTR = new AtomicInteger(); + private static final AtomicInteger EXECUTED_JOB_CNT = new AtomicInteger(); /** */ - private static final AtomicInteger CANCELLED_TASK_CNTR = new AtomicInteger(); + private static final AtomicInteger CANCELLED_JOB_CNT = new AtomicInteger(); /** */ private static final String CACHE = DEFAULT_CACHE_NAME; @@ -128,6 +135,9 @@ public class ComputeTaskPermissionsTest extends AbstractSecurityTest { /** */ public static CountDownLatch taskUnblockedLatch; + /** */ + private static ListeningTestLogger listeningLog; + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); @@ -138,6 +148,8 @@ public class ComputeTaskPermissionsTest extends AbstractSecurityTest { PublicAccessSystemJob.class ); + listeningLog = new ListeningTestLogger(log); + for (int idx = 0; idx < SRV_NODES_CNT; idx++) startGrid(idx, false); @@ -151,6 +163,15 @@ public class ComputeTaskPermissionsTest extends AbstractSecurityTest { grid(0).createCache(CACHE); } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration( + String instanceName, + AbstractTestSecurityPluginProvider pluginProv + ) throws Exception { + return super.getConfiguration(instanceName, pluginProv) + .setGridLogger(listeningLog); + } + /** */ private IgniteEx startGrid(int idx, boolean isClient) throws Exception { String login = getTestIgniteInstanceName(idx); @@ -373,6 +394,92 @@ private void checkTaskStart(int initiator, int executor) { checkCallable(c -> executorService(initiator, executor).invokeAny(singletonList(c), getTestTimeout(), MILLISECONDS)); } + /** */ + @Test + public void testJobCancelAuthorizationSucceeded() throws Exception { + taskStartedLatch = new CountDownLatch(SRV_NODES_CNT); + taskUnblockedLatch = new CountDownLatch(1); + + CANCELLED_JOB_CNT.set(0); + EXECUTED_JOB_CNT.set(0); + + try { + ComputeTaskFuture fut = grid(0).compute().executeAsync(CancelAllowedTask.class, null); + + taskStartedLatch.await(getTestTimeout(), MILLISECONDS); + + for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings()) + sibling.cancel(); + + fut.get(getTestTimeout()); + + assertTrue(waitForCondition(() -> SRV_NODES_CNT == CANCELLED_JOB_CNT.get(), getTestTimeout())); + assertEquals(0, EXECUTED_JOB_CNT.get()); + } + finally { + taskUnblockedLatch.countDown(); + } + } + + /** */ + @Test + public void testJobCancelAuthorizationFailed() throws Exception { + taskStartedLatch = new CountDownLatch(SRV_NODES_CNT); + taskUnblockedLatch = new CountDownLatch(1); + + CANCELLED_JOB_CNT.set(0); + EXECUTED_JOB_CNT.set(0); + + try { + ComputeTaskFuture fut = grid(0).compute().executeAsync(CancelForbiddenTask.class, null); + + taskStartedLatch.await(getTestTimeout(), MILLISECONDS); + + for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings()) { + LogListener logLsnr = LogListener.matches("Failed to cancel Ignite Compute Task Job" + + " [sesId=" + fut.getTaskSession().getId() + + ", jobId=" + sibling.getJobId() + ']' + ).build(); + + listeningLog.registerListener(logLsnr); + + // TODO https://issues.apache.org/jira/browse/IGNITE-27195 Authorization errors during Compute Job + // cancellation do not propagate from remote nodes back to the one that initiated cancellation. + if (grid(0).context().job().activeJob(sibling.getJobId()) != null) { + assertThrowsAnyCause( + log, + () -> { + sibling.cancel(); + + return null; + }, + SecurityException.class, + "Authorization failed" + ); + } + else + sibling.cancel(); + + logLsnr.check(getTestTimeout()); + } + + assertEquals(0, EXECUTED_JOB_CNT.get()); + assertEquals(0, CANCELLED_JOB_CNT.get()); + + assertFalse(fut.isDone()); + + taskUnblockedLatch.countDown(); + + fut.get(getTestTimeout()); + + assertTrue(waitForCondition(() -> SRV_NODES_CNT == EXECUTED_JOB_CNT.get(), getTestTimeout())); + assertEquals(0, CANCELLED_JOB_CNT.get()); + } + finally { + taskUnblockedLatch.countDown(); + } + } + /** */ @Test public void testSystemTaskCancel() throws Exception { @@ -420,8 +527,8 @@ private void checkTaskCancel( taskStartedLatch = new CountDownLatch(expTaskCnt); taskUnblockedLatch = new CountDownLatch(1); - CANCELLED_TASK_CNTR.set(0); - EXECUTED_TASK_CNTR.set(0); + CANCELLED_JOB_CNT.set(0); + EXECUTED_JOB_CNT.set(0); try { Future fut = taskStarter.get(); @@ -438,9 +545,9 @@ private void checkTaskCancel( assertTrue(fut.isCancelled()); - assertTrue(waitForCondition(() -> expTaskCnt == CANCELLED_TASK_CNTR.get(), getTestTimeout())); + assertTrue(waitForCondition(() -> expTaskCnt == CANCELLED_JOB_CNT.get(), getTestTimeout())); - assertEquals(0, EXECUTED_TASK_CNTR.get()); + assertEquals(0, EXECUTED_JOB_CNT.get()); } else { assertThrowsWithCause(() -> fut.cancel(true), expE); @@ -449,7 +556,7 @@ private void checkTaskCancel( taskUnblockedLatch.countDown(); - assertTrue(waitForCondition(() -> expTaskCnt == EXECUTED_TASK_CNTR.get(), getTestTimeout())); + assertTrue(waitForCondition(() -> expTaskCnt == EXECUTED_JOB_CNT.get(), getTestTimeout())); } } } @@ -516,16 +623,16 @@ private void checkTask(Class cls, BiConsumerX consumer) { /** */ private void assertCompleted(RunnableX r, int expCnt) { - EXECUTED_TASK_CNTR.set(0); + EXECUTED_JOB_CNT.set(0); r.run(); - assertEquals(expCnt, EXECUTED_TASK_CNTR.get()); + assertEquals(expCnt, EXECUTED_JOB_CNT.get()); } /** */ private void assertFailed(RunnableX r) { - EXECUTED_TASK_CNTR.set(0); + EXECUTED_JOB_CNT.set(0); try { r.run(); @@ -547,7 +654,7 @@ private void assertFailed(RunnableX r) { fail(); - assertEquals(0, EXECUTED_TASK_CNTR.get()); + assertEquals(0, EXECUTED_JOB_CNT.get()); } /** */ @@ -629,7 +736,7 @@ private static class PublicAccessSystemTask extends AbstractTask { private abstract static class AbstractRunnable implements IgniteRunnable { /** {@inheritDoc} */ @Override public void run() { - EXECUTED_TASK_CNTR.incrementAndGet(); + EXECUTED_JOB_CNT.incrementAndGet(); } } @@ -637,7 +744,7 @@ private abstract static class AbstractRunnable implements IgniteRunnable { private abstract static class AbstractCallable implements IgniteCallable { /** {@inheritDoc} */ @Override public AtomicInteger call() throws Exception { - EXECUTED_TASK_CNTR.incrementAndGet(); + EXECUTED_JOB_CNT.incrementAndGet(); return new AtomicInteger(0); } @@ -647,7 +754,7 @@ private abstract static class AbstractCallable implements IgniteCallable { /** {@inheritDoc} */ @Override public Boolean apply(Boolean o) { - EXECUTED_TASK_CNTR.incrementAndGet(); + EXECUTED_JOB_CNT.incrementAndGet(); return null; } @@ -681,9 +788,19 @@ private abstract static class AbstractTask extends ComputeTaskAdapter results) throws IgniteException { + for (ComputeJobResult res : results) { + if (!res.isCancelled() && res.getException() != null) + throw res.getException(); + } + return null; } + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List rcvd) throws IgniteException { + return ComputeJobResultPolicy.WAIT; + } + /** */ protected ComputeJob job() { return new TestJob(); @@ -707,7 +824,7 @@ private static class TestJob implements ComputeJob { /** {@inheritDoc} */ @Override public Object execute() { - EXECUTED_TASK_CNTR.incrementAndGet(); + EXECUTED_JOB_CNT.incrementAndGet(); return null; } @@ -725,14 +842,14 @@ private static class HangingJob extends ComputeJobAdapter { taskUnblockedLatch.await(5_000, MILLISECONDS); } catch (InterruptedException e) { - CANCELLED_TASK_CNTR.incrementAndGet(); + CANCELLED_JOB_CNT.incrementAndGet(); Thread.currentThread().interrupt(); throw new IgniteException(e); } - EXECUTED_TASK_CNTR.incrementAndGet(); + EXECUTED_JOB_CNT.incrementAndGet(); return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java index 5d98789420ce8..219f424f3c59d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java @@ -73,7 +73,7 @@ public GridDeployment deployment() { } /** {@inheritDoc} */ - @Override public GridTaskSessionInternal session() { + @Override public GridTaskSessionImpl session() { return ses; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 4ae62e2d34342..f8dfea75081d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -41,6 +42,7 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeExecutionRejectedException; +import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobSibling; import org.apache.ignite.compute.ComputeTaskSession; import org.apache.ignite.events.DiscoveryEvent; @@ -57,6 +59,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTaskSessionImpl; import org.apache.ignite.internal.GridTaskSessionRequest; +import org.apache.ignite.internal.PlatformSecurityAwareJob; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.collision.GridCollisionJobContextAdapter; import org.apache.ignite.internal.managers.collision.GridCollisionManager; @@ -119,6 +122,10 @@ import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS; import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId; +import static org.apache.ignite.internal.processors.security.SecurityUtils.unwrap; +import static org.apache.ignite.internal.processors.task.GridTaskProcessor.resolveTaskClass; +import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_KILL; +import static org.apache.ignite.plugin.security.SecurityPermission.TASK_CANCEL; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; /** @@ -787,51 +794,28 @@ public void cancelJob(@Nullable final IgniteUuid sesId, @Nullable final IgniteUu return; } - // Put either job ID or session ID (they are unique). - cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys); - - Predicate idsMatch = idMatch(sesId, jobId); - - // If we don't have jobId then we have to iterate - if (jobId == null) { - if (!jobAlwaysActivate) { - for (GridJobWorker job : passiveJobs.values()) { - if (idsMatch.test(job)) - cancelPassiveJob(job); - } - } - - for (GridJobWorker job : activeJobs.values()) { - if (idsMatch.test(job)) - cancelActiveJob(job, sys); - } - - for (GridJobWorker job : syncRunningJobs.values()) { - if (idsMatch.test(job)) - cancelJob(job, sys); - } - } - else { - if (!jobAlwaysActivate) { - GridJobWorker passiveJob = passiveJobs.get(jobId); - - if (passiveJob != null && idsMatch.test(passiveJob) && cancelPassiveJob(passiveJob)) - return; - } + GridJobWorker jobWorker = findJobWorker(sesId, jobId); - GridJobWorker activeJob = activeJobs.get(jobId); + if (jobWorker != null && !sys) + authorizeJobCancel(jobWorker); - if (activeJob != null && idsMatch.test(activeJob)) { - cancelActiveJob(activeJob, sys); + // Put either job ID or session ID (they are unique). + cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys); - return; - } + if (jobWorker == null) + return; - activeJob = syncRunningJobs.get(jobId); + if (!jobAlwaysActivate && passiveJobs.containsKey(jobWorker.getJobId())) + cancelPassiveJob(jobWorker); + else if (activeJobs.containsKey(jobWorker.getJobId())) + cancelActiveJob(jobWorker, sys); + else if (syncRunningJobs.containsKey(jobWorker.getJobId())) + cancelJob(jobWorker, sys); + } + catch (Exception e) { + U.error(log, "Failed to cancel Ignite Compute Task Job [sesId=" + sesId + ", jobId=" + jobId + ']', e); - if (activeJob != null && idsMatch.test(activeJob)) - cancelJob(activeJob, sys); - } + throw e; } finally { rwLock.readUnlock(); @@ -1711,7 +1695,6 @@ else if (ctx.localNodeId().equals(sndNode.id())) * @param nodeId Node ID. * @param req Request. */ - @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter"}) private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req) { if (!rwLock.tryReadLock()) { if (log.isDebugEnabled()) @@ -2424,6 +2407,80 @@ public Map jobStatuses(IgniteUuid sesId) { .collect(groupingBy(GridJobWorker::status, counting())); } + /** */ + private void authorizeJobCancel(GridJobWorker jobWorker) { + if (!ctx.security().enabled()) + return; + + GridTaskSessionImpl taskSes = jobWorker.getSession().session(); + + Class taskCls = resolveTaskClass(taskSes.getTaskName(), null, null); + + if (taskCls == null || !ctx.security().isSystemType(taskCls)) + ctx.security().authorize(taskSes.getTaskName(), TASK_CANCEL); + else { + authorizeJobCancel( + jobWorker.getJob(), + Objects.equals(taskSes.initiatorSecurityContext().subject().id(), ctx.security().securityContext().subject().id()) + ); + } + } + + /** */ + public void authorizeJobCancel(ComputeJob job, boolean isCanceledByInitiator) { + Object executable = unwrap(job); + + if (!ctx.security().isSystemType(executable.getClass())) + ctx.security().authorize(executable.getClass().getName(), TASK_CANCEL); + else if (executable instanceof PlatformSecurityAwareJob) + ctx.security().authorize(((PlatformSecurityAwareJob)executable).name(), TASK_CANCEL); + else if (!isCanceledByInitiator) + ctx.security().authorize(ADMIN_KILL); + } + + /** */ + private GridJobWorker findJobWorker(@Nullable final IgniteUuid sesId, @Nullable final IgniteUuid jobId) { + Predicate pred = idMatch(sesId, jobId); + + GridJobWorker res = findJobWorker(passiveJobs, jobId, pred); + + if (res == null) { + res = findJobWorker(activeJobs, jobId, pred); + + if (res == null) + res = findJobWorker(syncRunningJobs, jobId, pred); + } + + return res; + } + + /** */ + private GridJobWorker findJobWorker( + @Nullable Map workers, + @Nullable IgniteUuid jobId, + Predicate pred + ) { + if (workers == null) + return null; + + if (jobId == null) { + for (GridJobWorker jobWorker : workers.values()) { + if (jobWorker != null && pred.test(jobWorker)) + return jobWorker; + } + + return null; + } + else { + GridJobWorker jobWorker = workers.get(jobId); + + if (jobWorker != null && pred.test(jobWorker)) + return jobWorker; + } + + return null; + } + /** * @param sesId Task session ID. * @param jobId Job ID. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 54775dcb63a54..ebcfc3ad5eef2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -1599,7 +1599,7 @@ private void authorizeUserTask( * execution by its name, and corresponding to this name task class was not found by the default classloader on * the local node. */ - private Class resolveTaskClass(@Nullable String taskName, @Nullable Class taskCls, @Nullable ComputeTask task) { + public static Class resolveTaskClass(@Nullable String taskName, @Nullable Class taskCls, @Nullable ComputeTask task) { if (taskCls != null) return taskCls; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 0d0de757abd46..16c91b86edf53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -114,7 +114,6 @@ import static org.apache.ignite.internal.processors.security.SecurityUtils.authorizeAll; import static org.apache.ignite.internal.processors.security.SecurityUtils.unwrap; import static org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id; -import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_KILL; import static org.apache.ignite.plugin.security.SecurityPermission.TASK_CANCEL; import static org.apache.ignite.plugin.security.SecurityPermission.TASK_EXECUTE; @@ -1780,16 +1779,8 @@ private void authorizeTaskCancel() { ses.initiatorSecurityContext().subject().id(), ctx.security().securityContext().subject().id()); - for (GridJobResultImpl jobRes : jobRes.values()) { - Object executable = unwrap(jobRes.getJob()); - - if (!ctx.security().isSystemType(executable.getClass())) - ctx.security().authorize(executable.getClass().getName(), TASK_CANCEL); - else if (executable instanceof PlatformSecurityAwareJob) - ctx.security().authorize(((PlatformSecurityAwareJob)executable).name(), TASK_CANCEL); - else if (!isClosedByInitiator) - ctx.security().authorize(ADMIN_KILL); - } + for (GridJobResultImpl jobRes : jobRes.values()) + ctx.job().authorizeJobCancel(jobRes.getJob(), isClosedByInitiator); } } } From aeca3cd5d23c9e15cbe278bf88f5961f2b354534 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Sat, 29 Nov 2025 11:19:06 +0300 Subject: [PATCH 2/3] IGNITE-27196 --- .../processors/job/GridJobProcessor.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index f8dfea75081d5..956396c2e3967 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -805,11 +805,7 @@ public void cancelJob(@Nullable final IgniteUuid sesId, @Nullable final IgniteUu if (jobWorker == null) return; - if (!jobAlwaysActivate && passiveJobs.containsKey(jobWorker.getJobId())) - cancelPassiveJob(jobWorker); - else if (activeJobs.containsKey(jobWorker.getJobId())) - cancelActiveJob(jobWorker, sys); - else if (syncRunningJobs.containsKey(jobWorker.getJobId())) + if (!cancelPassiveJob(jobWorker) && !cancelActiveJob(jobWorker, sys)) cancelJob(jobWorker, sys); } catch (Exception e) { @@ -829,9 +825,7 @@ else if (syncRunningJobs.containsKey(jobWorker.getJobId())) * @return {@code True} if succeeded. */ private boolean cancelPassiveJob(GridJobWorker job) { - assert !jobAlwaysActivate; - - if (removeFromPassive(job)) { + if (!jobAlwaysActivate && removeFromPassive(job)) { if (log.isDebugEnabled()) log.debug("Job has been cancelled before activation: " + job); @@ -851,7 +845,7 @@ private boolean cancelPassiveJob(GridJobWorker job) { * @param job Job to cancel. * @param sys Flag indicating whether this is a system cancel. */ - private void cancelActiveJob(GridJobWorker job, boolean sys) { + private boolean cancelActiveJob(GridJobWorker job, boolean sys) { if (removeFromActive(job)) { cancelledJobs.put(job.getJobId(), job); @@ -861,7 +855,11 @@ private void cancelActiveJob(GridJobWorker job, boolean sys) { else // No reply, since it is not cancel from collision. cancelJob(job, sys); + + return true; } + + return false; } /** From c9c270de257c68e48175cf313ed834f099271950 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Tue, 2 Dec 2025 10:53:58 +0300 Subject: [PATCH 3/3] IGNITE-27196 --- .../common/ComputeTaskPermissionsTest.java | 139 ++++++++---------- 1 file changed, 62 insertions(+), 77 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java index e2385a01694fe..02321a705a2d8 100644 --- a/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/common/ComputeTaskPermissionsTest.java @@ -403,22 +403,17 @@ public void testJobCancelAuthorizationSucceeded() throws Exception { CANCELLED_JOB_CNT.set(0); EXECUTED_JOB_CNT.set(0); - try { - ComputeTaskFuture fut = grid(0).compute().executeAsync(CancelAllowedTask.class, null); + ComputeTaskFuture fut = grid(0).compute().executeAsync(CancelAllowedTask.class, null); - taskStartedLatch.await(getTestTimeout(), MILLISECONDS); + taskStartedLatch.await(getTestTimeout(), MILLISECONDS); - for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings()) - sibling.cancel(); + for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings()) + sibling.cancel(); - fut.get(getTestTimeout()); + fut.get(getTestTimeout()); - assertTrue(waitForCondition(() -> SRV_NODES_CNT == CANCELLED_JOB_CNT.get(), getTestTimeout())); - assertEquals(0, EXECUTED_JOB_CNT.get()); - } - finally { - taskUnblockedLatch.countDown(); - } + assertTrue(waitForCondition(() -> SRV_NODES_CNT == CANCELLED_JOB_CNT.get(), getTestTimeout())); + assertEquals(0, EXECUTED_JOB_CNT.get()); } /** */ @@ -430,54 +425,49 @@ public void testJobCancelAuthorizationFailed() throws Exception { CANCELLED_JOB_CNT.set(0); EXECUTED_JOB_CNT.set(0); - try { - ComputeTaskFuture fut = grid(0).compute().executeAsync(CancelForbiddenTask.class, null); - - taskStartedLatch.await(getTestTimeout(), MILLISECONDS); - - for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings()) { - LogListener logLsnr = LogListener.matches("Failed to cancel Ignite Compute Task Job" + - " [sesId=" + fut.getTaskSession().getId() + - ", jobId=" + sibling.getJobId() + ']' - ).build(); - - listeningLog.registerListener(logLsnr); - - // TODO https://issues.apache.org/jira/browse/IGNITE-27195 Authorization errors during Compute Job - // cancellation do not propagate from remote nodes back to the one that initiated cancellation. - if (grid(0).context().job().activeJob(sibling.getJobId()) != null) { - assertThrowsAnyCause( - log, - () -> { - sibling.cancel(); - - return null; - }, - SecurityException.class, - "Authorization failed" - ); - } - else - sibling.cancel(); - - logLsnr.check(getTestTimeout()); - } + ComputeTaskFuture fut = grid(0).compute().executeAsync(CancelForbiddenTask.class, null); - assertEquals(0, EXECUTED_JOB_CNT.get()); - assertEquals(0, CANCELLED_JOB_CNT.get()); + taskStartedLatch.await(getTestTimeout(), MILLISECONDS); - assertFalse(fut.isDone()); + for (ComputeJobSibling sibling : fut.getTaskSession().getJobSiblings()) { + LogListener logLsnr = LogListener.matches("Failed to cancel Ignite Compute Task Job" + + " [sesId=" + fut.getTaskSession().getId() + + ", jobId=" + sibling.getJobId() + ']' + ).build(); - taskUnblockedLatch.countDown(); + listeningLog.registerListener(logLsnr); - fut.get(getTestTimeout()); + // TODO https://issues.apache.org/jira/browse/IGNITE-27195 Authorization errors during Compute Job + // cancellation do not propagate from remote nodes back to the one that initiated cancellation. + if (grid(0).context().job().activeJob(sibling.getJobId()) != null) { + assertThrowsAnyCause( + log, + () -> { + sibling.cancel(); - assertTrue(waitForCondition(() -> SRV_NODES_CNT == EXECUTED_JOB_CNT.get(), getTestTimeout())); - assertEquals(0, CANCELLED_JOB_CNT.get()); - } - finally { - taskUnblockedLatch.countDown(); + return null; + }, + SecurityException.class, + "Authorization failed" + ); + } + else + sibling.cancel(); + + logLsnr.check(getTestTimeout()); } + + assertEquals(0, EXECUTED_JOB_CNT.get()); + assertEquals(0, CANCELLED_JOB_CNT.get()); + + assertFalse(fut.isDone()); + + taskUnblockedLatch.countDown(); + + fut.get(getTestTimeout()); + + assertTrue(waitForCondition(() -> SRV_NODES_CNT == EXECUTED_JOB_CNT.get(), getTestTimeout())); + assertEquals(0, CANCELLED_JOB_CNT.get()); } /** */ @@ -530,39 +520,34 @@ private void checkTaskCancel( CANCELLED_JOB_CNT.set(0); EXECUTED_JOB_CNT.set(0); - try { - Future fut = taskStarter.get(); + Future fut = taskStarter.get(); - assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS)); + assertTrue(taskStartedLatch.await(getTestTimeout(), MILLISECONDS)); - try ( - OperationSecurityContext ignored = initiator == null - ? null - : grid(0).context().security().withContext(initiator) - ) { - if (expE == null) { - fut.cancel(true); + try ( + OperationSecurityContext ignored = initiator == null + ? null + : grid(0).context().security().withContext(initiator) + ) { + if (expE == null) { + fut.cancel(true); - assertTrue(fut.isCancelled()); + assertTrue(fut.isCancelled()); - assertTrue(waitForCondition(() -> expTaskCnt == CANCELLED_JOB_CNT.get(), getTestTimeout())); + assertTrue(waitForCondition(() -> expTaskCnt == CANCELLED_JOB_CNT.get(), getTestTimeout())); - assertEquals(0, EXECUTED_JOB_CNT.get()); - } - else { - assertThrowsWithCause(() -> fut.cancel(true), expE); + assertEquals(0, EXECUTED_JOB_CNT.get()); + } + else { + assertThrowsWithCause(() -> fut.cancel(true), expE); - assertFalse(fut.isCancelled()); + assertFalse(fut.isCancelled()); - taskUnblockedLatch.countDown(); + taskUnblockedLatch.countDown(); - assertTrue(waitForCondition(() -> expTaskCnt == EXECUTED_JOB_CNT.get(), getTestTimeout())); - } + assertTrue(waitForCondition(() -> expTaskCnt == EXECUTED_JOB_CNT.get(), getTestTimeout())); } } - finally { - taskUnblockedLatch.countDown(); - } } /** */