Skip to content

Commit 4bbf448

Browse files
committed
improvement: Implement upserts with MERGE (17+)
We can now provide per-row action reporting for upserts
1 parent e96419e commit 4bbf448

7 files changed

Lines changed: 855 additions & 21 deletions

File tree

lib/data_layer.ex

Lines changed: 230 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2224,9 +2224,16 @@ defmodule AshPostgres.DataLayer do
22242224

22252225
source = resolve_source(resource, Enum.at(changesets, 0))
22262226

2227+
# On PostgreSQL 17+ we implement upserts with `MERGE` instead of `INSERT ... ON CONFLICT`,
2228+
# which lets us report per-row whether each record was inserted or updated (via
2229+
# `merge_action()`), surfaced as `:upsert_action` metadata. Below 17, when explicitly
2230+
# disabled, or for upserts whose keys cannot be matched by `MERGE`, we fall back to the
2231+
# `ON CONFLICT` path.
2232+
use_merge? = options[:upsert?] && merge_upsert?(resource, options)
2233+
22272234
try do
22282235
opts =
2229-
if options[:upsert?] do
2236+
if options[:upsert?] && !use_merge? do
22302237
# Ash groups changesets by atomics before dispatching them to the data layer
22312238
# this means that all changesets have the same atomics
22322239
%{atomics: atomics, filter: filter} = Enum.at(changesets, 0)
@@ -2318,9 +2325,13 @@ defmodule AshPostgres.DataLayer do
23182325
end
23192326

23202327
result =
2321-
with_savepoint(repo, opts[:on_conflict], fn ->
2322-
repo.insert_all(source, ecto_changesets, opts)
2323-
end)
2328+
if use_merge? do
2329+
merge_upsert(repo, resource, source, changesets, ecto_changesets, opts, options)
2330+
else
2331+
with_savepoint(repo, opts[:on_conflict], fn ->
2332+
repo.insert_all(source, ecto_changesets, opts)
2333+
end)
2334+
end
23242335

23252336
identity = options[:identity]
23262337
keys = Map.get(identity || %{}, :keys) || Ash.Resource.Info.primary_key(resource)
@@ -2616,6 +2627,221 @@ defmodule AshPostgres.DataLayer do
26162627
end)
26172628
end
26182629

2630+
# Decides whether an upsert on `resource` should be executed through `MERGE`.
#
# Evaluates to `false` when the `:upsert_with_merge?` application env of `:ash_postgres`
# is explicitly set to `false`; otherwise evaluates to whatever
# `AshPostgres.DataLayer.Info.pg_version_matches?/2` returns for `">= 17.0.0"`.
# The options argument is accepted but currently unused.
defp merge_upsert?(resource, _options) do
  # Only an explicit `false` disables MERGE; an unset (or any other) value keeps it enabled.
  merge_enabled? = Application.get_env(:ash_postgres, :upsert_with_merge?, true) != false

  merge_enabled? && AshPostgres.DataLayer.Info.pg_version_matches?(resource, ">= 17.0.0")
end
2634+
2635+
# Runs an upsert through `AshPostgres.Merge.merge_all/2` (a single `MERGE` statement,
# PostgreSQL 17+). The result is intended to have the same `{count, records | nil}` shape as
# `repo.insert_all/3`, so `bulk_create/3` can post-process both paths identically.
#
# The atomics and the upsert condition are read from the first changeset only.
#
# Raises when there is nothing to set on a match, or when building the SET query fails.
defp merge_upsert(repo, resource, source, changesets, ecto_changesets, opts, options) do
  first_changeset = Enum.at(changesets, 0)
  %{atomics: atomics, filter: upsert_condition} = first_changeset
  conflict_keys = options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)

  target_query =
    AshSql.Bindings.default_bindings(
      from(row in source, as: ^0),
      resource,
      AshPostgres.SqlImplementation
    )

  fields_to_set = upsert_set(resource, changesets, conflict_keys, options)

  set_query =
    resource
    |> AshSql.Atomics.query_with_atomics(target_query, nil, atomics, %{}, fields_to_set)
    |> case do
      {:ok, query} ->
        query

      {:empty, _query} ->
        raise "Cannot upsert with no fields to specify in the upsert statement. This can only happen on resources without a primary key."

      {:error, error} ->
        raise Ash.Error.to_ash_error(error)
    end

  # The upsert condition (which may reference the incoming row via `EXCLUDED`) is rendered
  # as `WHEN MATCHED AND ...`; it stays `nil` when the changeset carries no condition.
  when_matched_condition_query =
    if upsert_condition, do: merge_filter_query(resource, source, upsert_condition)

  # `on_query` is the complete `ON` clause (key matches plus the identity's `where`);
  # `source_fields` are the attributes its incoming side references, which must therefore be
  # present on the `EXCLUDED` source even when the entries themselves do not set them.
  {on_query, source_fields} =
    build_merge_on_query(resource, source, conflict_keys, options[:identity], first_changeset)

  # A `{table, schema}` source carries its own table name; otherwise use the resource's.
  target_table =
    case source do
      {explicit_table, _resource} -> explicit_table
      _ -> AshPostgres.DataLayer.Info.table(resource)
    end

  # Each rendered sub-query tracks its own `has_error?` flag, so merge the flags onto one
  # query before deciding whether the statement needs a savepoint.
  savepoint_query =
    combine_expression_accumulators(set_query, [when_matched_condition_query, on_query])

  merge_opts = [
    resource: resource,
    table: target_table,
    prefix: opts[:prefix],
    entries: ecto_changesets,
    on_query: on_query,
    source_fields: source_fields,
    set_query: set_query,
    when_matched_condition_query: when_matched_condition_query,
    on_not_matched: :insert,
    returning: opts[:returning],
    report_action?: true
  ]

  with_savepoint(repo, savepoint_query, fn ->
    AshPostgres.Merge.merge_all(repo, merge_opts)
  end)
