Skip to content

Commit afcf61c

Browse files
fix(tasks): claim-promote satisfied conditional tasks
1 parent 87f3d7d commit afcf61c

3 files changed

Lines changed: 69 additions & 19 deletions

File tree

apps/rest-api/e2e/judge-eval-attempt.e2e.test.ts

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,9 @@ describe('judge_eval_attempt duplicate protection', () => {
150150
});
151151
return data!;
152152
},
153-
(task) => task.status === 'completed' || task.status === 'failed',
154-
{ label: `run_eval.complete[${taskId.slice(0, 8)}]`, maxAttempts: 30 },
153+
(task) =>
154+
task.status === 'completed' && task.acceptedAttemptN === attemptN,
155+
{ label: `run_eval.accepted[${taskId.slice(0, 8)}]`, maxAttempts: 30 },
155156
);
156157
}
157158

@@ -213,6 +214,14 @@ describe('judge_eval_attempt duplicate protection', () => {
213214
expect(prematureClaim.response.status).toBe(409);
214215

215216
await completeRunEval(firstRunTaskId);
217+
const halfReadyClaim = await claimTask({
218+
client,
219+
auth: () => claimer.accessToken,
220+
path: { id: judge!.id },
221+
body: { leaseTtlSec: 60 },
222+
});
223+
expect(halfReadyClaim.response.status).toBe(409);
224+
216225
const stillWaiting = await pollUntil(
217226
async () => {
218227
const { data } = await getTask({
@@ -228,20 +237,6 @@ describe('judge_eval_attempt duplicate protection', () => {
228237
expect(stillWaiting.status).toBe('waiting');
229238

230239
await completeRunEval(secondRunTaskId);
231-
const queued = await pollUntil(
232-
async () => {
233-
const { data } = await getTask({
234-
client,
235-
auth: () => proposer.accessToken,
236-
path: { id: judge!.id },
237-
});
238-
return data!;
239-
},
240-
(task) => task.status === 'queued',
241-
{ label: 'conditional judge promoted', maxAttempts: 30 },
242-
);
243-
expect(queued.status).toBe('queued');
244-
245240
const claimed = await claimTask({
246241
client,
247242
auth: () => claimer.accessToken,

apps/rest-api/e2e/legreffier-onboarding.e2e.test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
getLegreffierOnboardingStatus,
2222
startLegreffierOnboarding,
2323
} from '@moltnet/api-client';
24+
import { cryptoService } from '@moltnet/crypto-service';
2425
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
2526

2627
import { createTestHarness, type TestHarness } from './setup.js';
@@ -179,12 +180,13 @@ describe('LeGreffier onboarding', () => {
179180
describe('happy path (requires SPONSOR_AGENT_ID)', () => {
180181
it('full onboarding flow: start → callback → installed → completed', async () => {
181182
// Arrange
183+
const keyPair = await cryptoService.generateKeyPair();
182184
const { data: startData, error: startError } =
183185
await startLegreffierOnboarding({
184186
client,
185187
body: {
186-
publicKey: VALID_PUBLIC_KEY,
187-
fingerprint: VALID_FINGERPRINT,
188+
publicKey: keyPair.publicKey,
189+
fingerprint: keyPair.fingerprint,
188190
agentName: 'e2e-test-bot',
189191
},
190192
});

libs/task-service/src/task.service.ts

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,43 @@ export function createTaskService(deps: TaskServiceDeps) {
438438
);
439439
}
440440

441+
async function promoteWaitingTaskIfSatisfied(row: DbTask): Promise<DbTask> {
442+
const condition = row.claimCondition as ClaimCondition | null;
443+
if (row.status !== 'waiting' || !condition) return row;
444+
const tasksById = await loadConditionTaskMap(condition);
445+
if (!evaluateClaimConditionFromTasks(condition, tasksById)) return row;
446+
447+
const proposerId = row.proposedByAgentId ?? row.proposedByHumanId;
448+
const proposerNs = row.proposedByAgentId
449+
? KetoNamespace.Agent
450+
: KetoNamespace.Human;
451+
if (!proposerId) {
452+
logger.warn({ taskId: row.id }, 'task.claim.waiting.missing_proposer');
453+
return row;
454+
}
455+
456+
const errors = await validateTaskInputAsync(
457+
row.taskType,
458+
row.input,
459+
makeAsyncValidationContext(proposerId, proposerNs, {
460+
currentTaskId: row.id,
461+
}),
462+
);
463+
if (errors.length > 0) {
464+
logger.warn(
465+
{ taskId: row.id, errors },
466+
'task.claim.waiting.strict_validation_failed',
467+
);
468+
return row;
469+
}
470+
471+
const [promoted] = await transactionRunner.runInTransaction(
472+
() => taskRepository.promoteWaitingTasks([row.id]),
473+
{ name: 'task.claim.promoteWaiting' },
474+
);
475+
return promoted ?? row;
476+
}
477+
441478
async function tryPromoteSatisfiedWaitingTasks(opts: {
442479
triggerTaskId?: string;
443480
}): Promise<void> {
@@ -888,7 +925,23 @@ export function createTaskService(deps: TaskServiceDeps) {
888925
throw new TaskServiceError('invalid', 'Only agents may claim tasks');
889926
}
890927

891-
const row = await taskRepository.findById(taskId);
928+
const initialRow = await taskRepository.findById(taskId);
929+
if (initialRow?.status === 'waiting') {
930+
const canClaimWaiting = await permissionChecker.canClaimTask(
931+
taskId,
932+
callerId,
933+
callerNs,
934+
);
935+
if (!canClaimWaiting)
936+
throw new TaskServiceError(
937+
'forbidden',
938+
'Not authorized to claim this task',
939+
);
940+
}
941+
const row =
942+
initialRow?.status === 'waiting'
943+
? await promoteWaitingTaskIfSatisfied(initialRow)
944+
: initialRow;
892945
if (!row || row.status !== 'queued') {
893946
throw new TaskServiceError(
894947
'conflict',

0 commit comments

Comments
 (0)