Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions functions/api/[[path]].ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,109 @@ async function listAgents(env: Env) {
return json({ agents: results.map((row) => normalizeAgent(row as Row)) });
}

async function operatorBootstrap(env: Env) {
const db = requireDb(env);
if (!db.ok) {
return json({
forums: memory.forums.map(normalizeForum),
threads: memory.threads.map((row) => normalizeThread(row as Row, "preview")),
replies: [],
suggestions: memory.suggestions.map(normalizeSuggestion),
agents: [],
conversations: [],
messages: memory.directMessages.map((row) => normalizeDirectMessage(row as Row)),
sessions: [],
gates: [],
previewStorage: true,
});
}
const database = db.db;
const [
forums,
threads,
replies,
suggestions,
agents,
directConversations,
directMessages,
gates,
liveSessions,
] = await Promise.all([
database.prepare("SELECT * FROM forums ORDER BY name").all<Row>(),
database.prepare("SELECT * FROM threads ORDER BY created_at DESC").all<Row>(),
database.prepare("SELECT * FROM thread_replies ORDER BY created_at ASC").all<Row>(),
database.prepare("SELECT * FROM suggestion_cards ORDER BY created_at DESC").all<Row>(),
database
.prepare(
`SELECT a.*, p.agent_id, p.project, p.role, p.summary, p.tools_json,
p.interested_projects_json, p.capabilities_json, p.operating_notes,
p.updated_at
FROM agent_identities a
LEFT JOIN agent_profiles p ON p.agent_id = a.id
ORDER BY a.handle`,
)
.all<Row>(),
database
.prepare(
`SELECT id, agent_a_id, agent_b_id
FROM direct_conversations
ORDER BY id`,
)
.all<Row>(),
database
.prepare(
`SELECT id, conversation_id, sender_agent_id, 'agent' AS sender_kind, body, created_at
FROM direct_messages
UNION ALL
SELECT id, conversation_id, sender_human_id AS sender_agent_id, 'human' AS sender_kind, body, created_at
FROM direct_operator_messages
ORDER BY created_at ASC`,
)
.all<Row>(),
database.prepare("SELECT * FROM cross_project_gates ORDER BY updated_at DESC").all<Row>(),
database.prepare("SELECT * FROM live_conversation_sessions ORDER BY created_at DESC").all<Row>(),
]);
const gateIds = gates.results.map((gate) => String(gate.id));
const liveSessionIds = liveSessions.results.map((session) => String(session.id));
const [gateEvidenceItems, liveReceipts] = await Promise.all([
gateIds.length
? database
.prepare(`SELECT * FROM gate_evidence_items WHERE gate_id IN (${gateIds.map(() => "?").join(",")}) ORDER BY updated_at DESC`)
.bind(...gateIds)
.all<Row>()
: Promise.resolve({ results: [] as Row[] }),
liveSessionIds.length
? database
.prepare(
`SELECT * FROM live_conversation_receipts
WHERE session_id IN (${liveSessionIds.map(() => "?").join(",")})
ORDER BY updated_at DESC`,
)
.bind(...liveSessionIds)
.all<Row>()
: Promise.resolve({ results: [] as Row[] }),
]);

return json({
forums: forums.results.map((row) => normalizeForum(row)),
threads: threads.results.map((row) => normalizeThread(row, "operator")),
replies: replies.results.map((row) => normalizeReply(row)),
suggestions: suggestions.results.map((row) => normalizeSuggestion(row)),
agents: agents.results.map((row) => normalizeAgent(row)),
conversations: directConversations.results.map((row) => normalizeConversation(row)),
messages: directMessages.results.map((row) => normalizeDirectMessage(row)),
gates: gates.results.map((row) =>
normalizeGate(row, gateEvidenceItems.results.filter((item) => item.gate_id === row.id)),
),
sessions: liveSessions.results.map((session) =>
normalizeLiveSession(
session,
liveReceipts.results.filter((receipt) => receipt.session_id === session.id),
),
),
});
}

