Skip to content

Commit 3ba880d

Browse files
committed
feat(notifications): platform-side Linear final-status comment with cost/turns/duration
Closes #239. Adds a Linear dispatcher to the fanout consumer alongside the existing slack/github/email dispatchers. Posts a deterministic final-status comment on terminal task events for Linear-origin tasks, with cost, turns/max_turns, duration, and PR link rendered. Three framing modes by (event_type, pr_url): ✅ task_completed → 'Task completed' ⚠️ error_max_turns + pr_url → 'Shipped a PR but stopped early' ❌ all other terminal states → 'Task <subtype>: <classifier title>' The ⚠️ case is the motivating ABCA-91 scenario: agent hit max_turns on turn 101 after shipping PR #35; previous behaviour was a silent ❌ reaction with no metrics surfaced to the requester. The platform-side comment fires deterministically even when the agent crashes (OOM, SDK buffer overflow, max_turns) before reaching its own step-3 completion comment. Architecture: - One new entry in NotificationChannel + CHANNEL_DEFAULTS + DISPATCHERS in fanout-task-events.ts. Dispatcher gates on channel_source === 'linear' so non-Linear tasks short-circuit after one DDB Get. - Reuses the existing postIssueComment helper from shared/linear-feedback.ts (already in use by the screenshot pipeline + orchestrator failure-reporting paths). - New construct props linearWorkspaceRegistryTable + linearOauthSecretArnPattern guard the IAM grants the same way slackSecretArnPattern does — a deployment without LinearIntegration gets no dangling permission to bgagent-linear-oauth-*. - FanOutConsumer instantiation moved below LinearIntegration in agent.ts so it can receive the registry table reference. Tests: 92 passing in fanout-task-events.test.ts (1816 across full CDK suite). New Linear-dispatcher describe block covers happy path, failed without PR, ABCA-91 max_turns-with-PR, channel_source short- circuit, missing metadata, and postIssueComment-returning-false graceful no-op.
1 parent dac4e31 commit 3ba880d

5 files changed

Lines changed: 574 additions & 61 deletions

File tree

