diff --git a/apps/roam/src/utils/conceptConversion.ts b/apps/roam/src/utils/conceptConversion.ts index 0851df84a..83abd1a14 100644 --- a/apps/roam/src/utils/conceptConversion.ts +++ b/apps/roam/src/utils/conceptConversion.ts @@ -78,17 +78,21 @@ export const discourseNodeSchemaToLocalConcept = ( node: DiscourseNode, ): LocalConceptDataInput => { const titleParts = node.text.split("/"); + const label = titleParts[titleParts.length - 1] ?? node.text; const result: LocalConceptDataInput = { space_id: context.spaceId, name: node.text, source_local_id: node.type, is_schema: true, + literal_content: { + label, + }, /* eslint-enable @typescript-eslint/naming-convention */ ...getNodeExtraData(node.type), }; if (node.template !== undefined) result.literal_content = { - label: titleParts[titleParts.length - 1], + label, template: templateToText(node.template), }; return result; diff --git a/apps/roam/src/utils/convertRoamNodeToFullContent.example.ts b/apps/roam/src/utils/convertRoamNodeToFullContent.example.ts index 1e5c96310..8912d1a31 100644 --- a/apps/roam/src/utils/convertRoamNodeToFullContent.example.ts +++ b/apps/roam/src/utils/convertRoamNodeToFullContent.example.ts @@ -3,11 +3,10 @@ import type { CrossAppNode } from "@repo/database/crossAppNodeContract"; import { buildFullMarkdown } from "./convertRoamNodeToFullContent"; /** - * Typed example for ENG-1848 ("tests or fixtures cover representative Roam - * block content becoming `full` markdown"). This is not a concrete test; it - * documents the `tree.children` shape returned by `getFullTreeByParentUid` for - * a real Roam claim page and type-checks the generated markdown against the - * contract. + * Typed example for ENG-1848 full markdown coverage. This is not a concrete + * test; it documents the `tree.children` shape returned by + * `getFullTreeByParentUid` for a real Roam claim page and type-checks the + * generated markdown against the contract. * * Derived from: * https://roamresearch.com/#/app/plugin-testing-akamatsulab2/page/dnHNmYwe5 diff --git a/apps/roam/src/utils/convertRoamNodeToFullContent.simple.example.ts b/apps/roam/src/utils/convertRoamNodeToFullContent.simple.example.ts new file mode 100644 index 000000000..9b56a2999 --- /dev/null +++ b/apps/roam/src/utils/convertRoamNodeToFullContent.simple.example.ts @@ -0,0 +1,46 @@ +import type { TreeNode } from "roamjs-components/types"; +import type { CrossAppNode } from "@repo/database/crossAppNodeContract"; +import { buildFullMarkdown } from "./convertRoamNodeToFullContent"; + +/** + * Small typed example for ENG-1848/ENG-1852 full markdown validation. The Roam + * app has no unit-test runner, so this keeps a compact in-memory tree that + * type-checks the generated markdown against the shared cross-app contract. + */ + +const block = (text: string, children: TreeNode[] = []): TreeNode => ({ + text, + children, + order: 0, + parents: [], + uid: "", + heading: 0, + open: true, + viewType: "bullet", + blockViewType: "outline", + editTime: new Date(0), + textAlign: "left", + props: { imageResize: {}, iframe: {} }, +}); + +const title = "Sleep improves memory consolidation"; + +const blocks: TreeNode[] = [ + block( + "Multiple studies show that sleep after learning strengthens memory traces.", + ), + block("Supporting evidence:", [block("[[EVD]] - Rasch & Born 2013")]), +]; + +export const roamClaimFullMarkdownSimpleExample: { + title: string; + blocks: TreeNode[]; + full: CrossAppNode["content"]["full"]; +} = { + title, + blocks, + full: { + format: "text/markdown", + value: buildFullMarkdown({ title, blocks }), + }, +}; diff --git a/apps/roam/src/utils/convertRoamNodeToFullContent.ts b/apps/roam/src/utils/convertRoamNodeToFullContent.ts index 88369bb98..3981f825d 100644 --- a/apps/roam/src/utils/convertRoamNodeToFullContent.ts +++ b/apps/roam/src/utils/convertRoamNodeToFullContent.ts @@ -1,11 +1,19 @@ import { toMarkdown } from "./pageToMarkdown"; -import { type RoamDiscourseNodeData } from "./getAllDiscourseNodesSince"; import { type DiscourseNode } from "./getDiscourseNodes"; import getFullTreeByParentUid from "roamjs-components/queries/getFullTreeByParentUid"; import getPageViewType from "roamjs-components/queries/getPageViewType"; import type { TreeNode, ViewType } from "roamjs-components/types"; import type { LocalContentDataInput } from "@repo/database/inputTypes"; +export type RoamFullContentNode = { + author_local_id: string; + source_local_id: string; + created: string | number; + last_modified: string | number; + text: string; + node_title?: string; +}; + const FULL_MARKDOWN_OPTS = { refs: true, embeds: true, @@ -38,7 +46,7 @@ export const buildFullMarkdown = ({ export const convertRoamNodeToFullContent = ({ nodes, }: { - nodes: RoamDiscourseNodeData[]; + nodes: RoamFullContentNode[]; }): LocalContentDataInput[] => nodes.flatMap((node) => { try { diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index aaa95db71..6e35963fe 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -3,6 +3,7 @@ import { getAllDiscourseNodesSince, nodeTypeSince, } from "./getAllDiscourseNodesSince"; +import getDiscourseNodeFormatExpression from "./getDiscourseNodeFormatExpression"; import { cleanupOrphanedNodes } from "./cleanupOrphanedNodes"; import { getLoggedInClient, @@ -18,7 +19,10 @@ import { } from "./conceptConversion"; import { fetchEmbeddingsForNodes } from "./upsertNodesAsContentWithEmbeddings"; import { convertRoamNodeToLocalContent } from "./upsertNodesAsContentWithEmbeddings"; -import { convertRoamNodeToFullContent } from "./convertRoamNodeToFullContent"; +import { + convertRoamNodeToFullContent, + type RoamFullContentNode, +} from "./convertRoamNodeToFullContent"; import type { DGSupabaseClient } from "@repo/database/lib/client"; import { intersection } from "@repo/utils/setOperations"; import type { Json, Enums } from "@repo/database/dbTypes"; @@ -42,6 +46,7 @@ const SYNC_TIMEOUT = "60s"; // must be less than half the SYNC_INTERVAL. const BATCH_SIZE = 200; const CONCEPT_BATCH_SIZE = 200; const END_SYNC_TASK_RESULT_VERSION = 1; +const DEFAULT_SYNC_TIME = new Date("1970-01-01").getTime(); type SyncPhaseDurations = Record; @@ -556,16 +561,29 @@ export const convertDgToSupabaseConcepts = async ({ nodesSince, since, allNodeTypes, + sharedNodeTypeIds = new Set(), supabaseClient, context, }: { nodesSince: RoamDiscourseNodeData[]; since: number | undefined; allNodeTypes: DiscourseNode[]; + sharedNodeTypeIds?: ReadonlySet; supabaseClient: DGSupabaseClient; context: SupabaseContext; }) => { - const nodeTypes = await nodeTypeSince(since, allNodeTypes); + const changedNodeTypes = await nodeTypeSince(since, allNodeTypes); + const nodeTypesByUid = new Map( + changedNodeTypes.map((nodeType) => [nodeType.type, nodeType]), + ); + + allNodeTypes.forEach((nodeType) => { + if (sharedNodeTypeIds.has(nodeType.type)) { + nodeTypesByUid.set(nodeType.type, nodeType); + } + }); + const nodeTypes = Array.from(nodeTypesByUid.values()); + await upsertNodeSchemaToContent({ nodeTypesUids: nodeTypes.map((node) => node.type), spaceId: context.spaceId, @@ -606,15 +624,44 @@ export const convertDgToSupabaseConcepts = async ({ }); }; +const uploadContentBatches = async ({ + content, + supabaseClient, + context, +}: { + content: LocalContentDataInput[]; + supabaseClient: DGSupabaseClient; + context: SupabaseContext; +}): Promise => { + if (content.length === 0) { + return; + } + + const batches = chunk(content, BATCH_SIZE); + + for (let idx = 0; idx < batches.length; idx++) { + const batch = batches[idx]; + + const { error } = await supabaseClient.rpc("upsert_content", { + data: batch as Json, + v_space_id: context.spaceId, + v_creator_id: context.userId, + content_as_document: true, + }); + + if (error) { + throw new Error(`upsert_content failed for batch ${idx + 1}`, { + cause: error, + }); + } + } +}; + export const upsertNodesToSupabaseAsContentWithEmbeddings = async ( roamNodes: RoamDiscourseNodeData[], supabaseClient: DGSupabaseClient, context: SupabaseContext, - options: { includeFullContent?: boolean } = {}, ): Promise => { - const { userId } = context; - const { includeFullContent = false } = options; - if (roamNodes.length === 0) { return; } @@ -622,34 +669,6 @@ export const upsertNodesToSupabaseAsContentWithEmbeddings = async ( nodes: roamNodes, }); - const uploadBatches = async ( - batches: LocalContentDataInput[][], - ): Promise => { - for (let idx = 0; idx < batches.length; idx++) { - const batch = batches[idx]; - - const { error } = await supabaseClient.rpc("upsert_content", { - data: batch as Json, - v_space_id: context.spaceId, - v_creator_id: userId, - content_as_document: true, - }); - - if (error) { - throw new Error(`upsert_content failed for batch ${idx + 1}`, { - cause: error, - }); - } - } - }; - - if (includeFullContent) { - const fullContent = convertRoamNodeToFullContent({ - nodes: roamNodes, - }); - await uploadBatches(chunk(fullContent, BATCH_SIZE)); - } - let nodesWithEmbeddings: LocalContentDataInput[]; try { nodesWithEmbeddings = await fetchEmbeddingsForNodes( @@ -672,7 +691,32 @@ export const upsertNodesToSupabaseAsContentWithEmbeddings = async ( ); } - await uploadBatches(chunk(nodesWithEmbeddings, BATCH_SIZE)); + await uploadContentBatches({ + content: nodesWithEmbeddings, + supabaseClient, + context, + }); +}; + +const upsertRoamNodesToSupabaseAsFullContent = async ({ + nodes, + supabaseClient, + context, +}: { + nodes: RoamFullContentNode[]; + supabaseClient: DGSupabaseClient; + context: SupabaseContext; +}): Promise => { + if (nodes.length === 0) { + return; + } + + const fullContent = convertRoamNodeToFullContent({ nodes }); + await uploadContentBatches({ + content: fullContent, + supabaseClient, + context, + }); }; const getAllUsers = async (): Promise => { @@ -782,6 +826,108 @@ const getAllMissingOrNewDiscourseNodes = async ({ ]; }; +const getSharedSourceLocalIds = async ({ + supabaseClient, + spaceId, +}: { + supabaseClient: DGSupabaseClient; + spaceId: number; +}): Promise> => { + const sharedResources = await getAllPages( + supabaseClient + .from("ResourceAccess") + .select("source_local_id") + .eq("space_id", spaceId) + .order("source_local_id") + .order("account_uid"), + 1000, + ); + + if (!Array.isArray(sharedResources)) throw sharedResources; + + return new Set(sharedResources.map((resource) => resource.source_local_id)); +}; + +type SharedFullContentUpdateRow = { + author_local_id: string; + source_local_id: string; + created: number; + node_edit_time: number; + page_edit_time: number; + text: string; +}; + +type SharedFullContentUpdate = { + fullContentNode: RoamFullContentNode; + nodeTypeId: string; +}; + +const getSharedRoamNodesWithFullContentUpdatesSince = async ({ + sourceLocalIds, + since, + nodeTypes, +}: { + sourceLocalIds: ReadonlySet; + since: number | undefined; + nodeTypes: DiscourseNode[]; +}): Promise => { + const sharedSourceLocalIds = Array.from(sourceLocalIds); + if (sharedSourceLocalIds.length === 0 || nodeTypes.length === 0) { + return []; + } + + const sinceMs = since ?? DEFAULT_SYNC_TIME; + const query = `[ + :find ?node-title ?uid ?nodeCreateTime ?nodeEditTime ?pageEditTime ?author_local_id + :keys text source_local_id created node_edit_time page_edit_time author_local_id + :in $ [?sharedUid ...] ?since + :where + [?node :block/uid ?sharedUid] + [?node :node/title ?node-title] + [?node :block/uid ?uid] + [?node :create/time ?nodeCreateTime] + [?node :create/user ?user-eid] + [?user-eid :user/uid ?author_local_id] + [(get-else $ ?node :edit/time ?nodeCreateTime) ?nodeEditTime] + [(get-else $ ?node :page/edit-time ?nodeEditTime) ?pageEditTime] + [or + [(> ?nodeEditTime ?since)] + [(> ?pageEditTime ?since)]] + ]`; + + const rows = (await window.roamAlphaAPI.data.backend.q( + query, + sharedSourceLocalIds, + sinceMs, + )) as unknown[] as SharedFullContentUpdateRow[]; + const typeMatchers = nodeTypes.map((node) => ({ + node, + regex: getDiscourseNodeFormatExpression(node.format), + })); + + return rows.flatMap((row) => { + const matchingNodeType = typeMatchers.find(({ regex }) => + regex.test(row.text), + )?.node; + if (matchingNodeType === undefined) { + return []; + } + + return [ + { + fullContentNode: { + author_local_id: row.author_local_id, + source_local_id: row.source_local_id, + created: row.created, + last_modified: Math.max(row.node_edit_time, row.page_edit_time), + text: row.text, + }, + nodeTypeId: matchingNodeType.type, + }, + ]; + }); +}; + export const createOrUpdateDiscourseEmbedding = async ( showToast = false, ): Promise => { @@ -899,7 +1045,7 @@ export const createOrUpdateDiscourseEmbedding = async ( (n) => n.backedBy === "user", ); - const allNodeInstances = await measureSyncPhase({ + const changedNodeInstances = await measureSyncPhase({ phase: isInitialSync ? "getAllMissingOrNewDiscourseNodes" : "getAllDiscourseNodesSince", @@ -914,6 +1060,32 @@ export const createOrUpdateDiscourseEmbedding = async ( }) : getAllDiscourseNodesSince(sinceTime, allDgNodeTypes), }); + const sharedSourceLocalIds = await measureSyncPhase({ + phase: "getSharedSourceLocalIds", + phases, + operation: () => + getSharedSourceLocalIds({ + supabaseClient: activeSupabaseClient, + spaceId: activeContext.spaceId, + }), + }); + const sharedFullContentUpdates = await measureSyncPhase({ + phase: "getSharedFullContentUpdates", + phases, + operation: () => + getSharedRoamNodesWithFullContentUpdatesSince({ + sourceLocalIds: sharedSourceLocalIds, + since: sinceTime, + nodeTypes: allDgNodeTypes, + }), + }); + const sharedFullContentNodes = sharedFullContentUpdates.map( + (update) => update.fullContentNode, + ); + const sharedNodeTypeIds = new Set( + sharedFullContentUpdates.map((update) => update.nodeTypeId), + ); + await measureSyncPhase({ phase: "upsertUsers", phases, @@ -925,19 +1097,30 @@ export const createOrUpdateDiscourseEmbedding = async ( phases, operation: () => upsertNodesToSupabaseAsContentWithEmbeddings( - allNodeInstances, + changedNodeInstances, activeSupabaseClient, activeContext, ), }); + await measureSyncPhase({ + phase: "upsertFullContent", + phases, + operation: () => + upsertRoamNodesToSupabaseAsFullContent({ + nodes: sharedFullContentNodes, + supabaseClient: activeSupabaseClient, + context: activeContext, + }), + }); await measureSyncPhase({ phase: "convertConcepts", phases, operation: () => convertDgToSupabaseConcepts({ - nodesSince: allNodeInstances, + nodesSince: changedNodeInstances, since: sinceTime, allNodeTypes: allDgNodeTypes, + sharedNodeTypeIds, supabaseClient: activeSupabaseClient, context: activeContext, }),