end
2703+
2704+
# Folds the `has_error?` flags of several queries onto `base_query`.
#
# `others` may contain `nil` entries (sub-queries that were not built); they are ignored.
# The returned value is `base_query` with
# `__ash_bindings__.expression_accumulator.has_error?` set to `true` when it, or any
# non-nil entry of `others`, has a truthy flag, and to `false` otherwise. This lets one
# `with_savepoint/3` call account for every sub-query rendered into the MERGE statement.
defp combine_expression_accumulators(base_query, others) do
  any_flagged? =
    Enum.any?([base_query | others], fn
      nil -> false
      query -> query.__ash_bindings__.expression_accumulator.has_error?
    end)

  put_in(base_query.__ash_bindings__.expression_accumulator.has_error?, any_flagged?)
end
2715+
2716+
# Wraps `filter` in a `SELECT 1 ... WHERE <filter>` query against `source`.
#
# The query exists only so its WHERE clause can be rendered; AshPostgres.Merge extracts that
# clause and splices it into a MERGE condition. It starts from the same
# `from(row in source, as: ^0)` shape as the SET query so that Ecto emits a consistent target
# alias across all clauses.
#
# Raises the Ash error produced from `AshSql.Filter.filter/3` when the filter cannot be applied.
defp merge_filter_query(resource, source, filter) do
  bound_query =
    AshSql.Bindings.default_bindings(
      from(row in source, as: ^0),
      resource,
      AshPostgres.SqlImplementation
    )

  case AshSql.Filter.filter(bound_query, filter, resource) do
    {:error, error} ->
      raise Ash.Error.to_ash_error(error)

    {:ok, filtered_query} ->
      Ecto.Query.select(filtered_query, 1)
  end
end
2729+
2730+
# Builds the rendered `ON` query for a MERGE upsert.
#
# Every entry of `keys` contributes one key-match expression (see `key_match_expression/4`);
# these are AND-ed together and then combined with the identity's `where`, when an identity
# with a `where` is given. The query is started with `Ash.Query.new/1`, which is what brings
# in the resource's base_filter.
#
# Returns `{on_query, source_fields}` where `source_fields` is the de-duplicated list of
# attributes the incoming (`EXCLUDED`) side references and which must exist on the source.
#
# Raises `ArgumentError` when `keys` is empty: a MERGE needs at least one key to match on,
# and without this clause the reduction below would fail with an opaque `Enum.EmptyError`.
defp build_merge_on_query(resource, _source, [], _identity, _changeset) do
  raise ArgumentError,
        "Cannot build a MERGE upsert for #{inspect(resource)}: no upsert keys to match on"
end

defp build_merge_on_query(resource, source, keys, identity, changeset) do
  # Without an identity, keys are matched with plain equality (NULLs are distinct).
  nils_distinct? = if identity, do: identity.nils_distinct?, else: true
  identity_where = identity && identity.where

  {match_expressions, source_fields} =
    Enum.map_reduce(keys, [], fn key, acc ->
      {expression, fields} = key_match_expression(resource, key, nils_distinct?, changeset)
      {expression, fields ++ acc}
    end)

  # `keys` is non-empty here, so there is always a first expression to seed the conjunction.
  [first_expression | remaining_expressions] = match_expressions

  match_expression =
    Enum.reduce(remaining_expressions, first_expression, fn expression, acc ->
      Ash.Expr.expr(^acc and ^expression)
    end)

  filter =
    resource
    |> Ash.Query.new()
    |> Ash.Query.do_filter(match_expression)
    |> then(fn query ->
      if identity_where, do: Ash.Query.do_filter(query, identity_where), else: query
    end)
    |> Map.get(:filter)

  {merge_filter_query(resource, source, filter), Enum.uniq(source_fields)}