cdk/src/constructs/fanout-consumer.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,23 @@ export interface FanOutConsumerProps {
7676
*/
7777
readonly slackSecretArnPattern?: string;
7878

79+
/**
80+
* LinearWorkspaceRegistryTable — the Linear dispatcher reads this to
81+
* resolve per-workspace OAuth tokens at comment-post time. Optional:
82+
* when omitted, the dispatcher logs and skips so a deployment without
83+
* Linear onboarding doesn't accumulate dangling IAM grants.
84+
*/
85+
readonly linearWorkspaceRegistryTable?: dynamodb.ITable;
86+
87+
/**
88+
* Secrets Manager ARN-prefix pattern for per-workspace Linear OAuth
89+
* bundles. Mirrors ``slackSecretArnPattern`` shape — typically
90+
* ``bgagent-linear-oauth-*``. Required when ``linearWorkspaceRegistryTable``
91+
* is set; without it the dispatcher would resolve the registry row but
92+
* fail at the SM GetSecretValue call.
93+
*/
94+
readonly linearOauthSecretArnPattern?: string;
95+
7996
/**
8097
* Maximum batch size delivered to the Lambda per invocation.
8198
*
@@ -173,6 +190,30 @@ export class FanOutConsumer extends Construct {
173190
}));
174191
}
175192

193+
// Linear dispatcher plumbing. Same guarded shape as Slack/GitHub:
194+
// a deployment without Linear onboarding gets no IAM grants and
195+
// the dispatcher logs-and-skips on missing env. The registry table
196+
// tells us per-workspace OAuth-secret ARN; the secret holds the
197+
// access token that ``postIssueComment`` uses to drive
198+
// ``commentCreate`` GraphQL.
199+
if (props.linearWorkspaceRegistryTable) {
200+
props.linearWorkspaceRegistryTable.grantReadData(this.fn);
201+
this.fn.addEnvironment(
202+
'LINEAR_WORKSPACE_REGISTRY_TABLE_NAME',
203+
props.linearWorkspaceRegistryTable.tableName,
204+
);
205+
}
206+
if (props.linearOauthSecretArnPattern) {
207+
this.fn.addToRolePolicy(new iam.PolicyStatement({
208+
// GetSecretValue + PutSecretValue: the resolver may rotate the
209+
// OAuth token (writes the refreshed bundle back to SM) — same
210+
// grants the LinearIntegration's webhook-processor Lambda holds
211+
// for the same reason.
212+
actions: ['secretsmanager:GetSecretValue', 'secretsmanager:PutSecretValue'],
213+
resources: [props.linearOauthSecretArnPattern],
214+
}));
215+
}
216+
176217
this.fn.addEventSource(new DynamoEventSource(props.taskEventsTable, {
177218
startingPosition: StartingPosition.LATEST,
178219
batchSize: props.batchSize ?? 100,

cdk/src/handlers/fanout-task-events.ts

Lines changed: 197 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ import type {
4646
DynamoDBStreamEvent,
4747
} from 'aws-lambda';
4848
import { clearTokenCache, resolveGitHubToken } from './shared/context-hydration';
49+
import { classifyError } from './shared/error-classifier';
4950
import { renderCommentBody, upsertTaskComment } from './shared/github-comment';
51+
import { postIssueComment } from './shared/linear-feedback';
5052
import { logger } from './shared/logger';
5153
import { coerceNumericOrNull } from './shared/numeric';
5254
import { loadRepoConfig } from './shared/repo-config';
@@ -105,7 +107,7 @@ const APPROVAL_NOTIFICATION_EVENTS = [
105107
* - Per-user rate limit of 10 approval-related messages per minute
106108
* is enforced in the dispatcher, not in this filter.
107109
*/
108-
export type NotificationChannel = 'slack' | 'email' | 'github';
110+
export type NotificationChannel = 'slack' | 'email' | 'github' | 'linear';
109111

110112
export const CHANNEL_DEFAULTS: Record<NotificationChannel, ReadonlySet<string>> = {
111113
// Slack is the "on-call" channel per §6.2 — all terminal outcomes
@@ -155,6 +157,21 @@ export const CHANNEL_DEFAULTS: Record<NotificationChannel, ReadonlySet<string>>
155157
...TERMINAL_EVENT_TYPES,
156158
'pr_created',
157159
]),
160+
// Linear posts a single deterministic final-status comment on
161+
// terminal events. The agent's three-comment prompt contract (start /
162+
// PR-opened / completion) covers in-flight progress; this dispatcher
163+
// only fires once the task reaches a terminal state, with cost /
164+
// turns / duration / pr_url metrics the requester wouldn't otherwise
165+
// see. Crucially, this fires even when the agent crashes (e.g.
166+
// error_max_turns, OOM) before reaching its own step-3 completion
167+
// comment — the GH issue #239 motivating example.
168+
//
169+
// Linear's `save_comment` doesn't support edit, so this is post-once
170+
// (no live updates a la GitHub edit-in-place). Approvals / milestones
171+
// are excluded for the same reason — N comments rather than 1.
172+
linear: new Set<string>([
173+
...TERMINAL_EVENT_TYPES,
174+
]),
158175
};
159176

