Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions lib/absinthe.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,20 @@ defmodule Absinthe do
%{message: String.t()}
| %{message: String.t(), locations: [%{line: pos_integer, column: integer}]}

@type continuations_t :: nil | [Absinthe.Blueprint.Continuation.t()]

@type ordinal_fun :: (term() -> term())

@type ordinal_compare_fun :: (term(), term() -> {boolean(), term()})

@type result_t ::
%{data: nil | result_selection_t}
| %{data: nil | result_selection_t, errors: [result_error_t]}
%{
required(:data) => nil | result_selection_t,
optional(:ordinal_fun) => ordinal_fun(),
optional(:ordinal_compare_fun) => ordinal_compare_fun(),
optional(:continuations) => continuations_t,
optional(:errors) => [result_error_t]
}
| %{errors: [result_error_t]}

@type pipeline_modifier_fun :: (Absinthe.Pipeline.t(), Keyword.t() -> Absinthe.Pipeline.t())
Expand Down Expand Up @@ -98,7 +109,8 @@ defmodule Absinthe do
pipeline_modifier: pipeline_modifier_fun()
]

@type run_result :: {:ok, result_t} | {:error, String.t()}
@type run_result :: {:ok, result_t} | {:more, result_t} | {:error, String.t()}
@type continue_result :: run_result | :no_more_results

@spec run(
binary | Absinthe.Language.Source.t() | Absinthe.Language.Document.t(),
Expand All @@ -113,7 +125,26 @@ defmodule Absinthe do
|> Absinthe.Pipeline.for_document(options)
|> pipeline_modifier.(options)

case Absinthe.Pipeline.run(document, pipeline) do
document
|> Absinthe.Pipeline.run(pipeline)
|> build_result()
end

@spec continue([Absinthe.Blueprint.Continuation.t()]) :: continue_result
def continue(continuations) do
continuations
|> Absinthe.Pipeline.continue()
|> build_result()
end

defp build_result(output) do
case output do
{:ok, %{result: :no_more_results}, _phases} ->
:no_more_results

{:ok, %{result: %{continuations: c} = result}, _phases} when c != [] ->
{:more, result}

{:ok, %{result: result}, _phases} ->
{:ok, result}

Expand All @@ -137,6 +168,7 @@ defmodule Absinthe do
def run!(input, schema, options \\ []) do
case run(input, schema, options) do
{:ok, result} -> result
{:more, result} -> result
{:error, err} -> raise ExecutionError, message: err
end
end
Expand Down
18 changes: 18 additions & 0 deletions lib/absinthe/blueprint/continuation.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Absinthe.Blueprint.Continuation do
@moduledoc false

# Continuations allow further resolutions after the initial result is
# returned

alias Absinthe.Pipeline

defstruct [
:phase_input,
:pipeline
]

@type t :: %__MODULE__{
phase_input: Pipeline.data_t(),
pipeline: Pipeline.t()
}
end
6 changes: 4 additions & 2 deletions lib/absinthe/blueprint/result/list.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ defmodule Absinthe.Blueprint.Result.List do
:values,
errors: [],
flags: %{},
extensions: %{}
extensions: %{},
continuations: []
]

@type t :: %__MODULE__{
emitter: Blueprint.Document.Field.t(),
values: [Blueprint.Execution.node_t()],
errors: [Phase.Error.t()],
flags: Blueprint.flags_t(),
extensions: %{any => any}
extensions: %{any => any},
continuations: [Blueprint.Continuation.t()]
}
end
6 changes: 4 additions & 2 deletions lib/absinthe/blueprint/result/object.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ defmodule Absinthe.Blueprint.Result.Object do
:fields,
errors: [],
flags: %{},
extensions: %{}
extensions: %{},
continuations: []
]

@type t :: %__MODULE__{
emitter: Blueprint.Document.Field.t(),
fields: [Blueprint.Execution.node_t()],
errors: [Phase.Error.t()],
flags: Blueprint.flags_t(),
extensions: %{any => any}
extensions: %{any => any},
continuations: [Blueprint.Continuation.t()]
}
end
9 changes: 8 additions & 1 deletion lib/absinthe/phase/document/result.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ defmodule Absinthe.Phase.Document.Result do
{:validation_failed, errors}
end

format_result(result, opts)
result
|> format_result(opts)
|> maybe_add_continuations(blueprint.execution.result)
end