end
2758+
2759+
# `<key on the existing row> == <key on the incoming row>`. For `nils_distinct?: false`
2760+
# identities (NULLS NOT DISTINCT) two NULL keys must also match, so widen to an
2761+
# `IS NOT DISTINCT FROM` equivalent. Returns the match expression and the attributes its
2762+
# incoming side references.
2763+
defp key_match_expression(resource, key, nils_distinct?, changeset) do
  {existing, incoming, referenced_fields} = key_match_sides(resource, key, changeset)

  if nils_distinct? do
    # Plain equality: a NULL key never matches another NULL key.
    {Ash.Expr.expr(^existing == ^incoming), referenced_fields}
  else
    # NULLS NOT DISTINCT: additionally treat two NULL keys as a match.
    {Ash.Expr.expr(^existing == ^incoming or (is_nil(^existing) and is_nil(^incoming))),
     referenced_fields}
  end
end
2775+
2776+
# Resolves one upsert key into `{existing-row expr, incoming-row expr, source attributes}`.
#
# * Attribute keys: the existing side is a ref to the attribute, the incoming side is an
#   `Ash.Query.UpsertConflict` for the key (rendered as `EXCLUDED.col`), and the key itself
#   is the only referenced source attribute.
# * Calculation keys declared as `{module, opts}`: the calculation is turned into an
#   `Ash.Query.Calculation` with context taken from `changeset`; its expression is matched on
#   both sides, with the incoming side rewritten to reference the incoming row.
#
# Raises `ArgumentError` for any other kind of field (including an unknown key).
defp key_match_sides(resource, key, changeset) do
  field = Ash.Resource.Info.field(resource, key)

  case field do
    %Ash.Resource.Attribute{} ->
      existing = %Ash.Query.Ref{attribute: field, relationship_path: [], resource: resource}
      incoming = %Ash.Query.UpsertConflict{attribute: key}
      {existing, incoming, [key]}

    %Ash.Resource.Calculation{calculation: {module, opts}} ->
      {:ok, bare_calculation} =
        Ash.Query.Calculation.new(field.name, module, opts, field.type, field.constraints)

      private_context = changeset.context[:private]

      calculation =
        Ash.Actions.Read.add_calc_context(
          bare_calculation,
          private_context[:actor],
          private_context[:authorize?],
          changeset.tenant,
          private_context[:tracer],
          changeset.domain,
          resource,
          parent_stack: []
        )

      # The resolved expression serves both sides: as-is (through the ref) for the existing
      # row, and rewritten to the incoming row for the other.
      resolved_expression = calculation.module.expression(calculation.opts, calculation.context)
      existing = %Ash.Query.Ref{attribute: calculation, relationship_path: [], resource: resource}
      incoming = rewrite_to_incoming_row(resolved_expression)
      {existing, incoming, calculation_source_fields(resolved_expression)}

    _ ->
      raise ArgumentError,
            "Cannot use #{inspect(key)} as a MERGE upsert key: unsupported field #{inspect(field && field.__struct__)}"
  end
end
2818+
2819+
# Maps over a calculation's expression and points every local attribute reference at the
# incoming/conflicting row instead of the existing one.
#
# Raises `ArgumentError` if the expression references anything through a relationship, since
# that cannot be matched in a single MERGE statement.
defp rewrite_to_incoming_row(expression) do
  Ash.Filter.map(expression, &incoming_row_node/1)
end

# Local reference (empty relationship path): swap it for the incoming-row equivalent.
defp incoming_row_node(%Ash.Query.Ref{relationship_path: [], attribute: attribute}) do
  %Ash.Query.UpsertConflict{attribute: merge_field_name(attribute)}
end

# Any remaining ref has a non-empty relationship path, which MERGE cannot express.
defp incoming_row_node(%Ash.Query.Ref{relationship_path: path}) do
  raise ArgumentError,
        "Calculation MERGE upsert key references a relationship (#{inspect(path)}), which cannot be matched in a single MERGE statement"
end

# Everything that is not a ref passes through untouched.
defp incoming_row_node(node), do: node
2833+
2834+
# Lists the names of the local attributes (refs with an empty relationship path) that a
# calculation key's expression depends on; these must exist on the `EXCLUDED` source.
# Refs reached through a relationship are left out.
defp calculation_source_fields(expression) do
  for %{relationship_path: [], attribute: attribute} <- Ash.Filter.list_refs(expression) do
    merge_field_name(attribute)
  end
end
2841+
2842+
# Normalizes a field reference to its name: atoms are already names, while maps/structs
# carrying a `:name` key yield that key's value.
defp merge_field_name(field_name) when is_atom(field_name), do: field_name
defp merge_field_name(%{name: field_name}), do: field_name
2844+
26192845
@doc false
26202846
def get_source_for_upsert_field(field, resource) do
26212847
case Ash.Resource.Info.attribute(resource, field) do

0 commit comments

Comments
 (0)