160177
/**
@@ -742,13 +759,192 @@ async function dispatchToEmail(event: FanOutEvent): Promise<void> {
742759
});
743760
}
744761

762+
/**
763+
* Render the Linear final-status comment body. Inputs are already
764+
* coerced to native types by the caller; this function only formats.
765+
*
766+
* The framing flips between three outcomes based on `(eventType, prUrl)`:
767+
*
768+
* 1. ``task_completed`` → ✅ "Task completed"
769+
* 2. ``error_max_turns`` + PR → ⚠️ "Shipped a PR but hit max-turns cap" — the
770+
* motivating GH #239 case (ABCA-91 turn 101)
771+
* 3. all other terminal subtypes → ❌ "Task failed" + classifier title if known
772+
*
773+
* Cost / turns / duration appear as a subtitle line. Missing values
774+
* (e.g. failure before the agent emitted any tokens) render as `—`.
775+
*/
776+
export function renderLinearFinalStatusComment(args: {
777+
eventType: string;
778+
prUrl: string | null;
779+
costUsd: number | null;
780+
turns: number | null;
781+
maxTurns: number | null;
782+
durationS: number | null;
783+
taskId: string;
784+
errorTitle: string | null;
785+
}): string {
786+
const isCompleted = args.eventType === 'task_completed';
787+
const shippedDespiteFailure = !isCompleted && args.prUrl != null;
788+
789+
let header: string;
790+
if (isCompleted) {
791+
header = '✅ **Task completed**';
792+
} else if (shippedDespiteFailure) {
793+
header = '⚠️ **Shipped a PR but stopped early** — review and decide if more work is needed';
794+
} else {
795+
const reason = args.errorTitle ? `: ${args.errorTitle}` : '';
796+
header = `❌ **Task ${args.eventType.replace(/^task_/, '')}${reason}**`;
797+
}
798+
799+
const costStr = args.costUsd != null ? `$${args.costUsd.toFixed(2)}` : '—';
800+
const turnsStr = args.turns != null
801+
? `${args.turns}${args.maxTurns != null ? ` / ${args.maxTurns}` : ''}`
802+
: '—';
803+
const durationStr = args.durationS != null
804+
? formatDuration(args.durationS)
805+
: '—';
806+
807+
const lines: string[] = [
808+
header,
809+
'',
810+
`cost: ${costStr} • turns: ${turnsStr} • duration: ${durationStr}`,
811+
];
812+
if (args.prUrl) {
813+
lines.push('', `PR: ${args.prUrl}`);
814+
}
815+
lines.push('', `_task ${args.taskId}_`);
816+
return lines.join('\n');
817+
}
818+
819+
function formatDuration(seconds: number): string {
820+
const total = Math.round(seconds);
821+
if (total < 60) return `${total}s`;
822+
const m = Math.floor(total / 60);
823+
const s = total % 60;
824+
return s === 0 ? `${m}m` : `${m}m ${s}s`;
825+
}
826+
827+
/**
828+
* Linear dispatcher — posts a deterministic final-status comment when a
829+
* Linear-origin task reaches a terminal event. Mirrors Slack's structural
830+
* shape (channel_source gate, best-effort, single error-isolation point):
831+
*
832+
* 1. Load TaskRecord. Skip if missing (TTL eviction race).
833+
* 2. Gate on ``channel_source === 'linear'`` so non-Linear tasks
834+
* short-circuit after one DDB Get.
835+
* 3. Read ``linear_issue_id`` + ``linear_workspace_id`` from
836+
* ``channel_metadata``. Skip if either is missing — defensive,
837+
* shouldn't happen for properly-admitted Linear tasks.
838+
* 4. Render the comment + post via the existing ``postIssueComment``
839+
* helper, which itself swallows network/auth errors and returns
840+
* false rather than throwing.
841+
*
842+
* Failure handling: ``postIssueComment`` is best-effort — a Linear API
843+
* outage logs and returns false rather than throwing. We reflect that
844+
* outcome in the dispatcher log but never reject the dispatcher
845+
* promise: a failed Linear comment shouldn't trigger ``routeEvent``'s
846+
* batch-retry path because retrying won't fix Linear's API.
847+
*/
848+
async function dispatchToLinear(event: FanOutEvent): Promise<void> {
849+
const registryTableName = process.env.LINEAR_WORKSPACE_REGISTRY_TABLE_NAME;
850+
if (!registryTableName) {
851+
logger.info('[fanout/linear] LINEAR_WORKSPACE_REGISTRY_TABLE_NAME not set — skipping', {
852+
event: 'fanout.linear.missing_env',
853+
task_id: event.task_id,
854+
});
855+
return;
856+
}
857+
858+
const task = await loadTaskForComment(event.task_id);
859+
if (!task) {
860+
logger.warn('[fanout/linear] task not found — skipping comment', {
861+
event: 'fanout.linear.task_missing',
862+
task_id: event.task_id,
863+
});
864+
return;
865+
}
866+
867+
// channel_source gate — short-circuit non-Linear tasks. Same shape
868+
// Slack uses to keep the GitHub edit-in-place comment from racing
869+
// against the platform-side Linear comment when channel_source is
870+
// 'github'/'slack'/'api'.
871+
if (task.channel_source !== 'linear') {
872+
return;
873+
}
874+
875+
const issueId = task.channel_metadata?.linear_issue_id;
876+
const workspaceId = task.channel_metadata?.linear_workspace_id;
877+
if (!issueId || !workspaceId) {
878+
logger.warn('[fanout/linear] task missing linear_issue_id or linear_workspace_id — skipping', {
879+
event: 'fanout.linear.metadata_missing',
880+
task_id: event.task_id,
881+
has_issue_id: Boolean(issueId),
882+
has_workspace_id: Boolean(workspaceId),
883+
});
884+
return;
885+
}
886+
887+
// Derive an error title from `error_message` via the shared classifier.
888+
// Same data the API surfaces as `error_classification.title` —
889+
// "Hit max-turns cap", "Insufficient GitHub permissions", etc.
890+
// Returns null for tasks that completed successfully or whose error
891+
// message doesn't match any known pattern; the renderer falls back
892+
// to a generic frame in that case.
893+
const classification = classifyError(task.error_message);
894+
895+
const body = renderLinearFinalStatusComment({
896+
eventType: event.event_type,
897+
prUrl: task.pr_url ?? null,
898+
// DDB returns numeric attributes as strings at the Document-client
899+
// boundary; coerce so toFixed/comparisons work. Same pattern the
900+
// GitHub dispatcher uses.
901+
costUsd: coerceNumericOrNull(
902+
task.cost_usd,
903+
{ field: 'cost_usd', task_id: task.task_id, event_id: event.event_id },
904+
logger,
905+
),
906+
turns: coerceNumericOrNull(
907+
task.turns_attempted,
908+
{ field: 'turns_attempted', task_id: task.task_id, event_id: event.event_id },
909+
logger,
910+
),
911+
maxTurns: coerceNumericOrNull(
912+
task.max_turns,
913+
{ field: 'max_turns', task_id: task.task_id, event_id: event.event_id },
914+
logger,
915+
),
916+
durationS: coerceNumericOrNull(
917+
task.duration_s,
918+
{ field: 'duration_s', task_id: task.task_id, event_id: event.event_id },
919+
logger,
920+
),
921+
taskId: task.task_id,
922+
errorTitle: classification?.title ?? null,
923+
});
924+
925+
const ok = await postIssueComment(
926+
{ linearWorkspaceId: workspaceId, registryTableName },
927+
issueId,
928+
body,
929+
);
930+
931+
logger.info('[fanout/linear] comment dispatched', {
932+
event: ok ? 'fanout.linear.dispatched' : 'fanout.linear.post_failed',
933+
task_id: task.task_id,
934+
issue_id: issueId,
935+
event_type: event.event_type,
936+
posted: ok,
937+
});
938+
}
939+
745940
/** Exposed for testing: the per-channel dispatcher callable by the
746941
* handler. Each key's absence from the routing map disables its
747942
* dispatcher; the signature is uniform so adding a channel is one
748943
* entry. */
749944
const DISPATCHERS: Record<NotificationChannel, (ev: FanOutEvent) => Promise<void>> = {
750945
slack: dispatchToSlack,
751946
github: dispatchToGitHubComment,
947+
linear: dispatchToLinear,
752948
email: dispatchToEmail,
753949
};
754950

