diff --git a/CHANGELOG.md b/CHANGELOG.md index 3af0fc3494..5672a1543f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,9 @@ and this project adheres to ### Fixed +- Fix version-stuck bug where the collaborative editor shows stale state after a + sandbox merge or CLI deploy. + [#4535](https://github.com/OpenFn/lightning/issues/4535) - Copying api tokens doesn't work on unsecure non-localhost contexts [PR#4551](https://github.com/OpenFn/lightning/pull/4551) - Fix AI assistant authorization for support users on projects with support diff --git a/lib/lightning/collaboration.ex b/lib/lightning/collaboration.ex index cd05eb6fe0..81b9bd386b 100644 --- a/lib/lightning/collaboration.ex +++ b/lib/lightning/collaboration.ex @@ -25,6 +25,7 @@ defmodule Lightning.Collaborate do Collaborate.start(user: user, workflow: workflow) """ + alias Lightning.Accounts.User alias Lightning.Collaboration.DocumentSupervisor alias Lightning.Collaboration.Registry alias Lightning.Collaboration.Session @@ -36,6 +37,20 @@ defmodule Lightning.Collaborate do @spec start(opts :: Keyword.t()) :: GenServer.on_start() def start(opts) do + case do_start(opts) do + {:error, {:error, :shared_doc_not_found}} -> + # A SharedDoc was registered in :pg but died before the Session could + # observe it (0ms auto_exit race). Yield one ms — enough for the timer + # to fire and clear :pg — then try once more from scratch. 
+ Process.sleep(1) + do_start(opts) + + result -> + result + end + end + + defp do_start(opts) do session_id = Ecto.UUID.generate() parent_pid = Keyword.get(opts, :parent_pid, self()) @@ -64,13 +79,15 @@ defmodule Lightning.Collaborate do end # Start session for this user + user_id = if is_struct(user, User), do: user.id, else: nil + SessionSupervisor.start_child({ Session, workflow: workflow, user: user, parent_pid: parent_pid, document_name: document_name, - name: Registry.via({:session, "#{document_name}:#{session_id}", user.id}) + name: Registry.via({:session, "#{document_name}:#{session_id}", user_id}) }) end diff --git a/lib/lightning/collaboration/persistence.ex b/lib/lightning/collaboration/persistence.ex index 10ff16204b..3f5fe70123 100644 --- a/lib/lightning/collaboration/persistence.ex +++ b/lib/lightning/collaboration/persistence.ex @@ -11,7 +11,6 @@ defmodule Lightning.Collaboration.Persistence do alias Lightning.Collaboration.DocumentState alias Lightning.Collaboration.PersistenceWriter alias Lightning.Collaboration.Session - alias Lightning.Collaboration.WorkflowSerializer require Logger @@ -30,7 +29,6 @@ defmodule Lightning.Collaboration.Persistence do case DocumentState.get_checkpoint_and_updates(doc_name) do {:ok, checkpoint, updates} -> apply_persisted_state(doc, doc_name, checkpoint, updates) - reconcile_or_reset(doc, doc_name, workflow) {:error, :not_found} -> Logger.info( @@ -89,96 +87,4 @@ defmodule Lightning.Collaboration.Persistence do DocumentState.apply_to_doc(doc, checkpoint, updates) Logger.debug("Loaded #{length(updates)} updates. 
document=#{doc_name}") end - - defp reconcile_or_reset(doc, doc_name, workflow) do - workflow_map = Yex.Doc.get_map(doc, "workflow") - persisted_lock_version = extract_lock_version(workflow_map) - current_lock_version = workflow.lock_version - - if stale?(persisted_lock_version, current_lock_version) do - Logger.warning(""" - Persisted Y.Doc is stale (persisted: #{inspect(persisted_lock_version)}, \ - current: #{current_lock_version}) - Discarding persisted state and reloading from database. - document=#{doc_name} - """) - - clear_and_reset_workflow(doc, workflow) - else - Logger.debug( - "Persisted Y.Doc is current (lock_version: #{current_lock_version}). document=#{doc_name}" - ) - - reconcile_workflow_metadata(doc, workflow) - end - end - - defp extract_lock_version(workflow_map) do - case Yex.Map.fetch(workflow_map, "lock_version") do - {:ok, version} when is_float(version) -> trunc(version) - {:ok, version} when is_integer(version) -> version - {:ok, nil} -> nil - :error -> nil - end - end - - defp stale?(nil, current_version), do: not is_nil(current_version) - - defp stale?(persisted_version, current_version), - do: persisted_version != current_version - - defp clear_and_reset_workflow(doc, workflow) do - # Same pattern as Session.clear_and_reset_doc - # Get all Yex collections BEFORE transaction to avoid VM deadlock - jobs_array = Yex.Doc.get_array(doc, "jobs") - edges_array = Yex.Doc.get_array(doc, "edges") - triggers_array = Yex.Doc.get_array(doc, "triggers") - - # Transaction 1: Clear all arrays - Yex.Doc.transaction(doc, "clear_stale_workflow", fn -> - clear_array(jobs_array) - clear_array(edges_array) - clear_array(triggers_array) - end) - - # Transaction 2: Re-serialize workflow from database - Session.initialize_workflow_document(doc, workflow) - - :ok - end - - defp clear_array(array) do - length = Yex.Array.length(array) - - if length > 0 do - Yex.Array.delete_range(array, 0, length) - end - end - - defp reconcile_workflow_metadata(doc, workflow) do 
- # Update workflow metadata fields to match current database state - # This is critical when loading persisted Y.Doc state that may be stale - workflow_map = Yex.Doc.get_map(doc, "workflow") - - Yex.Doc.transaction(doc, "reconcile_metadata", fn -> - # Update lock_version to current database value - Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) - - # Update name in case it changed - Yex.Map.set(workflow_map, "name", workflow.name) - - # Update deleted_at if present - Yex.Map.set( - workflow_map, - "deleted_at", - WorkflowSerializer.datetime_to_string(workflow.deleted_at) - ) - end) - - Logger.debug( - "Reconciled workflow metadata: lock_version=#{workflow.lock_version}, name=#{workflow.name}" - ) - - :ok - end end diff --git a/lib/lightning/collaboration/session.ex b/lib/lightning/collaboration/session.ex index 164d4b1857..0c41effe60 100644 --- a/lib/lightning/collaboration/session.ex +++ b/lib/lightning/collaboration/session.ex @@ -22,6 +22,7 @@ defmodule Lightning.Collaboration.Session do alias Lightning.Accounts.User alias Lightning.Collaboration.WorkflowSerializer + alias Lightning.VersionControl.ProjectRepoConnection alias Lightning.Workflows.Presence alias Lightning.Workflows.WorkflowUsageLimiter alias Yex.Sync.SharedDoc @@ -41,7 +42,7 @@ defmodule Lightning.Collaboration.Session do @type start_opts :: [ workflow: Lightning.Workflows.Workflow.t(), - user: User.t(), + user: User.t() | ProjectRepoConnection.t(), parent_pid: pid() ] @@ -69,6 +70,10 @@ defmodule Lightning.Collaboration.Session do GenServer.stop(session_pid) end + def shared_doc_pid(session_pid) do + GenServer.call(session_pid, :shared_doc_pid) + end + def child_spec(opts) do {opts, args} = Keyword.put_new_lazy(opts, :session_id, fn -> Ecto.UUID.generate() end) @@ -114,20 +119,26 @@ defmodule Lightning.Collaboration.Session do {:stop, {:error, :shared_doc_not_found}} shared_doc_pid -> - SharedDoc.observe(shared_doc_pid) - Logger.info("Joined SharedDoc for #{document_name}") - - 
# We track the user presence here so the the original WorkflowLive.Edit - # can be stopped from editing the workflow when someone else is editing it. - # Note: Presence tracking uses workflow.id, not document_name, because - # presence is about showing who is editing the workflow, not which version - Presence.track_user_presence( - user, - workflow.id, - self() - ) - - {:ok, %{state | shared_doc_pid: shared_doc_pid}} + try do + SharedDoc.observe(shared_doc_pid) + Logger.info("Joined SharedDoc for #{document_name}") + + # We track the user presence here so the original WorkflowLive.Edit + # can be stopped from editing the workflow when someone else is editing it. + # Note: Presence tracking uses workflow.id, not document_name, because + # presence is about showing who is editing the workflow, not which version. + # Only track presence for real users — not system actors like ProjectRepoConnection. + if is_struct(user, User), + do: Presence.track_user_presence(user, workflow.id, self()) + + {:ok, %{state | shared_doc_pid: shared_doc_pid}} + catch + # GenServer.call raises an exit (not a rescuable exception) when the + # target process is dead. SharedDoc may have been registered in :pg + # but died before we could observe it (0ms auto_exit race). + # Return cleanly so Collaborate.start can retry. 
+ :exit, _ -> {:stop, {:error, :shared_doc_not_found}} + end end end @@ -144,11 +155,8 @@ defmodule Lightning.Collaboration.Session do SharedDoc.unobserve(shared_doc_pid) end - Presence.untrack_user_presence( - state.user, - state.workflow.id, - self() - ) + if is_struct(state.user, User), + do: Presence.untrack_user_presence(state.user, state.workflow.id, self()) :ok end @@ -259,6 +267,11 @@ defmodule Lightning.Collaboration.Session do GenServer.call(session_pid, {:reset_workflow, user}, 10_000) end + @impl true + def handle_call(:shared_doc_pid, _from, state) do + {:reply, state.shared_doc_pid, state} + end + @impl true def handle_call(:get_doc, _from, %{shared_doc_pid: shared_doc_pid} = state) do {:reply, SharedDoc.get_doc(shared_doc_pid), state} diff --git a/lib/lightning/collaboration/workflow_reconciler.ex b/lib/lightning/collaboration/workflow_reconciler.ex index 6fa7562f21..40b1e9cd8b 100644 --- a/lib/lightning/collaboration/workflow_reconciler.ex +++ b/lib/lightning/collaboration/workflow_reconciler.ex @@ -6,10 +6,13 @@ defmodule Lightning.Collaboration.WorkflowReconciler do """ import Ecto.Changeset, only: [get_field: 2] + alias Lightning.Collaborate alias Lightning.Collaboration.Session + alias Lightning.Collaboration.WorkflowSerializer alias Lightning.Workflows.Edge alias Lightning.Workflows.Job alias Lightning.Workflows.Trigger + alias Lightning.Workflows.Triggers.KafkaConfiguration alias Lightning.Workflows.Workflow alias Yex.Doc alias Yex.Sync.SharedDoc @@ -50,6 +53,372 @@ defmodule Lightning.Collaboration.WorkflowReconciler do :ok end + @doc """ + Reconciles the SharedDoc for a workflow with the current DB state. + + Called after an external write (provisioner import, sandbox merge) has + already updated the database. Goes through `Collaborate.start/1` regardless + of whether anyone is online — this is the unified path for both cases: + + - Nobody online: starts a new document, applies the diff, stops. 
The + document flushes to DB on shutdown, so the next user opens correct state. + - Someone online: joins the existing document, applies the diff (broadcast + to live users in real time), stops. The document stays alive. + + Unlike `reconcile_workflow_changes/2`, this is state-driven: it diffs the + live Y.Doc against the DB and applies the minimum targeted operations — + deleting phantom items (e.g. unsaved jobs from open tabs), inserting new + items, and updating changed items in-place. CRDT IDs are preserved for + unchanged items. + """ + @spec reconcile_workflow_from_db( + Workflow.t(), + Lightning.Accounts.User.t() + | Lightning.VersionControl.ProjectRepoConnection.t() + ) :: :ok + def reconcile_workflow_from_db(%Workflow{} = workflow, actor) do + {:ok, session_pid} = Collaborate.start(workflow: workflow, user: actor) + + Session.update_doc(session_pid, fn doc -> + apply_db_state_to_doc(doc, workflow) + end) + + Session.stop(session_pid) + + Phoenix.PubSub.broadcast( + Lightning.PubSub, + "workflow:collaborate:#{workflow.id}", + {:workflow_updated_externally, workflow} + ) + + :ok + rescue + error -> + # Intentional: reconciler failure must never fail the provisioner. + # This also catches Session startup failures (e.g. Collaborate.start + # returns {:error, _} and the match raises), so the log is the only + # signal that reconciliation was skipped. 
+ Logger.error( + "Failed to reconcile SharedDoc for workflow #{workflow.id}: #{inspect(error)}" + ) + + :ok + end + + defp apply_db_state_to_doc(doc, workflow) do + # Step 1: Pre-fetch all root Yex objects BEFORE transaction (avoids VM deadlock) + workflow_map = Doc.get_map(doc, "workflow") + jobs_array = Doc.get_array(doc, "jobs") + edges_array = Doc.get_array(doc, "edges") + triggers_array = Doc.get_array(doc, "triggers") + + # Step 2: Read current Y.Doc state as plain Elixir lists (still before transaction) + ydoc_jobs = Yex.Array.to_list(jobs_array) + ydoc_edges = Yex.Array.to_list(edges_array) + ydoc_triggers = Yex.Array.to_list(triggers_array) + + # Step 3: Pre-fetch body Y.Text references — must happen before transaction + body_texts = + Map.new(ydoc_jobs, fn job_map -> + id = Yex.Map.fetch!(job_map, "id") + {:ok, body} = Yex.Map.fetch(job_map, "body") + {id, body} + end) + + # Step 4: Compute all operations before opening the transaction + # Note: positions are intentionally not reconciled here. The provisioner + # changeset does not accept positions (validate_extraneous_params rejects + # them), so the DB value can never be newer than the Y.Doc. Reconciling + # positions would risk overwriting unsaved node drags from open editor tabs. 
+ job_ops = compute_job_ops(jobs_array, ydoc_jobs, workflow.jobs, body_texts) + edge_ops = compute_edge_ops(edges_array, ydoc_edges, workflow.edges) + + trigger_ops = + compute_trigger_ops(triggers_array, ydoc_triggers, workflow.triggers) + + # Step 5: Apply everything in ONE transaction + Doc.transaction(doc, "provisioner_reconcile", fn -> + update_workflow_metadata(workflow_map, workflow) + Enum.each(job_ops ++ edge_ops ++ trigger_ops, &apply_reconcile_op/1) + end) + end + + defp update_workflow_metadata(workflow_map, workflow) do + Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) + Yex.Map.set(workflow_map, "name", workflow.name || "") + Yex.Map.set(workflow_map, "concurrency", workflow.concurrency) + Yex.Map.set(workflow_map, "enable_job_logs", workflow.enable_job_logs) + + Yex.Map.set( + workflow_map, + "deleted_at", + WorkflowSerializer.datetime_to_string(workflow.deleted_at) + ) + end + + # --------------------------------------------------------------------------- + # Job operations + # --------------------------------------------------------------------------- + + defp compute_job_ops(jobs_array, ydoc_jobs, db_jobs, body_texts) do + ydoc_ids = ydoc_jobs |> Enum.map(&Yex.Map.fetch!(&1, "id")) |> MapSet.new() + db_ids = db_jobs |> Enum.map(& &1.id) |> MapSet.new() + + phantom_ids = MapSet.difference(ydoc_ids, db_ids) + new_ids = MapSet.difference(db_ids, ydoc_ids) + existing_ids = MapSet.intersection(ydoc_ids, db_ids) + + delete_ops = + phantom_ids + |> Enum.map(&find_index_in_array(jobs_array, &1)) + |> Enum.reject(&is_nil/1) + |> Enum.sort(:desc) + |> Enum.map(&{:delete, jobs_array, &1}) + + insert_ops = + db_jobs + |> Enum.filter(&MapSet.member?(new_ids, &1.id)) + |> Enum.map(&{:insert, jobs_array, job_to_prelim(&1)}) + + update_ops = + db_jobs + |> Enum.filter(&MapSet.member?(existing_ids, &1.id)) + |> Enum.flat_map(fn db_job -> + ydoc_job = Enum.find(ydoc_jobs, &(Yex.Map.fetch!(&1, "id") == db_job.id)) + body_text = Map.get(body_texts, 
db_job.id) + job_update_ops(ydoc_job, body_text, db_job) + end) + + delete_ops ++ insert_ops ++ update_ops + end + + defp job_to_prelim(job) do + Yex.MapPrelim.from(%{ + "id" => job.id, + "name" => job.name || "", + "body" => Yex.TextPrelim.from(job.body || ""), + "adaptor" => job.adaptor, + "project_credential_id" => job.project_credential_id, + "keychain_credential_id" => job.keychain_credential_id + }) + end + + defp job_update_ops(ydoc_job, body_text, db_job) do + field_ops = + [ + {"name", db_job.name || ""}, + {"adaptor", db_job.adaptor}, + {"project_credential_id", db_job.project_credential_id}, + {"keychain_credential_id", db_job.keychain_credential_id} + ] + |> Enum.flat_map(fn {key, db_val} -> + case Yex.Map.fetch(ydoc_job, key) do + {:ok, ^db_val} -> [] + _ -> [{:set_field, ydoc_job, key, db_val}] + end + end) + + body_ops = + case body_text do + %Yex.Text{} = text -> + db_body = db_job.body || "" + current = Yex.Text.to_string(text) + if current != db_body, do: [{:update_text, text, db_body}], else: [] + + _ -> + [] + end + + field_ops ++ body_ops + end + + # --------------------------------------------------------------------------- + # Edge operations + # --------------------------------------------------------------------------- + + defp compute_edge_ops(edges_array, ydoc_edges, db_edges) do + ydoc_ids = ydoc_edges |> Enum.map(&Yex.Map.fetch!(&1, "id")) |> MapSet.new() + db_ids = db_edges |> Enum.map(& &1.id) |> MapSet.new() + + phantom_ids = MapSet.difference(ydoc_ids, db_ids) + new_ids = MapSet.difference(db_ids, ydoc_ids) + existing_ids = MapSet.intersection(ydoc_ids, db_ids) + + delete_ops = + phantom_ids + |> Enum.map(&find_index_in_array(edges_array, &1)) + |> Enum.reject(&is_nil/1) + |> Enum.sort(:desc) + |> Enum.map(&{:delete, edges_array, &1}) + + insert_ops = + db_edges + |> Enum.filter(&MapSet.member?(new_ids, &1.id)) + |> Enum.map(&{:insert, edges_array, edge_to_prelim(&1)}) + + update_ops = + db_edges + |> 
Enum.filter(&MapSet.member?(existing_ids, &1.id)) + |> Enum.flat_map(fn db_edge -> + ydoc_edge = + Enum.find(ydoc_edges, &(Yex.Map.fetch!(&1, "id") == db_edge.id)) + + edge_update_ops(ydoc_edge, db_edge) + end) + + delete_ops ++ insert_ops ++ update_ops + end + + defp edge_to_prelim(edge) do + Yex.MapPrelim.from(%{ + "id" => edge.id, + "condition_expression" => edge.condition_expression, + "condition_label" => edge.condition_label, + "condition_type" => to_string(edge.condition_type), + "enabled" => edge.enabled, + "source_job_id" => edge.source_job_id, + "source_trigger_id" => edge.source_trigger_id, + "target_job_id" => edge.target_job_id + }) + end + + defp edge_update_ops(ydoc_edge, db_edge) do + [ + {"condition_expression", db_edge.condition_expression}, + {"condition_label", db_edge.condition_label}, + {"condition_type", to_string(db_edge.condition_type)}, + {"enabled", db_edge.enabled}, + {"source_job_id", db_edge.source_job_id}, + {"source_trigger_id", db_edge.source_trigger_id}, + {"target_job_id", db_edge.target_job_id} + ] + |> Enum.flat_map(fn {key, db_val} -> + case Yex.Map.fetch(ydoc_edge, key) do + {:ok, ^db_val} -> [] + _ -> [{:set_field, ydoc_edge, key, db_val}] + end + end) + end + + # --------------------------------------------------------------------------- + # Trigger operations + # --------------------------------------------------------------------------- + + defp compute_trigger_ops(triggers_array, ydoc_triggers, db_triggers) do + ydoc_ids = + ydoc_triggers |> Enum.map(&Yex.Map.fetch!(&1, "id")) |> MapSet.new() + + db_ids = db_triggers |> Enum.map(& &1.id) |> MapSet.new() + + phantom_ids = MapSet.difference(ydoc_ids, db_ids) + new_ids = MapSet.difference(db_ids, ydoc_ids) + existing_ids = MapSet.intersection(ydoc_ids, db_ids) + + delete_ops = + phantom_ids + |> Enum.map(&find_index_in_array(triggers_array, &1)) + |> Enum.reject(&is_nil/1) + |> Enum.sort(:desc) + |> Enum.map(&{:delete, triggers_array, &1}) + + insert_ops = + db_triggers + |> 
Enum.filter(&MapSet.member?(new_ids, &1.id)) + |> Enum.map(&{:insert, triggers_array, trigger_to_prelim(&1)}) + + update_ops = + db_triggers + |> Enum.filter(&MapSet.member?(existing_ids, &1.id)) + |> Enum.flat_map(fn db_trigger -> + ydoc_trigger = + Enum.find(ydoc_triggers, &(Yex.Map.fetch!(&1, "id") == db_trigger.id)) + + trigger_update_ops(ydoc_trigger, db_trigger) + end) + + delete_ops ++ insert_ops ++ update_ops + end + + defp trigger_to_prelim(trigger) do + kafka_configuration = + trigger.kafka_configuration && + Yex.MapPrelim.from(%{ + "connect_timeout" => trigger.kafka_configuration.connect_timeout, + "group_id" => trigger.kafka_configuration.group_id, + "hosts_string" => + KafkaConfiguration.generate_hosts_string( + trigger.kafka_configuration.hosts + ), + "initial_offset_reset_policy" => + trigger.kafka_configuration.initial_offset_reset_policy, + "password" => trigger.kafka_configuration.password, + "sasl" => to_string(trigger.kafka_configuration.sasl), + "ssl" => trigger.kafka_configuration.ssl, + "topics_string" => + KafkaConfiguration.generate_topics_string( + trigger.kafka_configuration.topics + ), + "username" => trigger.kafka_configuration.username + }) + + Yex.MapPrelim.from(%{ + "id" => trigger.id, + "type" => to_string(trigger.type), + "enabled" => trigger.enabled, + "cron_expression" => trigger.cron_expression, + "cron_cursor_job_id" => trigger.cron_cursor_job_id, + "webhook_reply" => + trigger.webhook_reply && to_string(trigger.webhook_reply), + "kafka_configuration" => kafka_configuration + }) + end + + defp trigger_update_ops(ydoc_trigger, db_trigger) do + [ + {"type", to_string(db_trigger.type)}, + {"enabled", db_trigger.enabled}, + {"cron_expression", db_trigger.cron_expression}, + {"cron_cursor_job_id", db_trigger.cron_cursor_job_id}, + {"webhook_reply", + db_trigger.webhook_reply && to_string(db_trigger.webhook_reply)} + ] + |> Enum.flat_map(fn {key, db_val} -> + case Yex.Map.fetch(ydoc_trigger, key) do + {:ok, ^db_val} -> [] + _ -> 
[{:set_field, ydoc_trigger, key, db_val}] + end + end) + + # kafka_configuration nested fields are not updated here — the provisioner + # does not expose kafka config changes through this reconciliation path. + end + + # --------------------------------------------------------------------------- + # Apply reconcile operations + # --------------------------------------------------------------------------- + + defp apply_reconcile_op({:insert, array, prelim}) do + Yex.Array.push(array, prelim) + end + + defp apply_reconcile_op({:delete, array, index}) do + Yex.Array.delete(array, index) + end + + defp apply_reconcile_op({:set_field, yex_map, key, value}) do + Yex.Map.set(yex_map, key, value) + end + + defp apply_reconcile_op({:update_text, text, new_content}) do + len = Yex.Text.length(text) + if len > 0, do: Yex.Text.delete(text, 0, len) + Yex.Text.insert(text, 0, new_content) + end + + # --------------------------------------------------------------------------- + # Existing changeset-driven reconciler (unchanged below) + # --------------------------------------------------------------------------- + defp generate_ydoc_operations(%Ecto.Changeset{} = changeset, workflow, doc) do [ :jobs, diff --git a/lib/lightning/projects/provisioner.ex b/lib/lightning/projects/provisioner.ex index 37aed26c40..a774cebb7f 100644 --- a/lib/lightning/projects/provisioner.ex +++ b/lib/lightning/projects/provisioner.ex @@ -25,6 +25,7 @@ defmodule Lightning.Projects.Provisioner do alias Lightning.VersionControl.ProjectRepoConnection alias Lightning.VersionControl.VersionControlUsageLimiter + alias Lightning.Collaboration.WorkflowReconciler alias Lightning.Workflows alias Lightning.Workflows.Audit alias Lightning.Workflows.Edge @@ -62,43 +63,63 @@ defmodule Lightning.Projects.Provisioner do def import_document(project, user_or_repo_connection, data, opts) do allow_stale = Keyword.get(opts, :allow_stale, false) - Repo.transact(fn -> - with :ok <- maybe_limit_provisioning(project.id, 
user_or_repo_connection), - project_changeset <- - build_import_changeset(project, user_or_repo_connection, data), - edges_to_cleanup <- - edges_referencing_deleted_jobs(project_changeset), - {:ok, %{workflows: workflows} = project} <- - Repo.insert_or_update(project_changeset, allow_stale: allow_stale), - :ok <- cleanup_orphaned_edges(edges_to_cleanup), - :ok <- handle_collection_deletion(project_changeset), - updated_project <- preload_dependencies(project), - {:ok, _changes} <- - audit_workflows(project_changeset, user_or_repo_connection), - {:ok, _changes} <- - update_workflows_version( - project_changeset, - updated_project.workflows - ), - {:ok, _changes} <- - create_snapshots( - project_changeset, - updated_project.workflows, - user_or_repo_connection - ) do - Enum.each(workflows, &Workflows.Events.workflow_updated/1) - - project_changeset - |> get_assoc(:workflows) - |> Enum.each(&Workflows.publish_kafka_trigger_events/1) - - Lightning.Projects.SandboxPromExPlugin.fire_provisioner_import_event( - Lightning.Projects.Project.sandbox?(updated_project) + with {:ok, updated_project} <- + Repo.transact(fn -> + with :ok <- + maybe_limit_provisioning(project.id, user_or_repo_connection), + project_changeset <- + build_import_changeset( + project, + user_or_repo_connection, + data + ), + edges_to_cleanup <- + edges_referencing_deleted_jobs(project_changeset), + {:ok, %{workflows: workflows} = project} <- + Repo.insert_or_update(project_changeset, + allow_stale: allow_stale + ), + :ok <- cleanup_orphaned_edges(edges_to_cleanup), + :ok <- handle_collection_deletion(project_changeset), + updated_project <- preload_dependencies(project), + {:ok, _changes} <- + audit_workflows(project_changeset, user_or_repo_connection), + {:ok, _changes} <- + update_workflows_version( + project_changeset, + updated_project.workflows + ), + {:ok, _changes} <- + create_snapshots( + project_changeset, + updated_project.workflows, + user_or_repo_connection + ) do + Enum.each(workflows, 
&Workflows.Events.workflow_updated/1) + + project_changeset + |> get_assoc(:workflows) + |> Enum.each(&Workflows.publish_kafka_trigger_events/1) + + Lightning.Projects.SandboxPromExPlugin.fire_provisioner_import_event( + Lightning.Projects.Project.sandbox?(updated_project) + ) + + {:ok, updated_project} + end + end) do + # Reconcile SharedDocs OUTSIDE the transaction — the reconciler + # mutates in-memory CRDT processes, not the DB. + Enum.each( + updated_project.workflows, + &WorkflowReconciler.reconcile_workflow_from_db( + &1, + user_or_repo_connection ) + ) - {:ok, updated_project} - end - end) + {:ok, updated_project} + end end defp build_import_changeset(project, user_or_repo_connection, data) do diff --git a/lib/lightning_web/channels/workflow_channel.ex b/lib/lightning_web/channels/workflow_channel.ex index 7d6ab4ad3c..850c1570f2 100644 --- a/lib/lightning_web/channels/workflow_channel.ex +++ b/lib/lightning_web/channels/workflow_channel.ex @@ -672,6 +672,16 @@ defmodule LightningWeb.WorkflowChannel do {:reply, {:ok, %{}}, socket} end + @impl true + def handle_info({:workflow_updated_externally, workflow}, socket) do + push(socket, "workflow_saved", %{ + latest_snapshot_lock_version: workflow.lock_version, + workflow: workflow + }) + + {:noreply, socket} + end + @impl true def handle_info({:yjs, chunk}, socket) do push(socket, "yjs", {:binary, chunk}) diff --git a/test/lightning/collaboration/external_workflow_update_test.exs b/test/lightning/collaboration/external_workflow_update_test.exs new file mode 100644 index 0000000000..a4012425f4 --- /dev/null +++ b/test/lightning/collaboration/external_workflow_update_test.exs @@ -0,0 +1,472 @@ +defmodule Lightning.Collaboration.ExternalWorkflowUpdateTest do + @moduledoc """ + Tests that the collaborative editor stays in sync when the workflow is + updated externally (provisioner import, sandbox merge) without going through + the Y.doc. 
+ + Covers two failure modes: + - Someone is online when the external update runs (SharedDoc is alive but + never notified — currently red, fix pending) + - Nobody is online (stale DocumentState is loaded on next open — the + provisioner calls WorkflowReconciler, which goes through Collaborate.start + to update the doc and flush the correct state before the next user opens) + """ + + # async: false — we start supervised GenServers (DocumentSupervisor, Session) + # that are not owned by the test process. + use Lightning.DataCase, async: false + + import Lightning.CollaborationHelpers + import Lightning.Factories + import Mox + + alias Lightning.Collaborate + alias Lightning.Collaboration.DocumentSupervisor + alias Lightning.Collaboration.Registry, as: CollaborationRegistry + alias Lightning.Collaboration.Session + alias Lightning.Projects.Provisioner + + setup :verify_on_exit! + + setup do + Mox.stub( + Lightning.Extensions.MockUsageLimiter, + :limit_action, + fn _action, _context -> :ok end + ) + + # set_mox_global so the mock is reachable from spawned GenServer processes + # (e.g. 
Session calling LightningMock.broadcast inside save_workflow) + Mox.set_mox_global(LightningMock) + Mox.stub(LightningMock, :broadcast, fn _topic, _message -> :ok end) + + :ok + end + + describe "after a sandbox merge" do + test "a new tab sees the merged version, not the stale editor state with unsaved changes" do + user_a = insert(:user) + user_b = insert(:user) + + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + on_exit(fn -> ensure_doc_supervisor_stopped(workflow.id) end) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + [original_job] = workflow.jobs + document_name = "workflow:#{workflow.id}" + + start_supervised!( + {DocumentSupervisor, + workflow: workflow, + document_name: document_name, + name: CollaborationRegistry.via({:doc_supervisor, document_name})} + ) + + # --- Tab A: User A opens the workflow and adds an unsaved job --- + {:ok, session_a} = Collaborate.start(user: user_a, workflow: workflow) + + Session.update_doc(session_a, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => Ecto.UUID.generate(), + "name" => "tab-a-unsaved-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) + ) + end) + + # --- User B merges the sandbox (provisioner import, adds a new job) --- + v2_body = build_provisioner_body(project, workflow, add_new_job: true) + [%{"id" => provisioner_job_id}] = new_jobs_in(v2_body, [original_job.id]) + + {:ok, _} = Provisioner.import_document(project, user_b, v2_body) + + # --- Tab B: User B opens the workflow to verify what they just merged --- + {:ok, session_b} = Collaborate.start(user: user_b, workflow: workflow) + + doc = Session.get_doc(session_b) + + job_ids = + doc + |> Yex.Doc.get_array("jobs") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session_a) + Session.stop(session_b) + ensure_doc_supervisor_stopped(workflow.id) + + assert 
provisioner_job_id in job_ids, + "Tab B should see the job that User B just deployed (#{provisioner_job_id}), " <> + "but the SharedDoc was not reset after the sandbox merge. " <> + "Got job ids: #{inspect(job_ids)}" + end + end + + # --------------------------------------------------------------------------- + # Tab A saves after a provisioner import + # --------------------------------------------------------------------------- + # + # After the provisioner import the SharedDoc should be reset to v2. + # So when Tab A saves, the result should be v2 (provisioner's job present). + # Today it fails because the SharedDoc is never reset — Tab A saves v1 + + # unsaved changes, producing a v3 that skips v2's content entirely. + + describe "after a sandbox merge, saving from an open tab" do + test "saves the merged version, not the unsaved changes" do + user_a = insert(:user) + user_b = insert(:user) + + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + on_exit(fn -> ensure_doc_supervisor_stopped(workflow.id) end) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + [original_job] = workflow.jobs + document_name = "workflow:#{workflow.id}" + + start_supervised!( + {DocumentSupervisor, + workflow: workflow, + document_name: document_name, + name: CollaborationRegistry.via({:doc_supervisor, document_name})} + ) + + # Tab A: User A opens the workflow and adds an unsaved job + {:ok, session_a} = Collaborate.start(user: user_a, workflow: workflow) + + tab_a_unsaved_job_id = Ecto.UUID.generate() + + Session.update_doc(session_a, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => tab_a_unsaved_job_id, + "name" => "tab-a-unsaved-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) + ) + end) + + # User B does a provisioner import (sandbox merge / CLI deploy) + v2_body = build_provisioner_body(project, workflow, 
add_new_job: true) + [%{"id" => provisioner_job_id}] = new_jobs_in(v2_body, [original_job.id]) + + {:ok, _} = Provisioner.import_document(project, user_b, v2_body) + + # Tab A saves + {:ok, saved_workflow} = Session.save_workflow(session_a, user_a) + Session.stop(session_a) + ensure_doc_supervisor_stopped(workflow.id) + + saved_job_ids = Enum.map(saved_workflow.jobs, & &1.id) + + assert provisioner_job_id in saved_job_ids, + "Saved workflow should include the provisioner's job (#{provisioner_job_id}). " <> + "Got jobs: #{saved_workflow.jobs |> Enum.map(& &1.name) |> inspect()}" + + refute tab_a_unsaved_job_id in saved_job_ids, + "Saved workflow should not include Tab A's unsaved job — " <> + "the SharedDoc should have been reset to the merged version before the save." + end + end + + # --------------------------------------------------------------------------- + # Unsaved changes indicator base workflow not updated after sandbox merge + # --------------------------------------------------------------------------- + # + # The unsaved changes indicator compares the Y.Doc's workflow.lock_version + # against the server's latest_snapshot_lock_version to detect unsaved changes. + # After a provisioner import (sandbox merge), the Y.Doc's lock_version should + # be updated to the new DB version so the indicator uses v2 as its base. + # Today it fails because the SharedDoc is never reset — lock_version in the + # Y.Doc stays at v1, so the indicator compares against the wrong baseline. 
+ + describe "after a sandbox merge, the unsaved changes indicator" do + test "reflects the merged version as the new base, not the stale pre-merge version" do + user_a = insert(:user) + user_b = insert(:user) + + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + on_exit(fn -> ensure_doc_supervisor_stopped(workflow.id) end) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + [_original_job] = workflow.jobs + document_name = "workflow:#{workflow.id}" + + start_supervised!( + {DocumentSupervisor, + workflow: workflow, + document_name: document_name, + name: CollaborationRegistry.via({:doc_supervisor, document_name})} + ) + + # Tab A: User A opens the workflow and adds an unsaved job + {:ok, session_a} = Collaborate.start(user: user_a, workflow: workflow) + + Session.update_doc(session_a, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => Ecto.UUID.generate(), + "name" => "tab-a-unsaved-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) + ) + end) + + # User B opens sandbox, adds a job at the same node, saves, and merges sandbox. + # The provisioner import updates the DB to v2. + v2_body = build_provisioner_body(project, workflow, add_new_job: true) + {:ok, _} = Provisioner.import_document(project, user_b, v2_body) + + v2_workflow = + Lightning.Workflows.get_workflow(workflow.id, + include: [:jobs, :edges, :triggers] + ) + + doc = Session.get_doc(session_a) + workflow_map = Yex.Doc.get_map(doc, "workflow") + ydoc_lock_version = Yex.Map.fetch!(workflow_map, "lock_version") + + Session.stop(session_a) + ensure_doc_supervisor_stopped(workflow.id) + + assert ydoc_lock_version == v2_workflow.lock_version, + "Y.Doc lock_version should reflect the merged version " <> + "(#{v2_workflow.lock_version}) so the unsaved changes indicator " <> + "uses v2 as its base — but got #{ydoc_lock_version}. 
" <> + "Tab A's indicator will compare unsaved changes against v1 " <> + "instead of the sandbox-merged v2." + end + end + + # --------------------------------------------------------------------------- + # Nobody online during provisioner import + # --------------------------------------------------------------------------- + # + # Scenario: a user had the workflow open, made unsaved changes, and closed + # their browser. The SharedDoc shut down and flushed a stale Y.Doc to + # DocumentState (v1 content + phantom unsaved job). Later, a provisioner + # import writes v2 to the DB. When the next user opens the workflow, a fresh + # SharedDoc starts and loads that stale DocumentState. + # + # The fix: the provisioner calls WorkflowReconciler after committing. The + # reconciler calls Collaborate.start (with the provisioner's actor), applies the v2 diff + # to the Y.Doc, then calls Session.stop. The shutdown chain flushes the + # correct state to DocumentState before any real user opens the workflow. + # + # We insert the stale DocumentState directly (bypassing PersistenceWriter) + # so the scenario is deterministic in tests. + + describe "when nobody is online during the provisioner import" do + test "a new tab sees the merged version, not stale persisted editor state" do + user_b = insert(:user) + + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + on_exit(fn -> ensure_doc_supervisor_stopped(workflow.id) end) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + [original_job] = workflow.jobs + document_name = "workflow:#{workflow.id}" + + # Build stale Y.Doc state: v1 workflow + a phantom unsaved job left behind + # by a previous user who closed their browser without saving. 
+ stale_doc = Yex.Doc.new() + Session.initialize_workflow_document(stale_doc, workflow) + + Yex.Doc.get_array(stale_doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => Ecto.UUID.generate(), + "name" => "previous-user-unsaved-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) + ) + + stale_update = Yex.encode_state_as_update!(stale_doc) + + Lightning.Repo.insert!(%Lightning.Collaboration.DocumentState{ + document_name: document_name, + version: :update, + state_data: stale_update + }) + + # Provisioner runs while nobody is online (no active SharedDoc). + # The reconciler calls Collaborate.start (user: user_b), applies the diff, + # then Session.stop — flushing the correct state to DocumentState. + v2_body = build_provisioner_body(project, workflow, add_new_job: true) + [%{"id" => provisioner_job_id}] = new_jobs_in(v2_body, [original_job.id]) + {:ok, _} = Provisioner.import_document(project, user_b, v2_body) + + # Open the workflow the way a real browser tab would. + # Collaborate.start's internal sleep+retry (iter 3b) acts as a sync point: + # it only succeeds after the reconciler's SharedDoc has fully auto_exited + # and flushed — so by the time this returns, DocumentState has the correct + # content. + v2_workflow = + Lightning.Workflows.get_workflow(workflow.id, + include: [:jobs, :edges, :triggers] + ) + + {:ok, session_b} = Collaborate.start(workflow: v2_workflow, user: user_b) + + doc = Session.get_doc(session_b) + + job_ids = + doc + |> Yex.Doc.get_array("jobs") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session_b) + ensure_doc_supervisor_stopped(workflow.id) + + assert provisioner_job_id in job_ids, + "User B should see the provisioner's job (#{provisioner_job_id}) " <> + "when opening the workflow after the merge. " <> + "The reconciler should have corrected the stale DocumentState. 
" <> + "Got job ids: #{inspect(job_ids)}" + end + end + + # --------------------------------------------------------------------------- + # GitHub sync (ProjectRepoConnection actor) + # --------------------------------------------------------------------------- + # + # When a GitHub sync triggers a provisioner import, the actor is a + # ProjectRepoConnection, not a User. The reconciler must still work — no + # presence tracking should fire, but the SharedDoc must be updated correctly. + + describe "when the provisioner is triggered by a GitHub sync" do + test "a new tab sees the synced version" do + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + project = + Lightning.Repo.get!(Lightning.Projects.Project, workflow.project_id) + + repo_connection = insert(:project_repo_connection, project: project) + user = insert(:user) + + on_exit(fn -> ensure_doc_supervisor_stopped(workflow.id) end) + + [original_job] = workflow.jobs + + v2_body = build_provisioner_body(project, workflow, add_new_job: true) + [%{"id" => provisioner_job_id}] = new_jobs_in(v2_body, [original_job.id]) + + {:ok, _} = Provisioner.import_document(project, repo_connection, v2_body) + + v2_workflow = + Lightning.Workflows.get_workflow(workflow.id, + include: [:jobs, :edges, :triggers] + ) + + {:ok, session} = Collaborate.start(workflow: v2_workflow, user: user) + + job_ids = + Session.get_doc(session) + |> Yex.Doc.get_array("jobs") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + assert provisioner_job_id in job_ids, + "The reconciler should have flushed the GitHub-synced job (#{provisioner_job_id}). 
" <> + "Got job ids: #{inspect(job_ids)}" + end + end + + # --------------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------------- + + defp build_provisioner_body(project, workflow, opts) do + base_jobs = + Enum.map(workflow.jobs, fn job -> + %{ + "id" => job.id, + "name" => job.name, + "adaptor" => job.adaptor, + "body" => job.body + } + end) + + extra_jobs = + if opts[:add_new_job] do + [ + %{ + "id" => Ecto.UUID.generate(), + "name" => "provisioner-added-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + } + ] + else + [] + end + + %{ + "id" => project.id, + "name" => project.name, + "workflows" => [ + %{ + "id" => workflow.id, + "name" => workflow.name, + "jobs" => base_jobs ++ extra_jobs, + "triggers" => + Enum.map(workflow.triggers, fn t -> + %{"id" => t.id, "enabled" => t.enabled} + end), + "edges" => + Enum.map(workflow.edges, fn e -> + %{ + "id" => e.id, + "source_trigger_id" => e.source_trigger_id, + "source_job_id" => e.source_job_id, + "target_job_id" => e.target_job_id, + "condition_type" => to_string(e.condition_type), + "condition_expression" => e.condition_expression, + "condition_label" => e.condition_label + } + |> Map.reject(fn {_, v} -> is_nil(v) end) + end) + } + ] + } + end + + defp new_jobs_in(body, existing_job_ids) do + body + |> get_in(["workflows", Access.at(0), "jobs"]) + |> Enum.reject(&(&1["id"] in existing_job_ids)) + end +end diff --git a/test/lightning/collaboration/persistence_test.exs b/test/lightning/collaboration/persistence_test.exs index 210a118e95..e52242a28a 100644 --- a/test/lightning/collaboration/persistence_test.exs +++ b/test/lightning/collaboration/persistence_test.exs @@ -19,229 +19,6 @@ defmodule Lightning.Collaboration.PersistenceTest do :ok end - describe "reconcile_workflow_metadata/2" do - setup do - workflow = insert(:workflow) - workflow_id = workflow.id - document_name = 
"workflow:#{workflow_id}" - - {:ok, - workflow: workflow, workflow_id: workflow_id, document_name: document_name} - end - - test "converts deleted_at DateTime to string when reconciling", %{ - workflow: workflow, - document_name: document_name - } do - # Create a workflow with a deleted_at timestamp - workflow_with_deleted = %{workflow | deleted_at: DateTime.utc_now()} - - # Create persisted Y.Doc state with same lock_version - doc = Yex.Doc.new() - workflow_map = Yex.Doc.get_map(doc, "workflow") - - Yex.Doc.transaction(doc, "initial_state", fn -> - Yex.Map.set(workflow_map, "id", workflow.id) - Yex.Map.set(workflow_map, "name", workflow.name) - Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) - # Old persisted state has deleted_at as nil - Yex.Map.set(workflow_map, "deleted_at", nil) - end) - - {:ok, update_data} = Yex.encode_state_as_update(doc) - - {:ok, _} = - Repo.insert(%DocumentState{ - document_name: document_name, - state_data: update_data, - version: :update - }) - - # Start DocumentSupervisor with workflow that has deleted_at - # This triggers reconcile_workflow_metadata - {:ok, doc_supervisor} = - DocumentSupervisor.start_link( - [workflow: workflow_with_deleted, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) - ) - - assert Process.alive?(doc_supervisor) - - # Verify the deleted_at was properly converted to a string in Y.Doc - shared_doc = Registry.whereis({:shared_doc, document_name}) - doc = Yex.Sync.SharedDoc.get_doc(shared_doc) - reconciled_workflow_map = Yex.Doc.get_map(doc, "workflow") - - deleted_at_value = Yex.Map.fetch!(reconciled_workflow_map, "deleted_at") - - # Should be a string (ISO8601), not a DateTime struct - assert is_binary(deleted_at_value) - - # Should match the original DateTime when parsed back - assert {:ok, parsed_dt, _} = DateTime.from_iso8601(deleted_at_value) - assert DateTime.compare(parsed_dt, workflow_with_deleted.deleted_at) == :eq - - # Clean up - 
GenServer.stop(doc_supervisor, :normal) - end - - test "handles nil deleted_at correctly", %{ - workflow: workflow, - document_name: document_name - } do - # Workflow without deleted_at (nil) - workflow_without_deleted = %{workflow | deleted_at: nil} - - # Create persisted state - doc = Yex.Doc.new() - workflow_map = Yex.Doc.get_map(doc, "workflow") - - Yex.Doc.transaction(doc, "setup", fn -> - Yex.Map.set(workflow_map, "id", workflow.id) - Yex.Map.set(workflow_map, "name", workflow.name) - Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) - Yex.Map.set(workflow_map, "deleted_at", nil) - end) - - {:ok, update_data} = Yex.encode_state_as_update(doc) - - {:ok, _} = - Repo.insert(%DocumentState{ - document_name: document_name, - state_data: update_data, - version: :update - }) - - # Start DocumentSupervisor - {:ok, doc_supervisor} = - DocumentSupervisor.start_link( - [workflow: workflow_without_deleted, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) - ) - - assert Process.alive?(doc_supervisor) - - # Verify deleted_at remains nil - shared_doc = Registry.whereis({:shared_doc, document_name}) - doc = Yex.Sync.SharedDoc.get_doc(shared_doc) - reconciled_workflow_map = Yex.Doc.get_map(doc, "workflow") - - deleted_at_value = Yex.Map.fetch!(reconciled_workflow_map, "deleted_at") - assert deleted_at_value == nil - - # Clean up - GenServer.stop(doc_supervisor, :normal) - end - - test "reconciles lock_version when persisted state exists", %{ - workflow: workflow, - document_name: document_name - } do - # Create persisted state with old lock_version - old_lock_version = workflow.lock_version - new_lock_version = old_lock_version + 1 - - # Update workflow in DB to have new lock_version - {:ok, updated_workflow} = - workflow - |> Ecto.Changeset.change(lock_version: new_lock_version) - |> Repo.update() - - # Create persisted Y.Doc state with old lock_version - doc = Yex.Doc.new() - workflow_map = Yex.Doc.get_map(doc, "workflow") 
- - Yex.Doc.transaction(doc, "setup", fn -> - Yex.Map.set(workflow_map, "id", workflow.id) - Yex.Map.set(workflow_map, "name", workflow.name) - Yex.Map.set(workflow_map, "lock_version", old_lock_version) - Yex.Map.set(workflow_map, "deleted_at", nil) - end) - - {:ok, update_data} = Yex.encode_state_as_update(doc) - - {:ok, _} = - Repo.insert(%DocumentState{ - document_name: document_name, - state_data: update_data, - version: :update - }) - - # Start DocumentSupervisor - should reconcile to new lock_version - {:ok, doc_supervisor} = - DocumentSupervisor.start_link( - [workflow: updated_workflow, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) - ) - - assert Process.alive?(doc_supervisor) - - # Verify lock_version was reconciled to the current DB value - shared_doc = Registry.whereis({:shared_doc, document_name}) - doc = Yex.Sync.SharedDoc.get_doc(shared_doc) - reconciled_workflow_map = Yex.Doc.get_map(doc, "workflow") - - reconciled_lock_version = - Yex.Map.fetch!(reconciled_workflow_map, "lock_version") - - assert reconciled_lock_version == new_lock_version - - # Clean up - GenServer.stop(doc_supervisor, :normal) - end - - test "reconciles workflow name when it changed", %{ - workflow: workflow, - document_name: document_name - } do - # Update workflow name in DB - {:ok, updated_workflow} = - workflow - |> Ecto.Changeset.change(name: "Updated Name") - |> Repo.update() - - # Create persisted state with old name - doc = Yex.Doc.new() - workflow_map = Yex.Doc.get_map(doc, "workflow") - - Yex.Doc.transaction(doc, "setup", fn -> - Yex.Map.set(workflow_map, "id", workflow.id) - Yex.Map.set(workflow_map, "name", workflow.name) - Yex.Map.set(workflow_map, "lock_version", workflow.lock_version) - Yex.Map.set(workflow_map, "deleted_at", nil) - end) - - {:ok, update_data} = Yex.encode_state_as_update(doc) - - {:ok, _} = - Repo.insert(%DocumentState{ - document_name: document_name, - state_data: update_data, - version: :update - }) - - # 
Start DocumentSupervisor - {:ok, doc_supervisor} = - DocumentSupervisor.start_link( - [workflow: updated_workflow, document_name: document_name], - name: Registry.via({:doc_supervisor, document_name}) - ) - - assert Process.alive?(doc_supervisor) - - # Verify name was reconciled - shared_doc = Registry.whereis({:shared_doc, document_name}) - doc = Yex.Sync.SharedDoc.get_doc(shared_doc) - reconciled_workflow_map = Yex.Doc.get_map(doc, "workflow") - - reconciled_name = Yex.Map.fetch!(reconciled_workflow_map, "name") - assert reconciled_name == "Updated Name" - - # Clean up - GenServer.stop(doc_supervisor, :normal) - end - end - describe "bind/3 with no persisted state" do test "initializes workflow document from database" do workflow = insert(:workflow) @@ -275,11 +52,11 @@ defmodule Lightning.Collaboration.PersistenceTest do end describe "bind/3 with stale persisted state" do - test "resets Y.Doc when persisted lock_version differs from database" do + test "loads persisted state as-is without reconciliation" do workflow = insert(:workflow, lock_version: 5) document_name = "workflow:#{workflow.id}" - # Create persisted state with older lock_version + # Create persisted state with an older lock_version and different name doc = Yex.Doc.new() workflow_map = Yex.Doc.get_map(doc, "workflow") @@ -308,15 +85,13 @@ defmodule Lightning.Collaboration.PersistenceTest do assert Process.alive?(doc_supervisor) - # Verify Y.Doc was reset to current database state + # Persisted state is loaded as-is — no automatic reconciliation shared_doc = Registry.whereis({:shared_doc, document_name}) doc = Yex.Sync.SharedDoc.get_doc(shared_doc) workflow_map = Yex.Doc.get_map(doc, "workflow") - # Should have current lock_version, not the old one - assert Yex.Map.fetch!(workflow_map, "lock_version") == 5 - # Should have current name, not the old one - assert Yex.Map.fetch!(workflow_map, "name") == workflow.name + assert Yex.Map.fetch!(workflow_map, "lock_version") == 3.0 + assert 
Yex.Map.fetch!(workflow_map, "name") == "Old Name" # Clean up GenServer.stop(doc_supervisor, :normal) diff --git a/test/lightning/collaboration/session_test.exs b/test/lightning/collaboration/session_test.exs index aad64432d3..bf0d287ed9 100644 --- a/test/lightning/collaboration/session_test.exs +++ b/test/lightning/collaboration/session_test.exs @@ -1663,173 +1663,75 @@ defmodule Lightning.SessionTest do end describe "persistence reconciliation" do - test "reconciles lock_version when loading persisted Y.Doc state", %{ + test "restores persisted Y.Doc content on doc supervisor restart", %{ user: user } do workflow = insert(:simple_workflow) + document_name = "workflow:#{workflow.id}" - # Start initial session and make some changes + # Start a session and add an unsaved job to the Y.Doc {:ok, _doc_supervisor} = - Lightning.Collaborate.start_document( - workflow, - "workflow:#{workflow.id}" - ) + Lightning.Collaborate.start_document(workflow, document_name) {:ok, session1} = Session.start_link( user: user, workflow: workflow, parent_pid: self(), - document_name: "workflow:#{workflow.id}" + document_name: document_name ) - # Get the SharedDoc and verify initial lock_version - shared_doc = Session.get_doc(session1) - workflow_map = Yex.Doc.get_map(shared_doc, "workflow") - initial_lock_version = Yex.Map.fetch!(workflow_map, "lock_version") - - # Simulate workflow changes in database (e.g., from another save) - # This increments lock_version in the database - new_lock_version = initial_lock_version + 1 + custom_job_id = Ecto.UUID.generate() - {:ok, updated_workflow} = - Lightning.Workflows.save_workflow( - Lightning.Workflows.change_workflow(workflow, %{ - name: "Updated Name" - }), - user + Session.update_doc(session1, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => custom_job_id, + "name" => "persisted-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "fn(state => state)" + }) ) + end) - # Verify 
database has new lock_version - assert updated_workflow.lock_version == new_lock_version - - # Stop the session and document supervisor to simulate server restart + # Stop the session and doc supervisor — PersistenceWriter flushes state Session.stop(session1) ensure_doc_supervisor_stopped(workflow.id) - # Start a new session - this will load persisted Y.Doc state - # The persisted state has old lock_version, but fresh workflow has new one + # Restart — persisted state should be restored including the custom job {:ok, _doc_supervisor2} = - Lightning.Collaborate.start_document( - updated_workflow, - "workflow:#{workflow.id}" - ) + Lightning.Collaborate.start_document(workflow, document_name) {:ok, session2} = Session.start_link( user: user, - workflow: updated_workflow, + workflow: workflow, parent_pid: self(), - document_name: "workflow:#{workflow.id}" + document_name: document_name ) - # Get the SharedDoc and check lock_version was reconciled - shared_doc2 = Session.get_doc(session2) - workflow_map2 = Yex.Doc.get_map(shared_doc2, "workflow") - reconciled_lock_version = Yex.Map.fetch!(workflow_map2, "lock_version") + doc = Session.get_doc(session2) - # The lock_version should match the database, not the stale persisted state - assert reconciled_lock_version == new_lock_version, - "Expected lock_version #{new_lock_version} but got #{reconciled_lock_version}" + job_ids = + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) - # Verify name was also reconciled - reconciled_name = Yex.Map.fetch!(workflow_map2, "name") - assert reconciled_name == "Updated Name" + assert custom_job_id in job_ids, + "Persisted custom job should be restored on restart" Session.stop(session2) end - test "discards stale persisted Y.Doc when lock_version changes", %{ + test "handles persisted Y.Doc with nil lock_version without crashing", %{ user: user } do - workflow = insert(:simple_workflow) - - # Start initial session with lock_version 0 - {:ok, 
_doc_supervisor} = - Lightning.Collaborate.start_document( - workflow, - "workflow:#{workflow.id}" - ) - - {:ok, session1} = - Session.start_link( - user: user, - workflow: workflow, - parent_pid: self(), - document_name: "workflow:#{workflow.id}" - ) - - # Get initial state - shared_doc = Session.get_doc(session1) - jobs_array = Yex.Doc.get_array(shared_doc, "jobs") - initial_job_count = Yex.Array.length(jobs_array) - - # Workflow is saved (lock_version increments) - {:ok, updated_workflow} = - Lightning.Workflows.save_workflow( - Lightning.Workflows.change_workflow(workflow, %{ - name: "Changed by another user" - }), - user - ) - - new_lock_version = updated_workflow.lock_version - - # Simulate server restart - persisted Y.Doc has old lock_version - Session.stop(session1) - ensure_doc_supervisor_stopped(workflow.id) - - # Start new session with updated workflow - # Persistence should detect stale lock_version and reload from DB - {:ok, _doc_supervisor2} = - Lightning.Collaborate.start_document( - updated_workflow, - "workflow:#{workflow.id}" - ) - - {:ok, session2} = - Session.start_link( - user: user, - workflow: updated_workflow, - parent_pid: self(), - document_name: "workflow:#{workflow.id}" - ) - - # Verify Y.Doc was reloaded from database - shared_doc2 = Session.get_doc(session2) - jobs_array2 = Yex.Doc.get_array(shared_doc2, "jobs") - - # Should have original jobs from DB (persisted Y.Doc was discarded) - assert Yex.Array.length(jobs_array2) == initial_job_count - - # Verify lock_version matches database - workflow_map2 = Yex.Doc.get_map(shared_doc2, "workflow") - reconciled_lock_version = Yex.Map.fetch!(workflow_map2, "lock_version") - - assert reconciled_lock_version == new_lock_version, - "Lock version should match database" - - # Verify workflow name was updated from database - reconciled_name = Yex.Map.fetch!(workflow_map2, "name") - assert reconciled_name == "Changed by another user" - - Session.stop(session2) - end - - test "handles persisted Y.Doc 
with nil lock_version when DB has real version", - %{ - user: user - } do - # This tests the bug fix for issue #4164 - # When a workflow is opened before first save, Y.Doc gets lock_version: nil - # If that state is persisted and the workflow is later saved (getting a real lock_version), - # loading the persisted state would crash because extract_lock_version didn't handle {:ok, nil} - workflow = insert(:simple_workflow) doc_name = "workflow:#{workflow.id}" - # Manually create a persisted document state with lock_version: nil - # This simulates a Y.Doc that was persisted before the workflow was ever saved + # Manually create persisted state with nil lock_version — + # simulates a Y.Doc persisted before the workflow was first saved doc = Yex.Doc.new() workflow_map = Yex.Doc.get_map(doc, "workflow") @@ -1847,13 +1749,9 @@ defmodule Lightning.SessionTest do version: :update }) - # Now start a session - this should NOT crash - # The persistence layer should handle the nil lock_version and reset from DB + # Should not crash when loading persisted state with nil lock_version {:ok, _doc_supervisor} = - Lightning.Collaborate.start_document( - workflow, - doc_name - ) + Lightning.Collaborate.start_document(workflow, doc_name) {:ok, session} = Session.start_link( @@ -1863,14 +1761,12 @@ defmodule Lightning.SessionTest do document_name: doc_name ) - # Verify the session started and lock_version was reconciled from DB - shared_doc = Session.get_doc(session) - workflow_map2 = Yex.Doc.get_map(shared_doc, "workflow") - reconciled_lock_version = Yex.Map.fetch!(workflow_map2, "lock_version") + # Persisted state is loaded as-is — nil lock_version stays nil + doc = Session.get_doc(session) + workflow_map2 = Yex.Doc.get_map(doc, "workflow") + lock_version = Yex.Map.fetch!(workflow_map2, "lock_version") - # lock_version should now match the database value - assert reconciled_lock_version == workflow.lock_version, - "Expected lock_version #{workflow.lock_version} but got 
#{reconciled_lock_version}" + assert is_nil(lock_version) Session.stop(session) end diff --git a/test/lightning/collaboration/workflow_reconciler_test.exs b/test/lightning/collaboration/workflow_reconciler_test.exs index ffdee70fce..4c44b0c416 100644 --- a/test/lightning/collaboration/workflow_reconciler_test.exs +++ b/test/lightning/collaboration/workflow_reconciler_test.exs @@ -780,6 +780,293 @@ defmodule Lightning.Collaboration.WorkflowReconcilerTest do end end + describe "reconcile_workflow_from_db/2" do + setup %{user: _user} do + workflow = + insert(:simple_workflow) + |> Lightning.Repo.preload([:jobs, :triggers, :edges]) + + on_exit(fn -> ensure_doc_supervisor_stopped(workflow.id) end) + + %{workflow: workflow} + end + + test "phantom job (in Y.Doc but not in DB) is removed from the doc", + %{user: user, workflow: workflow} do + {:ok, session} = Collaborate.start(workflow: workflow, user: user) + + phantom_id = Ecto.UUID.generate() + + Session.update_doc(session, fn doc -> + Yex.Doc.get_array(doc, "jobs") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => phantom_id, + "name" => "phantom-job", + "adaptor" => "@openfn/language-common@latest", + "body" => "" + }) + ) + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user) + + doc = Session.get_doc(session) + jobs = Yex.Doc.get_array(doc, "jobs") |> Yex.Array.to_json() + job_ids = Enum.map(jobs, & &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + [original_job] = workflow.jobs + assert original_job.id in job_ids + refute phantom_id in job_ids + end + + test "multiple phantom jobs are all removed from the doc", + %{user: user, workflow: workflow} do + {:ok, session} = Collaborate.start(workflow: workflow, user: user) + + phantom_ids = Enum.map(1..3, fn _ -> Ecto.UUID.generate() end) + + Session.update_doc(session, fn doc -> + array = Yex.Doc.get_array(doc, "jobs") + + Enum.each(phantom_ids, fn pid -> + Yex.Array.push( + array, + Yex.MapPrelim.from(%{ + "id" 
=> pid, + "name" => "p", + "adaptor" => "", + "body" => "" + }) + ) + end) + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user) + + doc = Session.get_doc(session) + jobs = Yex.Doc.get_array(doc, "jobs") |> Yex.Array.to_json() + job_ids = Enum.map(jobs, & &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + assert length(job_ids) == 1 + [original_job] = workflow.jobs + assert original_job.id in job_ids + Enum.each(phantom_ids, fn pid -> refute pid in job_ids end) + end + + test "job body is updated in the doc when it differs from DB", + %{user: user, workflow: workflow} do + [db_job] = workflow.jobs + {:ok, session} = Collaborate.start(workflow: workflow, user: user) + + Session.update_doc(session, fn doc -> + jobs_array = Yex.Doc.get_array(doc, "jobs") + + job_map = + Enum.find(jobs_array, fn m -> Yex.Map.fetch!(m, "id") == db_job.id end) + + {:ok, body_text} = Yex.Map.fetch(job_map, "body") + len = Yex.Text.length(body_text) + if len > 0, do: Yex.Text.delete(body_text, 0, len) + Yex.Text.insert(body_text, 0, "stale editor body") + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user) + + doc = Session.get_doc(session) + jobs_array = Yex.Doc.get_array(doc, "jobs") + + job_map = + Enum.find(jobs_array, fn m -> Yex.Map.fetch!(m, "id") == db_job.id end) + + {:ok, body_text} = Yex.Map.fetch(job_map, "body") + body_string = Yex.Text.to_string(body_text) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + assert body_string == db_job.body + end + + test "no-op reconcile does not modify Y.Doc when doc is already in sync with DB", + %{user: user, workflow: workflow} do + {:ok, session} = Collaborate.start(workflow: workflow, user: user) + + [db_job] = workflow.jobs + doc = Session.get_doc(session) + + jobs_array = Yex.Doc.get_array(doc, "jobs") + + job_map = + Enum.find(jobs_array, fn m -> Yex.Map.fetch!(m, "id") == db_job.id end) + + name_before = Yex.Map.fetch!(job_map, "name") + 
+ WorkflowReconciler.reconcile_workflow_from_db(workflow, user) + + name_after = Yex.Map.fetch!(job_map, "name") + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + assert name_before == name_after + assert name_after == db_job.name + end + + test "phantom edge (in Y.Doc but not in DB) is removed from the doc", + %{user: user, workflow: workflow} do + {:ok, session} = Collaborate.start(workflow: workflow, user: user) + + phantom_edge_id = Ecto.UUID.generate() + + Session.update_doc(session, fn doc -> + Yex.Doc.get_array(doc, "edges") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => phantom_edge_id, + "condition_type" => "always", + "enabled" => true, + "source_trigger_id" => nil, + "source_job_id" => nil, + "target_job_id" => nil + }) + ) + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user) + + doc = Session.get_doc(session) + + edge_ids = + Yex.Doc.get_array(doc, "edges") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + [original_edge] = workflow.edges + assert original_edge.id in edge_ids + refute phantom_edge_id in edge_ids + end + + test "phantom trigger (in Y.Doc but not in DB) is removed from the doc", + %{user: user, workflow: workflow} do + {:ok, session} = Collaborate.start(workflow: workflow, user: user) + + phantom_trigger_id = Ecto.UUID.generate() + + Session.update_doc(session, fn doc -> + Yex.Doc.get_array(doc, "triggers") + |> Yex.Array.push( + Yex.MapPrelim.from(%{ + "id" => phantom_trigger_id, + "type" => "webhook", + "enabled" => true, + "cron_expression" => nil, + "kafka_configuration" => nil + }) + ) + end) + + WorkflowReconciler.reconcile_workflow_from_db(workflow, user) + + doc = Session.get_doc(session) + + trigger_ids = + Yex.Doc.get_array(doc, "triggers") + |> Yex.Array.to_json() + |> Enum.map(& &1["id"]) + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + [original_trigger] = workflow.triggers 
+ assert original_trigger.id in trigger_ids + refute phantom_trigger_id in trigger_ids + end + + test "new kafka trigger is inserted with kafka_configuration fields", + %{user: user, workflow: workflow} do + kafka_trigger = + insert(:trigger, + type: :kafka, + workflow: workflow, + kafka_configuration: build(:triggers_kafka_configuration) + ) + + {:ok, session} = Collaborate.start(workflow: workflow, user: user) + + updated_workflow = + Lightning.Workflows.get_workflow(workflow.id, + include: [:jobs, :triggers, :edges] + ) + + WorkflowReconciler.reconcile_workflow_from_db(updated_workflow, user) + + doc = Session.get_doc(session) + triggers = Yex.Doc.get_array(doc, "triggers") |> Yex.Array.to_json() + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + kafka_t = Enum.find(triggers, &(&1["id"] == kafka_trigger.id)) + + assert kafka_t != nil, "kafka trigger should be inserted into Y.Doc" + + assert %{"group_id" => _, "hosts_string" => _} = + kafka_t["kafka_configuration"] + end + + test "trigger cron_expression is updated in the doc when it differs from DB", + %{user: user, workflow: workflow} do + [original_trigger] = workflow.triggers + + {:ok, session} = Collaborate.start(workflow: workflow, user: user) + + Session.update_doc(session, fn doc -> + triggers_array = Yex.Doc.get_array(doc, "triggers") + + t_map = + Enum.find(triggers_array, fn m -> + Yex.Map.fetch!(m, "id") == original_trigger.id + end) + + Yex.Map.set(t_map, "cron_expression", "stale_value") + end) + + updated_trigger = + Lightning.Repo.update!( + Ecto.Changeset.change(original_trigger, cron_expression: "0 * * * *") + ) + + updated_workflow = %{workflow | triggers: [updated_trigger]} + + WorkflowReconciler.reconcile_workflow_from_db(updated_workflow, user) + + doc = Session.get_doc(session) + triggers_array = Yex.Doc.get_array(doc, "triggers") + + t_map = + Enum.find(triggers_array, fn m -> + Yex.Map.fetch!(m, "id") == original_trigger.id + end) + + cron_expression = 
Yex.Map.fetch!(t_map, "cron_expression") + + Session.stop(session) + ensure_doc_supervisor_stopped(workflow.id) + + assert cron_expression == "0 * * * *" + end + end + defp find_in_ydoc_array(array, id) do array |> Enum.reduce_while(nil, fn item, _ -> diff --git a/test/lightning_web/channels/workflow_channel_test.exs b/test/lightning_web/channels/workflow_channel_test.exs index 055db16d58..35279bc3a4 100644 --- a/test/lightning_web/channels/workflow_channel_test.exs +++ b/test/lightning_web/channels/workflow_channel_test.exs @@ -3409,4 +3409,20 @@ defmodule LightningWeb.WorkflowChannelTest do assert_broadcast "job_code_applied", %{message_id: ^message_id} end end + + describe "handle_info :workflow_updated_externally" do + test "pushes workflow_saved to the socket when the workflow is updated externally", + %{socket: socket, workflow: workflow} do + loaded_workflow = + Lightning.Repo.get!(Lightning.Workflows.Workflow, workflow.id) + + send(socket.channel_pid, {:workflow_updated_externally, loaded_workflow}) + + assert_push "workflow_saved", %{ + latest_snapshot_lock_version: lock_version + } + + assert lock_version == loaded_workflow.lock_version + end + end end