defp format_result({:ok, {data, []}}, _) do
Expand Down Expand Up @@ -156,4 +158,9 @@ defmodule Absinthe.Phase.Document.Result do
end

defp format_location(_), do: []

defp maybe_add_continuations(result, %{continuations: continuations}) when continuations != [],
do: Map.put(result, :continuations, continuations)

defp maybe_add_continuations(result, _), do: result
end
54 changes: 54 additions & 0 deletions lib/absinthe/phase/subscription/get_ordinal.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Absinthe.Phase.Subscription.GetOrdinal do
use Absinthe.Phase

alias Absinthe.Phase.Subscription.SubscribeSelf

@moduledoc false

alias Absinthe.Blueprint

@spec run(any, Keyword.t()) :: {:ok, Blueprint.t()}
def run(blueprint, _options \\ []) do
with %{type: :subscription, selections: [field]} <- Blueprint.current_operation(blueprint),
{:ok, config} = SubscribeSelf.get_config(field, blueprint.execution.context, blueprint),
{_, ordinal_fun} when is_function(ordinal_fun, 1) <- {:ordinal_fun, config[:ordinal]},
{_, ordinal_compare_fun} when is_function(ordinal_compare_fun, 2) <-
{:ordinal_compare_fun,
Keyword.get(config, :ordinal_compare, &default_ordinal_compare/2)} do
ordinal = ordinal_fun.(blueprint.execution.root_value)

result =
blueprint.result
|> Map.put(:ordinal, ordinal)
|> Map.put(:ordinal_compare_fun, ordinal_compare_fun)

{:ok, %{blueprint | result: result}}
else
{:ordinal_fun, f} when is_function(f) ->
IO.write(
:stderr,
"Ordinal function must be 1-arity"
)

{:ok, blueprint}

{:ordinal_compare_fun, f} when is_function(f) ->
IO.write(
:stderr,
"Ordinal compare function must be 2-arity"
)

{:ok, blueprint}

_ ->
{:ok, blueprint}
end
end

defp default_ordinal_compare(nil, new_ordinal), do: {true, new_ordinal}

defp default_ordinal_compare(old_ordinal, new_ordinal) when old_ordinal < new_ordinal,
do: {true, new_ordinal}

defp default_ordinal_compare(old_ordinal, _new_ordinal), do: {false, old_ordinal}
end
46 changes: 46 additions & 0 deletions lib/absinthe/phase/subscription/prime.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Absinthe.Phase.Subscription.Prime do
@moduledoc false

alias Absinthe.Blueprint.Continuation
alias Absinthe.Phase

@spec run(any(), Keyword.t()) :: Absinthe.Phase.result_t()
def run(blueprint, prime_result: prime_result) do
{:ok, put_in(blueprint.execution.root_value, prime_result)}
end

def run(blueprint, prime_fun: prime_fun, resolution_options: options) do
{:ok, prime_results} = prime_fun.(blueprint.execution)

case prime_results do
[first | rest] ->
blueprint = put_in(blueprint.execution.root_value, first)
blueprint = maybe_add_continuations(blueprint, rest, options)
{:ok, blueprint}

[] ->
blueprint = put_in(blueprint.result, :no_more_results)
{:replace, blueprint, []}
end
end

defp maybe_add_continuations(blueprint, [], _options), do: blueprint

defp maybe_add_continuations(blueprint, remaining_results, options) do
continuations =
Enum.map(
remaining_results,
&%Continuation{
phase_input: blueprint,
pipeline: [
{__MODULE__, [prime_result: &1]},
{Phase.Document.Execution.Resolution, options},
Phase.Subscription.GetOrdinal,
Phase.Document.Result
]
}
)

put_in(blueprint.result, %{continuations: continuations})
end
end
34 changes: 32 additions & 2 deletions lib/absinthe/phase/subscription/result.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,40 @@ defmodule Absinthe.Phase.Subscription.Result do
# subscription

alias Absinthe.Blueprint
alias Absinthe.Blueprint.Continuation
alias Absinthe.Phase

@spec run(any, Keyword.t()) :: {:ok, Blueprint.t()}
def run(blueprint, topic: topic) do
result = %{"subscribed" => topic}
def run(blueprint, options) do
topic = Keyword.fetch!(options, :topic)
prime = Keyword.get(options, :prime)