cdk/src/handlers/shared/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ export interface TaskNotificationsConfig {
204204
readonly slack?: ChannelConfig;
205205
readonly email?: ChannelConfig;
206206
readonly github?: ChannelConfig;
207+
readonly linear?: ChannelConfig;
207208
}
208209

209210
/**

cdk/src/stacks/agent.ts

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -648,28 +648,8 @@ export class AgentStack extends Stack {
648648
attachmentsBucket: attachmentsBucket.bucket,
649649
});
650650

651-
// --- Fan-out plane consumer ---
652-
// Consumes TaskEventsTable DynamoDB Streams and dispatches events to
653-
// Slack / GitHub / email per per-channel default filters. GitHub
654-
// dispatcher edits a single issue comment in place; Slack
655-
// dispatcher (issue #64) reads per-workspace bot tokens from
656-
// ``bgagent/slack/*``. Email remains a log-only stub until Phase 2.
657-
new FanOutConsumer(this, 'FanOutConsumer', {
658-
taskEventsTable: taskEventsTable.table,
659-
taskTable: taskTable.table,
660-
repoTable: repoTable.table,
661-
githubTokenSecret,
662-
// Slack bot-token grant is guarded on this prop — pass the
663-
// ``bgagent/slack/*`` prefix so the FanOutConsumer can read
664-
// workspace tokens. Same scope SlackIntegration uses for its
665-
// own writers (PR #79 review #2).
666-
slackSecretArnPattern: Stack.of(this).formatArn({
667-
service: 'secretsmanager',
668-
resource: 'secret',
669-
resourceName: 'bgagent/slack/*',
670-
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
671-
}),
672-
});
651+
// FanOutConsumer is constructed below LinearIntegration so the
652+
// Linear dispatcher can receive ``linearIntegration.workspaceRegistryTable``.
673653

674654
// --- Cedar HITL approval metrics publisher (Chunk 8, §11.3 / IMPL-28) ---
675655
// Consumer #2 of the TaskEventsTable stream (FanOutConsumer is #1).
@@ -835,6 +815,42 @@ export class AgentStack extends Stack {
835815
description: 'Name of the DynamoDB Linear workspace registry — `bgagent linear setup` writes a row per OAuth-installed workspace',
836816
});
837817

818+
// --- Fan-out plane consumer ---
819+
// Consumes TaskEventsTable DynamoDB Streams and dispatches events to
820+
// Slack / GitHub / Linear / email per per-channel default filters.
821+
// GitHub dispatcher edits a single issue comment in place; Slack
822+
// dispatcher (issue #64) reads per-workspace bot tokens from
823+
// ``bgagent/slack/*``; Linear dispatcher (issue #239) posts a single
824+
// deterministic final-status comment with cost/turns/duration.
825+
// Email remains a log-only stub until SES wires.
826+
new FanOutConsumer(this, 'FanOutConsumer', {
827+
taskEventsTable: taskEventsTable.table,
828+
taskTable: taskTable.table,
829+
repoTable: repoTable.table,
830+
githubTokenSecret,
831+
// Slack bot-token grant is guarded on this prop — pass the
832+
// ``bgagent/slack/*`` prefix so the FanOutConsumer can read
833+
// workspace tokens. Same scope SlackIntegration uses for its
834+
// own writers (PR #79 review #2).
835+
slackSecretArnPattern: Stack.of(this).formatArn({
836+
service: 'secretsmanager',
837+
resource: 'secret',
838+
resourceName: 'bgagent/slack/*',
839+
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
840+
}),
841+
// Linear dispatcher reads workspace registry rows + per-workspace
842+
// OAuth-secret JSON. Same scope `bgagent-linear-oauth-*` as the
843+
// orchestrator and webhook processor — Lambdas in this stack share
844+
// the rotated-token write path; the agent runtime gets read-only.
845+
linearWorkspaceRegistryTable: linearIntegration.workspaceRegistryTable,
846+
linearOauthSecretArnPattern: Stack.of(this).formatArn({
847+
service: 'secretsmanager',
848+
resource: 'secret',
849+
resourceName: 'bgagent-linear-oauth-*',
850+
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
851+
}),
852+
});
853+
838854
// --- GitHub deployment-status → screenshot pipeline ---
839855
// Listens for Vercel-style preview deploys, screenshots the
840856
// `deployment.environment_url` via AgentCore Browser, posts the

0 commit comments

Comments
 (0)