Skip to content

Commit 72f14f8

Browse files
committed
feat(supervisor): schedule-tree node affinity
1 parent d4772b5 commit 72f14f8

File tree

6 files changed

+95
-9
lines changed

6 files changed

+95
-9
lines changed

apps/supervisor/src/env.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ const Env = z.object({
117117
KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50),
118118
KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z.string().trim().min(1).default("kubernetes.io/hostname"),
119119

120+
// Schedule affinity settings - runs from schedule trees prefer a dedicated pool
121+
KUBERNETES_SCHEDULE_AFFINITY_ENABLED: BoolEnv.default(false),
122+
KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().default("node.cluster.x-k8s.io/machinepool"),
123+
KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().default("scheduled-runs"),
124+
KUBERNETES_SCHEDULE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(80),
125+
KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(20),
126+
120127
// Placement tags settings
121128
PLACEMENT_TAGS_ENABLED: BoolEnv.default(false),
122129
PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"),

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ class ManagedSupervisor {
267267
snapshotId: message.snapshot.id,
268268
snapshotFriendlyId: message.snapshot.friendlyId,
269269
placementTags: message.placementTags,
270+
annotations: message.run.annotations,
270271
});
271272

272273
// Disabled for now

apps/supervisor/src/workloadManager/kubernetes.ts

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
120120
},
121121
spec: {
122122
...this.addPlacementTags(this.#defaultPodSpec, opts.placementTags),
123-
affinity: this.#getAffinity(opts.machine, opts.projectId),
123+
affinity: this.#getAffinity(opts),
124124
terminationGracePeriodSeconds: 60 * 60,
125125
containers: [
126126
{
@@ -335,13 +335,22 @@ export class KubernetesWorkloadManager implements WorkloadManager {
335335
};
336336
}
337337

338+
/**
 * Reports whether the run's annotations record a root trigger source of
 * "schedule".
 *
 * @param opts - Create options for the workload; `annotations` may be absent.
 * @returns `true` only when `annotations.rootTriggerSource` is exactly
 *   "schedule"; `false` when annotations are missing or carry another source.
 */
#isScheduledRun(opts: WorkloadManagerCreateOptions): boolean {
  const rootSource = opts.annotations?.rootTriggerSource;
  return rootSource === "schedule";
}
341+
338342
/**
 * Builds the common label map for a run: environment id, environment type,
 * org, project, machine preset name and a scheduled/non-scheduled flag.
 *
 * @param opts - Create options for the workload.
 * @returns A flat string-to-string label map.
 */
#getSharedLabels(opts: WorkloadManagerCreateOptions): Record<string, string> {
  const { envId, envType, orgId, projectId, machine } = opts;

  // Deliberately a boolean flag instead of the full trigger source
  // (e.g. sdk, api, cli, mcp, schedule): this keeps label cardinality low in metrics.
  // Schedule vs non-schedule is all the current metrics and pool-level scheduling
  // decisions need; finer-grained source breakdowns live in run annotations.
  const scheduled = String(this.#isScheduledRun(opts));

  return {
    env: envId,
    envtype: this.#envTypeToLabelValue(envType),
    org: orgId,
    project: projectId,
    machine: machine.name,
    scheduled,
  };
}
347356

@@ -390,16 +399,37 @@ export class KubernetesWorkloadManager implements WorkloadManager {
390399
return preset.name.startsWith("large-");
391400
}
392401

393-
#getAffinity(preset: MachinePreset, projectId: string): k8s.V1Affinity | undefined {
394-
const nodeAffinity = this.#getNodeAffinityRules(preset);
395-
const podAffinity = this.#getProjectPodAffinity(projectId);
396-
397-
if (!nodeAffinity && !podAffinity) {
402+
/**
 * Assembles the pod's affinity from three sources: machine-preset node
 * affinity, schedule-pool node affinity, and project pod affinity.
 *
 * @param opts - Create options for the workload; `machine`, `projectId` and
 *   `annotations` are the fields consulted here.
 * @returns The combined affinity, or `undefined` when no source yields a rule.
 */
#getAffinity(opts: WorkloadManagerCreateOptions): k8s.V1Affinity | undefined {
  const machineRules = this.#getNodeAffinityRules(opts.machine);
  const scheduleRules = this.#getScheduleNodeAffinityRules(this.#isScheduledRun(opts));
  const podAffinity = this.#getProjectPodAffinity(opts.projectId);

  // Soft preferences are merged from both node-affinity sources, machine rules first.
  const softTerms = (machineRules?.preferredDuringSchedulingIgnoredDuringExecution ?? []).concat(
    scheduleRules?.preferredDuringSchedulingIgnoredDuringExecution ?? []
  );

  // Hard requirements come from the large machine affinity only (non-large runs
  // must stay off the large pool). Schedule affinity is soft in both directions.
  const hardTerms = Array.from(
    machineRules?.requiredDuringSchedulingIgnoredDuringExecution?.nodeSelectorTerms ?? []
  );

  let nodeAffinity: k8s.V1NodeAffinity | undefined;
  if (softTerms.length > 0 || hardTerms.length > 0) {
    nodeAffinity = {};
    if (softTerms.length > 0) {
      nodeAffinity.preferredDuringSchedulingIgnoredDuringExecution = softTerms;
    }
    if (hardTerms.length > 0) {
      nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution = {
        nodeSelectorTerms: hardTerms,
      };
    }
  }

  if (!nodeAffinity && !podAffinity) {
    return undefined;
  }

  // Emit only the sections that actually carry rules.
  const affinity: k8s.V1Affinity = {};
  if (nodeAffinity) {
    affinity.nodeAffinity = nodeAffinity;
  }
  if (podAffinity) {
    affinity.podAffinity = podAffinity;
  }
  return affinity;
}
@@ -447,6 +477,50 @@ export class KubernetesWorkloadManager implements WorkloadManager {
447477
};
448478
}
449479

480+
/**
 * Builds the soft node-affinity rule that steers a run relative to the
 * schedule pool.
 *
 * - Scheduled runs get a preference *for* nodes whose pool label matches
 *   (operator `In`, weight `KUBERNETES_SCHEDULE_AFFINITY_WEIGHT`).
 * - All other runs get a preference to stay *off* those nodes
 *   (operator `NotIn`, weight `KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT`).
 *
 * Only `preferredDuringSchedulingIgnoredDuringExecution` terms are produced;
 * this method never emits a hard requirement.
 *
 * @param isScheduledRun - Whether the run was identified as schedule-originated.
 * @returns A node affinity with a single preferred term, or `undefined` when
 *   the feature is disabled or the pool label key/value is blank.
 */
#getScheduleNodeAffinityRules(isScheduledRun: boolean): k8s.V1NodeAffinity | undefined {
  if (!env.KUBERNETES_SCHEDULE_AFFINITY_ENABLED) {
    return undefined;
  }

  const poolLabelKey = env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY.trim();
  const poolLabelValue = env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE.trim();

  // Both halves of the selector are required. The env schema accepts any string
  // for the key (including an empty override), and a selector requirement with a
  // blank key would be sent to the API server as-is. Since this rule is only a
  // soft preference, skip it on misconfiguration rather than risk pod creation.
  if (!poolLabelKey || !poolLabelValue) {
    return undefined;
  }

  // Scheduled runs lean towards the pool; everything else leans away from it.
  const operator = isScheduledRun ? "In" : "NotIn";
  const weight = isScheduledRun
    ? env.KUBERNETES_SCHEDULE_AFFINITY_WEIGHT
    : env.KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT;

  return {
    preferredDuringSchedulingIgnoredDuringExecution: [
      {
        weight,
        preference: {
          matchExpressions: [
            {
              key: poolLabelKey,
              operator,
              values: [poolLabelValue],
            },
          ],
        },
      },
    ],
  };
}
523+
450524
#getProjectPodAffinity(projectId: string): k8s.V1PodAffinity | undefined {
451525
if (!env.KUBERNETES_PROJECT_AFFINITY_ENABLED) {
452526
return undefined;

apps/supervisor/src/workloadManager/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
1+
import type { EnvironmentType, MachinePreset, PlacementTag, RunAnnotations } from "@trigger.dev/core/v3";
22

33
export interface WorkloadManagerOptions {
44
workloadApiProtocol: "http" | "https";
@@ -35,4 +35,5 @@ export interface WorkloadManagerCreateOptions {
3535
runFriendlyId: string;
3636
snapshotId: string;
3737
snapshotFriendlyId: string;
38+
annotations?: RunAnnotations;
3839
}

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { BillingCache } from "../billingCache.js";
22
import { startSpan } from "@internal/tracing";
33
import { assertExhaustive, tryCatch } from "@trigger.dev/core";
4-
import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
4+
import { DequeuedMessage, RetryOptions, RunAnnotations } from "@trigger.dev/core/v3";
55
import { placementTag } from "@trigger.dev/core/v3/serverOnly";
66
import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
77
import {
@@ -575,6 +575,7 @@ export class DequeueSystem {
575575
// Keeping this for backwards compatibility, but really this should be called workerQueue
576576
masterQueue: lockedTaskRun.workerQueue,
577577
traceContext: lockedTaskRun.traceContext as Record<string, unknown>,
578+
annotations: RunAnnotations.safeParse(lockedTaskRun.annotations).data,
578579
},
579580
environment: {
580581
id: lockedTaskRun.runtimeEnvironment.id,

packages/core/src/v3/schemas/runEngine.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { z } from "zod";
22
import { Enum, MachinePreset, RuntimeEnvironmentType, TaskRunExecution } from "./common.js";
33
import { EnvironmentType } from "./schemas.js";
4+
import { RunAnnotations } from "./api.js";
45
import type * as DB_TYPES from "@trigger.dev/database";
56

67
export const TaskRunExecutionStatus = {
@@ -259,6 +260,7 @@ export const DequeuedMessage = z.object({
259260
attemptNumber: z.number(),
260261
masterQueue: z.string(),
261262
traceContext: z.record(z.unknown()),
263+
annotations: RunAnnotations.optional(),
262264
}),
263265
environment: z.object({
264266
id: z.string(),

0 commit comments

Comments
 (0)