result = maybe_add_prime(%{"subscribed" => topic}, prime, blueprint, options)

{:ok, put_in(blueprint.result, result)}
end

def maybe_add_prime(result, nil, _blueprint, _options), do: result

def maybe_add_prime(result, prime_fun, blueprint, options) when is_function(prime_fun, 1) do
continuation = %Continuation{
phase_input: blueprint,
pipeline: [
{Phase.Subscription.Prime, [prime_fun: prime_fun, resolution_options: options]},
{Phase.Document.Execution.Resolution, options},
Phase.Subscription.GetOrdinal,
Phase.Document.Result
]
}

Map.put(result, :continuations, [continuation])
end

def maybe_add_prime(_result, prime_fun, _blueprint, _options) do
raise """
Invalid prime function. Must be a function of arity 1.

#{inspect(prime_fun)}
"""
end
end
12 changes: 6 additions & 6 deletions lib/absinthe/phase/subscription/subscribe_self.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
Absinthe.Subscription.subscribe(pubsub, field_keys, subscription_id, blueprint)

pipeline = [
{Phase.Subscription.Result, topic: subscription_id},
{Phase.Subscription.Result, topic: subscription_id, prime: config[:prime]},
{Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])}
]

Expand All @@ -46,11 +46,11 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
end
end

defp get_config(
%{schema_node: schema_node, argument_data: argument_data} = field,
context,
blueprint
) do
def get_config(
%{schema_node: schema_node, argument_data: argument_data} = field,
context,
blueprint
) do
name = schema_node.identifier

config =
Expand Down
32 changes: 26 additions & 6 deletions lib/absinthe/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,44 @@ defmodule Absinthe.Pipeline do
* See `Absinthe.Schema` on adjusting the schema pipeline for schema manipulation.
"""

alias Absinthe.Blueprint.Continuation
alias Absinthe.Phase

@type data_t :: any

@type run_result_t ::
{:ok, data_t, [Phase.t()]}
| {:error, String.t() | {:http_method, String.t()}, [Phase.t()]}

@type phase_config_t :: Phase.t() | {Phase.t(), Keyword.t()}

@type t :: [phase_config_t | [phase_config_t]]

@spec run(data_t, t) ::
{:ok, data_t, [Phase.t()]}
| {:error, String.t() | {:http_method, String.t()}, [Phase.t()]}
@spec run(data_t, t) :: run_result_t
def run(input, pipeline) do
pipeline
|> List.flatten()
|> run_phase(input)
end

@spec continue([Continuation.t()]) :: run_result_t
def continue([continuation | rest]) do
result = run_phase(continuation.pipeline, continuation.phase_input)

case result do
{:ok, blueprint, phases} when rest == [] ->
{:ok, blueprint, phases}

{:ok, blueprint, phases} ->
bp_result = Map.put(blueprint.result, :continuations, rest)
blueprint = Map.put(blueprint, :result, bp_result)
{:ok, blueprint, phases}

error ->
error
end
end

@defaults [
adapter: Absinthe.Adapter.LanguageConventions,
operation_name: nil,
Expand Down Expand Up @@ -116,6 +137,7 @@ defmodule Absinthe.Pipeline do
# Execution
{Phase.Subscription.SubscribeSelf, options},
{Phase.Document.Execution.Resolution, options},
Phase.Subscription.GetOrdinal,
# Format Result
Phase.Document.Result,
{Phase.Telemetry, Keyword.put(options, :event, [:execute, :operation, :stop])}
Expand Down Expand Up @@ -393,9 +415,7 @@ defmodule Absinthe.Pipeline do
end)
end

@spec run_phase(t, data_t, [Phase.t()]) ::
{:ok, data_t, [Phase.t()]}
| {:error, String.t() | {:http_method, String.t()}, [Phase.t()]}
@spec run_phase(t, data_t, [Phase.t()]) :: run_result_t
def run_phase(pipeline, input, done \\ [])

def run_phase([], input, done) do
Expand Down
2 changes: 2 additions & 0 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ defmodule Absinthe.Subscription do

@type subscription_field_spec :: {atom, term | (term -> term)}

@type prime_fun :: (Absinthe.Resolution.t() -> {:ok, [map()]})

@doc """
Publish a mutation

Expand Down
Loading
Loading