Commit 71a8c31

feat(notifications): Linear dispatcher in fanout consumer (cost/turns/duration) (#243)
* feat(notifications): platform-side Linear final-status comment with cost/turns/duration

Closes #239. Adds a Linear dispatcher to the fanout consumer alongside the existing slack/github/email dispatchers. Posts a deterministic final-status comment on terminal task events for Linear-origin tasks, with cost, turns/max_turns, duration, and PR link rendered.

Three framing modes by (event_type, pr_url):

  ✅ task_completed → 'Task completed'
  ⚠️ error_max_turns + pr_url → 'Shipped a PR but stopped early'
  ❌ all other terminal states → 'Task <subtype>: <classifier title>'

The ⚠️ case is the motivating ABCA-91 scenario: agent hit max_turns on turn 101 after shipping PR #35; previous behaviour was a silent ❌ reaction with no metrics surfaced to the requester. The platform-side comment fires deterministically even when the agent crashes (OOM, SDK buffer overflow, max_turns) before reaching its own step-3 completion comment.

Architecture:
- One new entry in NotificationChannel + CHANNEL_DEFAULTS + DISPATCHERS in fanout-task-events.ts. Dispatcher gates on channel_source === 'linear' so non-Linear tasks short-circuit after one DDB Get.
- Reuses the existing postIssueComment helper from shared/linear-feedback.ts (already in use by the screenshot pipeline + orchestrator failure-reporting paths).
- New construct props linearWorkspaceRegistryTable + linearOauthSecretArnPattern guard the IAM grants the same way slackSecretArnPattern does — a deployment without LinearIntegration gets no dangling permission to bgagent-linear-oauth-*.
- FanOutConsumer instantiation moved below LinearIntegration in agent.ts so it can receive the registry table reference.

Tests: 92 passing in fanout-task-events.test.ts (1816 across full CDK suite). New Linear-dispatcher describe block covers happy path, failed without PR, ABCA-91 max_turns-with-PR, channel_source short-circuit, missing metadata, and postIssueComment-returning-false graceful no-op.

* fix(linear-dispatcher): krokoko PR-243 review nits + test coverage

Addresses the non-blocking nits from #239 review:
- JSDoc on renderLinearFinalStatusComment now describes the actual (eventType, prUrl) discriminator rather than 'error_max_turns' as an event type. The agent_status discrimination lives in the error classifier, not in the dispatcher's framing logic.
- Inline comment on classifyError result corrected: returns null only for empty error_message, UNKNOWN_CLASSIFICATION (title 'Unexpected error') for any non-empty unmatched message.
- ⚠️ frame now appends classifier title — for ABCA-91 the requester sees 'Shipped a PR but stopped early — Exceeded max turns', not just the bare PR-shipped frame.
- missing_env and post_failed log paths bumped from INFO to WARN with error_id tags so missing-env / post-failure are alarmable. The Linear comment is the only completion signal for the agent-crash case, so silent drops defeat the dispatcher's purpose.
- Stale 'at most 3 channels' comment in routeEvent updated to 4.

Test coverage:
- New test: LINEAR_WORKSPACE_REGISTRY_TABLE_NAME unset → WARN + skip (the deploy-misconfig safety valve was unexercised).
- New test: error_max_turns WITHOUT pr_url renders ❌, not ⚠️ (the ⚠️↔❌ boundary the other direction).
- New describe block: 8 direct tests of renderLinearFinalStatusComment covering null-metric fallbacks, formatDuration boundaries (<60s, exact-minute Nm, mixed Nm Ss), classifier-title rendering on ⚠️ and ❌ frames, and the no-trailing-colon when errorTitle is null.

Total: 102 tests in fanout-task-events.test.ts (was 92), 1826 passing across the full CDK suite.

* fix(linear): drop redundant PR url + agent step-3 comment after first dev smoke

After the first dev deploy of the Linear dispatcher (#239), two near-duplicate things showed up on the Linear thread:
1. The platform's ✅ comment carried PR: <url> while the agent's step-2 'PR opened' comment had already posted the same link one slot earlier. Two clickable copies of the same URL add noise.
2. The agent's step-3 'task completed' free-form comment stacked right next to the platform's ✅ structured comment with full metrics. Two completion comments back-to-back with overlapping intent — the platform one is strictly more informative.

Changes:
- renderLinearFinalStatusComment: render PR url ONLY on the ⚠️ shipped-but-stopped-early path. On ✅, the agent's step-2 comment is guaranteed to have fired with the PR link; on ⚠️ the agent may have crashed before step-2 (e.g. ABCA-91 max-turns on turn 101), so the platform comment is the backup signal and the PR url has to be there.
- Updated the corresponding test to assert not.toContain on the ✅ fixture's PR URL.
- Removed step 3 from the Linear-channel prompt contract in prompt_builder.py. Replaced with an explicit prohibition against posting a final 'task completed/failed' comment, with a sentence pointing the agent at the platform fan-out plane (#239) as the source of truth for terminal status.

Net Linear thread shape post-task: agent posts start (1) + PR-opened (2); platform posts terminal ✅/⚠️/❌ (3). One PR url, one completion comment.

Krokoko predicted this exact migration in their PR-243 review — 'the agent prompt can drop step 3 once the platform side is reliable.'

Targeted suite still 102 passing.
1 parent d97b780 commit 71a8c31

6 files changed

Lines changed: 834 additions & 69 deletions

agent/src/prompt_builder.py

Lines changed: 9 additions & 6 deletions
@@ -159,12 +159,15 @@ def _channel_prompt_addendum(config: TaskConfig) -> str:
         "transition the issue state to `In Review` (fall back to `In Progress` "
         "if that state doesn't exist). If neither exists, skip the state "
         "transition — the PR comment alone is enough. Do not invent state "
-        "names or loop on `list_issue_statuses`.\n"
-        "3. **On completion or failure** — call `mcp__linear-server__save_comment` "
-        "with the final status (succeeded / failed + short reason).\n\n"
-        "Keep comments concise. Do not mirror the full agent transcript back to "
-        "Linear. Even small tasks must post all three updates — users rely on "
-        "them to track progress."
+        "names or loop on `list_issue_statuses`.\n\n"
+        "**Do NOT post a final 'task completed' or 'task failed' comment.** "
+        "The platform fan-out plane (issue #239) posts a structured "
+        "✅/⚠️/❌ summary on terminal events with cost / turns / duration / "
+        "PR-link metrics that you don't have visibility into. A redundant "
+        "agent-side completion comment would just stack two near-identical "
+        "comments on the issue.\n\n"
+        "Keep the start + PR-opened comments concise. Do not mirror the full "
+        "agent transcript back to Linear."
     )
 
 
cdk/src/constructs/fanout-consumer.ts

Lines changed: 41 additions & 0 deletions
@@ -76,6 +76,23 @@ export interface FanOutConsumerProps {
    */
   readonly slackSecretArnPattern?: string;
 
+  /**
+   * LinearWorkspaceRegistryTable — the Linear dispatcher reads this to
+   * resolve per-workspace OAuth tokens at comment-post time. Optional:
+   * when omitted, the dispatcher logs and skips so a deployment without
+   * Linear onboarding doesn't accumulate dangling IAM grants.
+   */
+  readonly linearWorkspaceRegistryTable?: dynamodb.ITable;
+
+  /**
+   * Secrets Manager ARN-prefix pattern for per-workspace Linear OAuth
+   * bundles. Mirrors ``slackSecretArnPattern`` shape — typically
+   * ``bgagent-linear-oauth-*``. Required when ``linearWorkspaceRegistryTable``
+   * is set; without it the dispatcher would resolve the registry row but
+   * fail at the SM GetSecretValue call.
+   */
+  readonly linearOauthSecretArnPattern?: string;
+
   /**
    * Maximum batch size delivered to the Lambda per invocation.
    *
@@ -173,6 +190,30 @@ export class FanOutConsumer extends Construct {
       }));
     }
 
+    // Linear dispatcher plumbing. Same guarded shape as Slack/GitHub:
+    // a deployment without Linear onboarding gets no IAM grants and
+    // the dispatcher logs-and-skips on missing env. The registry table
+    // tells us per-workspace OAuth-secret ARN; the secret holds the
+    // access token that ``postIssueComment`` uses to drive
+    // ``commentCreate`` GraphQL.
+    if (props.linearWorkspaceRegistryTable) {
+      props.linearWorkspaceRegistryTable.grantReadData(this.fn);
+      this.fn.addEnvironment(
+        'LINEAR_WORKSPACE_REGISTRY_TABLE_NAME',
+        props.linearWorkspaceRegistryTable.tableName,
+      );
+    }
+    if (props.linearOauthSecretArnPattern) {
+      this.fn.addToRolePolicy(new iam.PolicyStatement({
+        // GetSecretValue + PutSecretValue: the resolver may rotate the
+        // OAuth token (writes the refreshed bundle back to SM) — same
+        // grants the LinearIntegration's webhook-processor Lambda holds
+        // for the same reason.
+        actions: ['secretsmanager:GetSecretValue', 'secretsmanager:PutSecretValue'],
+        resources: [props.linearOauthSecretArnPattern],
+      }));
+    }
+
     this.fn.addEventSource(new DynamoEventSource(props.taskEventsTable, {
       startingPosition: StartingPosition.LATEST,
       batchSize: props.batchSize ?? 100,
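
Reviewer note: the JSDoc above calls `linearOauthSecretArnPattern` "required when `linearWorkspaceRegistryTable` is set", yet the two `if` blocks in this hunk are evaluated independently, and nothing visible in this diff rejects a half-configured construct. The following is a minimal sketch of a synth-time consistency check, not code from the commit. `LinearPlumbingProps` and `checkLinearPropsPaired` are hypothetical names, and the table is modelled as a plain name string (the real prop is a `dynamodb.ITable`) so the sketch runs without `aws-cdk-lib`.

```typescript
// Hypothetical stand-in for the two optional Linear props on
// FanOutConsumerProps. Assumption: a table *name* replaces the real
// dynamodb.ITable so the sketch has no CDK dependency.
interface LinearPlumbingProps {
  readonly linearWorkspaceRegistryTableName?: string;
  readonly linearOauthSecretArnPattern?: string;
}

// Returns human-readable problems; an empty array means the pair is
// either fully set or fully unset.
function checkLinearPropsPaired(props: LinearPlumbingProps): string[] {
  const hasTable = props.linearWorkspaceRegistryTableName !== undefined;
  const hasPattern = props.linearOauthSecretArnPattern !== undefined;
  const problems: string[] = [];
  if (hasTable && !hasPattern) {
    // Registry row resolves, but the Secrets Manager read would be denied.
    problems.push('registry table set without linearOauthSecretArnPattern');
  }
  if (!hasTable && hasPattern) {
    // IAM grant exists, but the dispatcher skips on the missing env var.
    problems.push('linearOauthSecretArnPattern set without registry table');
  }
  return problems;
}

// Usage: a construct could throw on a non-empty result during synth.
const problems = checkLinearPropsPaired({
  linearWorkspaceRegistryTableName: 'registry',
});
console.log(problems.length === 0 ? 'ok' : problems.join('; '));
```

Whether the second case (pattern without table) should be an error is a judgment call; it is flagged here only because it would leave the kind of dangling grant the JSDoc says the guard is meant to avoid.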

cdk/src/handlers/fanout-task-events.ts

Lines changed: 243 additions & 3 deletions
@@ -46,7 +46,9 @@ import type {
   DynamoDBStreamEvent,
 } from 'aws-lambda';
 import { clearTokenCache, resolveGitHubToken } from './shared/context-hydration';
+import { classifyError } from './shared/error-classifier';
 import { renderCommentBody, upsertTaskComment } from './shared/github-comment';
+import { postIssueComment } from './shared/linear-feedback';
 import { logger } from './shared/logger';
 import { coerceNumericOrNull } from './shared/numeric';
 import { loadRepoConfig } from './shared/repo-config';
@@ -105,7 +107,7 @@ const APPROVAL_NOTIFICATION_EVENTS = [
  *  - Per-user rate limit of 10 approval-related messages per minute
  *    is enforced in the dispatcher, not in this filter.
  */
-export type NotificationChannel = 'slack' | 'email' | 'github';
+export type NotificationChannel = 'slack' | 'email' | 'github' | 'linear';
 
 export const CHANNEL_DEFAULTS: Record<NotificationChannel, ReadonlySet<string>> = {
   // Slack is the "on-call" channel per §6.2 — all terminal outcomes
@@ -155,6 +157,21 @@ export const CHANNEL_DEFAULTS: Record<NotificationChannel, ReadonlySet<string>>
     ...TERMINAL_EVENT_TYPES,
     'pr_created',
   ]),
+  // Linear posts a single deterministic final-status comment on
+  // terminal events. The agent's three-comment prompt contract (start /
+  // PR-opened / completion) covers in-flight progress; this dispatcher
+  // only fires once the task reaches a terminal state, with cost /
+  // turns / duration / pr_url metrics the requester wouldn't otherwise
+  // see. Crucially, this fires even when the agent crashes (e.g.
+  // error_max_turns, OOM) before reaching its own step-3 completion
+  // comment — the GH issue #239 motivating example.
+  //
+  // Linear's `save_comment` doesn't support edit, so this is post-once
+  // (no live updates a la GitHub edit-in-place). Approvals / milestones
+  // are excluded for the same reason — N comments rather than 1.
+  linear: new Set<string>([
+    ...TERMINAL_EVENT_TYPES,
+  ]),
 };
 
 /**
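
Reviewer note: the hunk above subscribes `linear` to terminal events only, which is what keeps the platform at one comment per task. Below is a small sketch of how a defaults map like this gates routing. It is illustrative only: the terminal event names other than `task_completed` are assumptions, the `verbose` channel is a hypothetical stand-in for any channel that also takes `pr_created`, and `channelsFor` is not a function from this repository.

```typescript
// Assumption: only 'task_completed' and 'pr_created' appear in the diff;
// the other event names here are placeholders for illustration.
const TERMINAL_EVENT_TYPES_SKETCH = [
  'task_completed',
  'task_failed',
  'task_cancelled',
] as const;

// 'verbose' is a hypothetical channel that also subscribes to 'pr_created'.
type SketchChannel = 'linear' | 'verbose';

const DEFAULTS_SKETCH: Record<SketchChannel, ReadonlySet<string>> = {
  linear: new Set<string>([...TERMINAL_EVENT_TYPES_SKETCH]),
  verbose: new Set<string>([...TERMINAL_EVENT_TYPES_SKETCH, 'pr_created']),
};

// Returns the channels whose default set contains the event type.
function channelsFor(eventType: string): SketchChannel[] {
  return (Object.keys(DEFAULTS_SKETCH) as SketchChannel[]).filter((ch) =>
    DEFAULTS_SKETCH[ch].has(eventType),
  );
}

console.log(channelsFor('task_completed')); // both channels
console.log(channelsFor('pr_created'));     // only the verbose channel
```

Under these assumptions a `pr_created` event never reaches the Linear dispatcher, so the agent's own "PR opened" comment stays the only mid-flight signal on the issue.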
@@ -753,13 +770,235 @@ async function dispatchToEmail(event: FanOutEvent): Promise<void> {
   });
 }
 
+/**
+ * Render the Linear final-status comment body. Inputs are already
+ * coerced to native types by the caller; this function only formats.
+ *
+ * The framing flips between three outcomes based on `(eventType, prUrl)`:
+ *
+ *   1. ``task_completed`` → ✅ "Task completed"
+ *   2. any non-completed terminal event WITH PR → ⚠️ "Shipped a PR but stopped early"
+ *      (the motivating ABCA-91 case is max-turns-with-PR, but the same
+ *      framing applies to any terminal failure — budget cap, agent
+ *      crash, etc. — that managed to ship a PR before stopping)
+ *   3. any non-completed terminal event NO PR → ❌ "Task <subtype>" + classifier title
+ *
+ * The ⚠️ frame appends the classifier title when one is available so the
+ * requester sees both outcomes (the PR shipped, AND the reason it
+ * stopped — "Hit max-turns cap" for ABCA-91).
+ *
+ * Cost / turns / duration appear as a subtitle line. Missing values
+ * (e.g. failure before the agent emitted any tokens) render as `—`.
+ */
+export function renderLinearFinalStatusComment(args: {
+  eventType: string;
+  prUrl: string | null;
+  costUsd: number | null;
+  turns: number | null;
+  maxTurns: number | null;
+  durationS: number | null;
+  taskId: string;
+  errorTitle: string | null;
+}): string {
+  const isCompleted = args.eventType === 'task_completed';
+  const shippedDespiteFailure = !isCompleted && args.prUrl != null;
+
+  let header: string;
+  if (isCompleted) {
+    header = '✅ **Task completed**';
+  } else if (shippedDespiteFailure) {
+    // Append the classifier title (when known) so the requester sees
+    // *why* the agent stopped, not just that it shipped a PR. For
+    // ABCA-91 this renders "...stopped early — Hit max-turns cap".
+    const reason = args.errorTitle ? ` — ${args.errorTitle}` : '';
+    header = `⚠️ **Shipped a PR but stopped early${reason}** — review and decide if more work is needed`;
+  } else {
+    const reason = args.errorTitle ? `: ${args.errorTitle}` : '';
+    header = `❌ **Task ${args.eventType.replace(/^task_/, '')}${reason}**`;
+  }
+
+  const costStr = args.costUsd != null ? `$${args.costUsd.toFixed(2)}` : '—';
+  const turnsStr = args.turns != null
+    ? `${args.turns}${args.maxTurns != null ? ` / ${args.maxTurns}` : ''}`
+    : '—';
+  const durationStr = args.durationS != null
+    ? formatDuration(args.durationS)
+    : '—';
+
+  const lines: string[] = [
+    header,
+    '',
+    `cost: ${costStr} • turns: ${turnsStr} • duration: ${durationStr}`,
+  ];
+  // Render the PR URL only on the ⚠️ "shipped a PR but stopped early"
+  // path — that's the case where the agent's own step-2 "PR opened"
+  // comment is *not* guaranteed to have fired (the agent may have
+  // crashed between opening the PR and posting the comment, e.g.
+  // ABCA-91 hitting max-turns on turn 101). On the ✅ success path the
+  // agent's step-2 comment reliably carries the PR link, so duplicating
+  // it here is just noise.
+  if (args.prUrl && shippedDespiteFailure) {
+    lines.push('', `PR: ${args.prUrl}`);
+  }
+  lines.push('', `_task ${args.taskId}_`);
+  return lines.join('\n');
+}
+
+function formatDuration(seconds: number): string {
+  const total = Math.round(seconds);
+  if (total < 60) return `${total}s`;
+  const m = Math.floor(total / 60);
+  const s = total % 60;
+  return s === 0 ? `${m}m` : `${m}m ${s}s`;
+}
+
+/**
+ * Linear dispatcher — posts a deterministic final-status comment when a
+ * Linear-origin task reaches a terminal event. Mirrors Slack's structural
+ * shape (channel_source gate, best-effort, single error-isolation point):
+ *
+ *   1. Load TaskRecord. Skip if missing (TTL eviction race).
+ *   2. Gate on ``channel_source === 'linear'`` so non-Linear tasks
+ *      short-circuit after one DDB Get.
+ *   3. Read ``linear_issue_id`` + ``linear_workspace_id`` from
+ *      ``channel_metadata``. Skip if either is missing — defensive,
+ *      shouldn't happen for properly-admitted Linear tasks.
+ *   4. Render the comment + post via the existing ``postIssueComment``
+ *      helper, which itself swallows network/auth errors and returns
+ *      false rather than throwing.
+ *
+ * Failure handling: ``postIssueComment`` is best-effort — a Linear API
+ * outage logs and returns false rather than throwing. We reflect that
+ * outcome in the dispatcher log but never reject the dispatcher
+ * promise: a failed Linear comment shouldn't trigger ``routeEvent``'s
+ * batch-retry path because retrying won't fix Linear's API.
+ */
+async function dispatchToLinear(event: FanOutEvent): Promise<void> {
+  const registryTableName = process.env.LINEAR_WORKSPACE_REGISTRY_TABLE_NAME;
+  if (!registryTableName) {
+    // WARN with error_id so this is alarmable. The Linear comment is
+    // the *only* completion signal for the agent-crash case (#239), so a
+    // misconfigured env var would silently drop every Linear-origin
+    // task's metrics — exactly the gap this dispatcher was built to
+    // close. The GitHub dispatcher uses the same WARN+error_id pattern
+    // for its missing-env path.
+    logger.warn('[fanout/linear] LINEAR_WORKSPACE_REGISTRY_TABLE_NAME not set — skipping', {
+      event: 'fanout.linear.missing_env',
+      error_id: 'FANOUT_LINEAR_MISSING_ENV',
+      task_id: event.task_id,
+    });
+    return;
+  }
+
+  const task = await loadTaskForComment(event.task_id);
+  if (!task) {
+    logger.warn('[fanout/linear] task not found — skipping comment', {
+      event: 'fanout.linear.task_missing',
+      task_id: event.task_id,
+    });
+    return;
+  }
+
+  // channel_source gate — short-circuit non-Linear tasks. Same shape
+  // Slack uses to keep the GitHub edit-in-place comment from racing
+  // against the platform-side Linear comment when channel_source is
+  // 'github'/'slack'/'api'.
+  if (task.channel_source !== 'linear') {
+    return;
+  }
+
+  const issueId = task.channel_metadata?.linear_issue_id;
+  const workspaceId = task.channel_metadata?.linear_workspace_id;
+  if (!issueId || !workspaceId) {
+    logger.warn('[fanout/linear] task missing linear_issue_id or linear_workspace_id — skipping', {
+      event: 'fanout.linear.metadata_missing',
+      task_id: event.task_id,
+      has_issue_id: Boolean(issueId),
+      has_workspace_id: Boolean(workspaceId),
+    });
+    return;
+  }
+
+  // Derive an error title from `error_message` via the shared classifier.
+  // Same data the API surfaces as `error_classification.title` —
+  // "Hit max-turns cap", "Insufficient GitHub permissions", etc.
+  //
+  // Returns null only when error_message is empty/undefined (the
+  // task_completed case). For any non-empty error_message that doesn't
+  // match a known pattern, returns the UNKNOWN_CLASSIFICATION fallback
+  // ("Unexpected error") — so a generic failure still gets a structured
+  // title rather than nothing. See error-classifier.ts.
+  const classification = classifyError(task.error_message);
+
+  const body = renderLinearFinalStatusComment({
+    eventType: event.event_type,
+    prUrl: task.pr_url ?? null,
+    // DDB returns numeric attributes as strings at the Document-client
+    // boundary; coerce so toFixed/comparisons work. Same pattern the
+    // GitHub dispatcher uses.
+    costUsd: coerceNumericOrNull(
+      task.cost_usd,
+      { field: 'cost_usd', task_id: task.task_id, event_id: event.event_id },
+      logger,
+    ),
+    turns: coerceNumericOrNull(
+      task.turns_attempted,
+      { field: 'turns_attempted', task_id: task.task_id, event_id: event.event_id },
+      logger,
+    ),
+    maxTurns: coerceNumericOrNull(
+      task.max_turns,
+      { field: 'max_turns', task_id: task.task_id, event_id: event.event_id },
+      logger,
+    ),
+    durationS: coerceNumericOrNull(
+      task.duration_s,
+      { field: 'duration_s', task_id: task.task_id, event_id: event.event_id },
+      logger,
+    ),
+    taskId: task.task_id,
+    errorTitle: classification?.title ?? null,
+  });
+
+  const ok = await postIssueComment(
+    { linearWorkspaceId: workspaceId, registryTableName },
+    issueId,
+    body,
+  );
+
+  // Split the success / failure path so post-failure can be alarmed
+  // distinctly. The underlying linear-feedback.ts path already WARNs
+  // on the specific failure reason (auth, network, etc.); this
+  // backstop ensures a steady drip of post-failures shows up in the
+  // dispatcher's own log channel for cross-channel alarms.
+  if (ok) {
+    logger.info('[fanout/linear] comment dispatched', {
+      event: 'fanout.linear.dispatched',
+      task_id: task.task_id,
+      issue_id: issueId,
+      event_type: event.event_type,
+      posted: true,
+    });
+  } else {
+    logger.warn('[fanout/linear] postIssueComment returned false — Linear API path failed', {
+      event: 'fanout.linear.post_failed',
+      error_id: 'FANOUT_LINEAR_POST_FAILED',
+      task_id: task.task_id,
+      issue_id: issueId,
+      event_type: event.event_type,
+      posted: false,
+    });
+  }
+}
+
 /** Exposed for testing: the per-channel dispatcher callable by the
  * handler. Each key's absence from the routing map disables its
  * dispatcher; the signature is uniform so adding a channel is one
  * entry. */
 const DISPATCHERS: Record<NotificationChannel, (ev: FanOutEvent) => Promise<void>> = {
   slack: dispatchToSlack,
   github: dispatchToGitHubComment,
+  linear: dispatchToLinear,
   email: dispatchToEmail,
 };
 
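
Reviewer note: to make the three frames and the duration boundaries in the hunk above easier to check by eye, here is a condensed, standalone sketch. `formatDuration` is copied from the diff; `pickFrame` and `metricsLine` are hypothetical helpers that isolate the branching and the subtitle line of `renderLinearFinalStatusComment`, and `task_failed` is an assumed event name (the diff only shows `task_completed` and the `/^task_/` strip).

```typescript
// Copied from the diff: rounds to whole seconds, then renders Ns, Nm, or Nm Ss.
function formatDuration(seconds: number): string {
  const total = Math.round(seconds);
  if (total < 60) return `${total}s`;
  const m = Math.floor(total / 60);
  const s = total % 60;
  return s === 0 ? `${m}m` : `${m}m ${s}s`;
}

// Hypothetical helper: the (eventType, prUrl) discriminator on its own.
type Frame = 'completed' | 'shipped_but_stopped' | 'failed';
function pickFrame(eventType: string, prUrl: string | null): Frame {
  if (eventType === 'task_completed') return 'completed';
  return prUrl != null ? 'shipped_but_stopped' : 'failed';
}

// Hypothetical helper: the subtitle line with the same null fallbacks.
function metricsLine(
  costUsd: number | null,
  turns: number | null,
  maxTurns: number | null,
  durationS: number | null,
): string {
  const cost = costUsd != null ? `$${costUsd.toFixed(2)}` : '—';
  const t = turns != null
    ? `${turns}${maxTurns != null ? ` / ${maxTurns}` : ''}`
    : '—';
  const d = durationS != null ? formatDuration(durationS) : '—';
  return `cost: ${cost} • turns: ${t} • duration: ${d}`;
}

console.log(formatDuration(59));   // 59s
console.log(formatDuration(60));   // 1m
console.log(formatDuration(125));  // 2m 5s
console.log(pickFrame('task_failed', null)); // failed
console.log(metricsLine(1.5, 101, 100, 125));
// cost: $1.50 • turns: 101 / 100 • duration: 2m 5s
```

One edge worth a look in the real function: the frame test uses `prUrl != null` while the PR line uses truthiness, so an empty-string `pr_url` would select the ⚠️ frame yet omit the `PR:` line. Whether the task record can ever carry an empty string is not visible in this diff.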
@@ -814,8 +1053,9 @@ export async function routeEvent(
     attempted.push(ch);
     tasks.push(DISPATCHERS[ch](ev));
   }
-  // Parallelism is bounded by the dispatcher list (at most 3 channels),
-  // not by program input, so the unbounded-parallelism lint does not apply.
+  // Parallelism is bounded by the dispatcher list (4 channels:
+  // slack/github/linear/email), not by program input, so the
+  // unbounded-parallelism lint does not apply.
 
   const results = await Promise.allSettled(tasks);
 
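
Reviewer note: the hunk ends at `Promise.allSettled(tasks)`, and what `routeEvent` does with the results is outside this diff. The sketch below shows one way settled results could be paired back to the `attempted` channel list so a rejection in one dispatcher is reported without hiding the others. `pairOutcomes` and `ChannelOutcome` are hypothetical names, not code from this repository.

```typescript
type Channel = 'slack' | 'github' | 'linear' | 'email';

interface ChannelOutcome {
  readonly channel: Channel;
  readonly ok: boolean;
  readonly reason?: string;
}

// Pairs attempted[i] with results[i]. Promise.allSettled preserves input
// order, so index alignment holds as long as both arrays were built in the
// same loop (as `attempted` and `tasks` are in the hunk above).
function pairOutcomes(
  attempted: readonly Channel[],
  results: readonly PromiseSettledResult<void>[],
): ChannelOutcome[] {
  return attempted.map((channel, i) => {
    const r = results[i];
    if (r === undefined) return { channel, ok: false, reason: 'no result' };
    if (r.status === 'fulfilled') return { channel, ok: true };
    const reason = r.reason instanceof Error ? r.reason.message : String(r.reason);
    return { channel, ok: false, reason };
  });
}

// Usage with hand-built results standing in for the awaited value.
const outcomes = pairOutcomes(
  ['slack', 'linear'],
  [
    { status: 'fulfilled', value: undefined },
    { status: 'rejected', reason: new Error('registry lookup failed') },
  ],
);
console.log(outcomes.filter((o) => !o.ok).map((o) => o.channel)); // the failed channels
```

Per its JSDoc the Linear dispatcher does not reject on a failed post, since `postIssueComment` returns false instead. A rejection would therefore more plausibly come from an earlier step such as the task load, assuming that helper can throw.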
cdk/src/handlers/shared/types.ts

Lines changed: 1 addition & 0 deletions
@@ -219,6 +219,7 @@ export interface TaskNotificationsConfig {
   readonly slack?: ChannelConfig;
   readonly email?: ChannelConfig;
   readonly github?: ChannelConfig;
+  readonly linear?: ChannelConfig;
 }
 
 /**
