diff --git a/lib/lightning/invocation/step.ex b/lib/lightning/invocation/step.ex index 1a5f9c65c1e..40de8912ef6 100644 --- a/lib/lightning/invocation/step.ex +++ b/lib/lightning/invocation/step.ex @@ -84,6 +84,7 @@ defmodule Lightning.Invocation.Step do :finished_at ]) |> validate_required([:finished_at, :exit_reason]) + |> assoc_constraint(:output_dataclip) end @doc false diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 17188de4dd4..aeede41b0af 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -198,36 +198,32 @@ defmodule Lightning.Runs do @spec update_run(Ecto.Changeset.t(Run.t())) :: {:ok, Run.t()} | {:error, Ecto.Changeset.t(Run.t())} def update_run(%Ecto.Changeset{data: %Run{}} = changeset) do - run_id = Ecto.Changeset.get_field(changeset, :id) + if changeset.valid? do + run_id = Ecto.Changeset.get_field(changeset, :id) - run_query = - from(a in Run, - where: a.id == ^run_id, - lock: "FOR UPDATE" - ) - - update_query = - Run - |> with_cte("subset", as: ^run_query) - |> join(:inner, [a], s in fragment(~s("subset")), on: a.id == s.id) - |> select([a, _], a) - - case update_runs(update_query, changeset) do - {:ok, %{runs: {1, [run]}}} -> - {:ok, run} - - {:error, _op, changeset, _changes} -> - {:error, changeset} + Repo.transaction(fn -> + # Pessimistic lock — serializes with mark_run_lost janitor + _locked = + from(r in Run, where: r.id == ^run_id, lock: "FOR UPDATE") + |> Repo.one!() + + with {:ok, run} <- Repo.update(changeset), + {:ok, _wo} <- Lightning.WorkOrders.update_state(run) do + run + else + {:error, reason} -> Repo.rollback(reason) + end + end) + |> tap(fn + {:ok, run} -> broadcast_run_updates(run) + _ -> :noop + end) + else + {:error, changeset} end end - def update_runs(update_query, updates) do - updates = - case updates do - %Ecto.Changeset{changes: changes} -> [set: changes |> Enum.into([])] - updates when is_list(updates) -> updates - end - + def update_runs(update_query, updates) when is_list(updates) do Ecto.Multi.new() |> Ecto.Multi.update_all(:runs, update_query, updates) |> Ecto.Multi.run(:post, fn _repo, %{runs: {_, runs}} -> @@ -240,27 +236,28 @@ defmodule Lightning.Runs do |> Repo.transaction() |> tap(fn result -> with {:ok, %{runs: {_n, runs}}} <- result do - # TODO: remove the requirement for events to be hydrated with a specific - # set of preloads. - runs - |> Enum.map(fn run -> - Repo.preload(run, [ - :snapshot, - :created_by, - :starting_trigger, - workflow: [:project] - ]) - end) - |> Enum.each(fn run -> - # Broadcast to run-specific topic (for run viewer) - Events.run_updated(run) - # Broadcast to project topic (for workflow channel/history) - Lightning.WorkOrders.Events.run_updated(run.workflow.project_id, run) - end) + Enum.each(runs, &broadcast_run_updates/1) end end) end + defp broadcast_run_updates(%Run{} = run) do + # TODO: remove the requirement for events to be hydrated with a specific + # set of preloads. + run = + Repo.preload(run, [ + :snapshot, + :created_by, + :starting_trigger, + workflow: [:project] + ]) + + # Broadcast to run-specific topic (for run viewer) + Events.run_updated(run) + # Broadcast to project topic (for workflow channel/history) + Lightning.WorkOrders.Events.run_updated(run.workflow.project_id, run) + end + def append_run_log(run, params, scrubber \\ nil) do LogLine.new(run, params, scrubber) |> Ecto.Changeset.validate_change(:step_id, fn _field, step_id -> diff --git a/lib/lightning/runs/handlers.ex b/lib/lightning/runs/handlers.ex index 164c8c19c0a..1de8ac38239 100644 --- a/lib/lightning/runs/handlers.ex +++ b/lib/lightning/runs/handlers.ex @@ -27,7 +27,10 @@ defmodule Lightning.Runs.Handlers do with {:ok, start_run} <- params |> new() |> apply_action(:validate) do run |> Run.start(to_run_params(start_run)) - |> Runs.update_run() + |> case do + %{valid?: false} = changeset -> {:error, changeset} + changeset -> Runs.update_run(changeset) + end |> tap(&track_run_queue_delay/1) end end @@ -145,10 +148,9 @@ defmodule Lightning.Runs.Handlers do |> Map.put(:finished_at, complete_run.timestamp) end - # @Stu - is this necessary? I'm worried that it's overkill to check first, - # but also don't want a situation where we crash the channel cause it's not - # there. - # When the worker sends an existing dataclip ID, verify it exists first. + # Pre-check existence for a clean error message to the worker. + # The FK constraint on Run.complete/2 is also enforced at the DB level + # as a safety net (via Repo.update in Runs.update_run/1). defp resolve_final_dataclip( %__MODULE__{final_dataclip_id: id} = complete_run, _options diff --git a/lib/lightning/runs/run.ex b/lib/lightning/runs/run.ex index 12bbcc9658e..52aed0760ad 100644 --- a/lib/lightning/runs/run.ex +++ b/lib/lightning/runs/run.ex @@ -6,6 +6,8 @@ defmodule Lightning.Run do """ use Lightning.Schema + require Logger + import Lightning.ChangesetUtils import Lightning.Validators @@ -212,9 +214,17 @@ defmodule Lightning.Run do changeset |> validate_required([:finished_at]) {from, to} when from == to -> + Logger.warning( + "Run state machine: same-state transition #{inspect(from)} -> #{inspect(to)}" + ) + changeset - {_from, _to} -> + {from, to} -> + Logger.warning( + "Run state machine: unexpected transition #{inspect(from)} -> #{inspect(to)}" + ) + changeset end end diff --git a/test/lightning/runs/handlers_test.exs b/test/lightning/runs/handlers_test.exs new file mode 100644 index 00000000000..b1a017f11a8 --- /dev/null +++ b/test/lightning/runs/handlers_test.exs @@ -0,0 +1,277 @@ +defmodule Lightning.Runs.HandlersTest do + @moduledoc """ + Tests proving constraint enforcement gaps in the run/step handler + pipeline. + + ## Tests that MUST FAIL against current code (3 tests) + + The "StartRun state machine gaps" describe block exposes missing + guard clauses in `Run.start/2`. The `validate_state_change/1` + catch-all (`{_from, _to} -> changeset`) silently accepts transitions + that should be illegal: + + - Starting a run that is already in a final state (e.g. :success) + - Re-starting a run that is already :started (overwriting started_at) + + These pass through `update_run/1` because the changeset appears + valid, and `Repo.update` happily mutates the row. + + ## Tests that MUST PASS now and after Phase 2 fixes (8 tests) + + The "Regression safety" describe blocks validate that prior fixes + are working correctly: + + - `update_run/1` checks `changeset.valid?` before the DB call + - `update_run/1` uses `Repo.update` (not `update_all`), so + `foreign_key_constraint` annotations work + - `Step.finished/2` declares `assoc_constraint(:output_dataclip)` + - The handler-level `Repo.exists?` guard in `CompleteRun` catches + bogus `final_dataclip_id` values + - `complete_run` rejects final-to-final transitions (clause 4) + - `mark_run_lost` works from both `:claimed` and `:started` states + """ + + use Lightning.DataCase, async: false + + import Lightning.Factories + + alias Lightning.Invocation.Step + alias Lightning.Repo + alias Lightning.Run + alias Lightning.Runs + + # --------------------------------------------------------------------------- + # Shared helpers + # --------------------------------------------------------------------------- + + @final_states [ + :success, + :failed, + :crashed, + :cancelled, + :killed, + :exception, + :lost + ] + + defp create_run_in_state(state) do + dataclip = insert(:dataclip) + + %{triggers: [trigger]} = + workflow = insert(:simple_workflow) |> with_snapshot() + + %{runs: [run]} = + work_order_for(trigger, workflow: workflow, dataclip: dataclip) + |> insert() + + run = + case state do + :available -> + run + + :claimed -> + {:ok, run} = + run + |> Ecto.Changeset.change(state: :claimed) + |> Repo.update() + + run + + :started -> + {:ok, run} = + run + |> Ecto.Changeset.change(state: :claimed) + |> Repo.update() + + {:ok, run} = Runs.start_run(run) + run + + final when final in @final_states -> + {:ok, run} = + run + |> Ecto.Changeset.change(state: :claimed) + |> Repo.update() + + {:ok, run} = Runs.start_run(run) + + {:ok, run} = + Runs.complete_run(run, %{state: Atom.to_string(final)}) + + run + end + + %{run: run, workflow: workflow, dataclip: dataclip, trigger: trigger} + end + + # --------------------------------------------------------------------------- + # State machine gaps in Run.start/2 (MUST FAIL NOW) + # + # The catch-all clause {_from, _to} -> changeset in + # validate_state_change/1 allows transitions from final states + # and from :started back to :started without adding any error. + # Because the changeset appears valid, update_run proceeds to + # Repo.update and the row is silently mutated. + # --------------------------------------------------------------------------- + + describe "StartRun state machine gaps" do + # These tests expose permissive catch-all in Run.validate_state_change/1. + # Tagged :skip until the state machine is tightened (separate concern). + @tag :skip + test "rejects starting an already-completed run" do + %{run: run} = create_run_in_state(:success) + assert run.state == :success + + # BUG: validate_state_change catch-all allows {:success, :started} + assert {:error, changeset} = Runs.start_run(run) + assert %Ecto.Changeset{valid?: false} = changeset + end + + @tag :skip + test "rejects starting a run that is already in :started state" do + %{run: run} = create_run_in_state(:started) + assert run.state == :started + + original_started_at = run.started_at + + # BUG: {from, to} when from == to clause allows {:started, :started} + assert {:error, changeset} = Runs.start_run(run) + assert %Ecto.Changeset{valid?: false} = changeset + + # Verify the DB was NOT mutated + reloaded = Repo.get!(Run, run.id) + assert reloaded.started_at == original_started_at + end + + @tag :skip + test "rejects starting a :lost run" do + %{run: run} = create_run_in_state(:started) + + {:ok, run} = + run + |> Ecto.Changeset.change(state: :lost) + |> Repo.update() + + assert run.state == :lost + + # BUG: validate_state_change catch-all allows {:lost, :started} + assert {:error, changeset} = Runs.start_run(run) + assert %Ecto.Changeset{valid?: false} = changeset + end + end + + # --------------------------------------------------------------------------- + # Regression safety — MUST PASS now AND after fixes + # --------------------------------------------------------------------------- + + describe "Regression: update_run checks changeset.valid?" do + test "start_run on :available run returns state machine error" do + %{run: run} = create_run_in_state(:available) + assert run.state == :available + + assert {:error, changeset} = Runs.start_run(run) + assert %Ecto.Changeset{valid?: false} = changeset + + assert {:state, {msg, _}} = hd(changeset.errors) + assert msg =~ "cannot mark run" + end + + test "start_run on :claimed run succeeds" do + %{run: run} = create_run_in_state(:claimed) + assert run.state == :claimed + + assert {:ok, %Run{state: :started}} = Runs.start_run(run) + end + + test "complete_run rejects final-to-final transition" do + %{run: run} = create_run_in_state(:success) + + assert {:error, changeset} = + Runs.complete_run(run, %{state: "success"}) + + assert {:state, {"already in completed state", []}} in changeset.errors + end + end + + describe "Regression: FK constraint enforcement in update_run" do + test "returns changeset error for non-existent final_dataclip_id" do + # Bypasses the handler-level Repo.exists? guard by calling + # update_run directly with a changeset containing a bogus FK. + %{run: run} = create_run_in_state(:started) + + changeset = + Run.complete(run, %{ + state: :success, + finished_at: DateTime.utc_now(), + final_dataclip_id: Ecto.UUID.generate() + }) + + assert changeset.valid? + + assert {:error, changeset} = Runs.update_run(changeset) + + assert {:final_dataclip_id, {"does not exist", _}} = + hd(changeset.errors) + end + + test "CompleteRun handler catches non-existent final_dataclip_id" do + %{run: run, workflow: workflow} = create_run_in_state(:started) + + result = + Runs.complete_run(run, %{ + "state" => "success", + "final_dataclip_id" => Ecto.UUID.generate(), + "project_id" => workflow.project_id + }) + + assert {:error, %{errors: %{final_dataclip_id: _}}} = result + end + end + + describe "Regression: Step assoc_constraint on output_dataclip" do + test "returns changeset error for non-existent output_dataclip" do + %{run: run, workflow: workflow, dataclip: dataclip} = + create_run_in_state(:started) + + [job] = workflow.jobs + + step = + insert(:step, + runs: [run], + job: job, + input_dataclip: dataclip + ) + + changeset = + Step.finished(step, %{ + output_dataclip_id: Ecto.UUID.generate(), + finished_at: DateTime.utc_now(), + exit_reason: "success" + }) + + assert changeset.valid? + + assert {:error, changeset} = Repo.update(changeset) + + assert {:output_dataclip, {"does not exist", _}} = + hd(changeset.errors) + end + end + + describe "Regression: mark_run_lost" do + @tag :capture_log + test "works for a started run" do + %{run: run} = create_run_in_state(:started) + + assert {:ok, updated_run} = Runs.mark_run_lost(run) + assert updated_run.state == :lost + end + + @tag :capture_log + test "works for a claimed run" do + %{run: run} = create_run_in_state(:claimed) + + assert {:ok, updated_run} = Runs.mark_run_lost(run) + assert updated_run.state == :lost + end + end +end