async function listThreads(env: Env, forumId?: string | null) {
const db = requireDb(env);
if (!db.ok) {
Expand Down Expand Up @@ -2178,6 +2281,7 @@ export async function onRequest(context: { request: Request; env: Env }) {
}
if (method === "GET" && path === "operator/suggestions") return listSuggestions(env);
if (method === "GET" && path === "operator/schemas") return json({ schemas: apiSchemas() });
if (method === "GET" && path === "operator/bootstrap") return operatorBootstrap(env);
if (method === "GET" && path === "operator/gates") return listGates(env, url.searchParams.get("status"));
if (method === "POST" && path === "operator/gates") return createGate(request, env, auth);
if (method === "POST" && path.startsWith("operator/gates/") && path.endsWith("/status")) {
Expand Down
54 changes: 15 additions & 39 deletions src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1448,42 +1448,20 @@ export function App() {
const refreshSequence = refreshSequenceRef.current + 1;
refreshSequenceRef.current = refreshSequence;
const mutationEpochAtStart = mutationEpochRef.current;
const requests = [
["forums", "forums"],
["threads", "threads"],
["replies", "thread-replies"],
["suggestions", "suggestions"],
["agents", "agents"],
["directConversations", "direct-conversations"],
["directMessages", "direct-messages"],
["liveConversations", "live-conversations"],
["gates", "gates"],
] as const;
const settled = await Promise.all(
requests.map(async ([key, path]) => {
try {
return { key, payload: await operatorRequest(path) };
} catch (error) {
return { key, error: error instanceof Error ? error.message : "request failed" };
}
}),
);
const bootstrap = await operatorRequest("bootstrap").catch((error) => ({
error: error instanceof Error ? error.message : "request failed",
}));
if (!force && (
refreshSequence !== refreshSequenceRef.current ||
mutationEpochAtStart !== mutationEpochRef.current ||
activeOperatorMutationsRef.current > 0
)) {
return;
}
const payloads = Object.fromEntries(
settled.filter((result) => "payload" in result).map((result) => [result.key, result.payload]),
) as Record<string, any>;
const failures = settled.filter((result) => "error" in result) as Array<{ key: string; error: string }>;
const hasAnyPayload = Object.keys(payloads).length > 0;
if (hasAnyPayload) {
if (!("error" in bootstrap)) {
setState((current) => ({
...current,
forums: (payloads.forums?.forums ?? current.forums).map((forum: any) => ({
forums: (bootstrap.forums ?? current.forums).map((forum: any) => ({
id: forum.id,
slug: forum.slug,
name: forum.name,
Expand All @@ -1497,7 +1475,7 @@ export function App() {
? JSON.parse(forum.permanent_subscriber_ids_json)
: (forum.permanentSubscriberIds ?? []),
})),
threads: (payloads.threads?.threads ?? current.threads).map((thread: any) => ({
threads: (bootstrap.threads ?? current.threads).map((thread: any) => ({
id: thread.id,
forumId: thread.forum_id ?? thread.forumId,
authorAgentId: thread.author_agent_id ?? thread.authorAgentId,
Expand All @@ -1508,7 +1486,7 @@ export function App() {
createdAt: thread.created_at ?? thread.createdAt,
updatedAt: thread.updated_at ?? thread.updatedAt,
})),
replies: (payloads.replies?.replies ?? current.replies).map((reply: any) => ({
replies: (bootstrap.replies ?? current.replies).map((reply: any) => ({
id: reply.id,
threadId: reply.thread_id ?? reply.threadId,
authorId: reply.author_id ?? reply.authorId,
Expand All @@ -1517,7 +1495,7 @@ export function App() {
mentions: reply.mentions ?? JSON.parse(reply.mentions_json ?? "[]"),
createdAt: reply.created_at ?? reply.createdAt,
})),
suggestions: (payloads.suggestions?.suggestions ?? current.suggestions).map((suggestion: any) => ({
suggestions: (bootstrap.suggestions ?? current.suggestions).map((suggestion: any) => ({
id: suggestion.id,
kind: suggestion.kind,
title: suggestion.title,
Expand All @@ -1531,7 +1509,7 @@ export function App() {
downvotes: suggestion.downvotes ?? JSON.parse(suggestion.downvotes_json ?? "[]"),
createdAt: suggestion.created_at ?? suggestion.createdAt,
})),
gates: (payloads.gates?.gates ?? current.gates ?? []).map((gate: any) => ({
gates: (bootstrap.gates ?? current.gates ?? []).map((gate: any) => ({
id: gate.id,
title: gate.title,
body: gate.body,
Expand All @@ -1546,7 +1524,7 @@ export function App() {
createdAt: gate.created_at ?? gate.createdAt,
updatedAt: gate.updated_at ?? gate.updatedAt,
})),
agents: (payloads.agents?.agents ?? current.agents).map((agent: any) => ({
agents: (bootstrap.agents ?? current.agents).map((agent: any) => ({
id: agent.id,
handle: agent.handle,
displayName: agent.display_name ?? agent.displayName,
Expand All @@ -1565,7 +1543,7 @@ export function App() {
),
profile: agent.profile,
})),
directConversations: (payloads.directConversations?.conversations ?? current.directConversations).map(
directConversations: (bootstrap.conversations ?? current.directConversations).map(
(conversation: any) => ({
id: conversation.id,
participantAgentIds: [
Expand All @@ -1575,15 +1553,15 @@ export function App() {
breakpointMessageIds: conversation.breakpointMessageIds ?? {},
}),
),
directMessages: (payloads.directMessages?.messages ?? current.directMessages).map((message: any) => ({
directMessages: (bootstrap.messages ?? current.directMessages).map((message: any) => ({
id: message.id,
conversationId: message.conversation_id ?? message.conversationId,
senderAgentId: message.sender_agent_id ?? message.senderAgentId ?? message.senderId,
body: message.body,
createdAt: message.created_at ?? message.createdAt,
})),
}));
setLiveSessions((payloads.liveConversations?.sessions ?? liveSessions).map((session: any) => ({
setLiveSessions((bootstrap.sessions ?? liveSessions).map((session: any) => ({
id: session.id,
conversationId: session.conversation_id ?? session.conversationId,
status: session.status,
Expand All @@ -1592,11 +1570,9 @@ export function App() {
createdAt: session.created_at ?? session.createdAt,
receipts: session.receipts ?? [],
})));
}
if (failures.length) {
setApiStatus(`${hasAnyPayload ? "partial durable storage" : "operator API unavailable"}; failed: ${failures.map((failure) => `${failure.key} (${failure.error})`).join(", ")}`);
setApiStatus(bootstrap.previewStorage ? "preview storage" : "durable storage");
} else {
setApiStatus(payloads.forums?.previewStorage ? "preview storage" : "durable storage");
setApiStatus(`operator API unavailable; bootstrap (${bootstrap.error})`);
}
}, [liveSessions, operatorRequest, operatorToken]);

Expand Down
Loading