Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@
# notification. To prevent flooding the recipients, it will wait for a period
# before it sends the next email (assuming the failure condition persists).
# Changing this setting will affect the frequency of sending.
# KAFKA_NOTIFICATTION_EMBARGO_SECONDS=3600
# KAFKA_NOTIFICATION_EMBARGO_SECONDS=3600
#
# If the Kafka pipelines failed to persist a message, the message can be
# persisted as JSON to the local file system. To enable this, set
Expand All @@ -282,7 +282,7 @@
# Lightning starts and it must be writable by the user that Lightning runs as.
# KAFKA_ALTERNATE_STORAGE_FILE_PATH=/path/to/alternate/storage
#
# This file to which the registry should be read from. In case the file doesnt
# The file from which the registry should be read. In case the file doesn't
# exist, Lightning will attempt to fetch the file and write it to the same location.
# For this reason, you have to make sure that the directory exists and it is writable
# ADAPTORS_REGISTRY_JSON_PATH=/path/to/adaptor_registry_cache.json
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ and this project adheres to

- bumped local worker to 1.24.0

- Allow instance admins to install credential schemas and update the adaptor
registry on the fly [#3114](https://github.com/OpenFn/lightning/issues/3114),
[#2209](https://github.com/OpenFn/lightning/issues/2209),
[#325](https://github.com/OpenFn/lightning/issues/325),
[#1996](https://github.com/OpenFn/lightning/issues/1996)

### Fixed

## [2.16.2] - 2026-04-20
Expand Down
3 changes: 3 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ config :lightning, LightningWeb.Endpoint,
storybook_tailwind: {Tailwind, :install_and_run, [:storybook, ~w(--watch)]}
]

# schemas_path and adaptor_icons_path are only used by build-time mix tasks
# (install_schemas, install_adaptor_icons) for baking data into Docker images.
# At runtime, the DB is the primary source via AdaptorData/ETS cache.
config :lightning,
schemas_path: "priv/schemas",
adaptor_icons_path: "priv/static/images/adaptors",
Expand Down
3 changes: 3 additions & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ config :phoenix, :filter_parameters, [
"token"
]

# schemas_path and adaptor_icons_path are only used by build-time mix tasks
# (install_schemas, install_adaptor_icons) for baking data into Docker images.
# At runtime, the DB is the primary source via AdaptorData/ETS cache.
config :lightning,
schemas_path: "priv/schemas",
adaptor_icons_path: "priv/static/images/adaptors"
Expand Down
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ config :lightning, Lightning.FailureAlerter,
time_scale: 60_000,
rate_limit: 3

# schemas_path and adaptor_icons_path point to test fixtures for build-time
# mix task tests. At runtime, tests use the DB via AdaptorData/ETS cache.
config :lightning,
schemas_path: "test/fixtures/schemas",
adaptor_icons_path: "test/fixtures/adaptors/icons",
Expand Down
123 changes: 123 additions & 0 deletions lib/lightning/adaptor_data.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
defmodule Lightning.AdaptorData do
  @moduledoc """
  Context for managing adaptor cache entries in the database.

  Provides CRUD operations for storing adaptor registry data, credential
  schemas, adaptor icons, and other cacheable adaptor metadata. Each entry
  is keyed by a `kind` (category) and a unique `key` within that kind.
  """
  import Ecto.Query

  alias Lightning.AdaptorData.CacheEntry
  alias Lightning.Repo

  @doc """
  Upserts a single cache entry.

  If an entry with the same `kind` and `key` already exists, its `data`,
  `content_type`, and `updated_at` fields are replaced.

  Returns `{:ok, %CacheEntry{}}` or `{:error, %Ecto.Changeset{}}`.
  """
  @spec put(String.t(), String.t(), binary(), String.t()) ::
          {:ok, CacheEntry.t()} | {:error, Ecto.Changeset.t()}
  def put(kind, key, data, content_type \\ "application/json") do
    %CacheEntry{}
    |> CacheEntry.changeset(%{
      kind: kind,
      key: key,
      data: data,
      content_type: content_type
    })
    |> Repo.insert(
      conflict_target: [:kind, :key],
      on_conflict: {:replace, [:data, :content_type, :updated_at]},
      returning: true
    )
  end

  @doc """
  Bulk upserts a list of entries for the given `kind`.

  Each entry in `entries` must be a map with `:key`, `:data`, and optionally
  `:content_type` keys. Entries missing `:content_type` default to
  `"application/json"`.

  Returns `{count, nil}` where `count` is the number of rows affected.
  (No `returning` option is passed to `insert_all/3`, so the second tuple
  element is always `nil`.)
  """
  @spec put_many(String.t(), [map()]) :: {non_neg_integer(), nil}
  # Short-circuit: nothing to insert, so skip the DB round-trip entirely.
  def put_many(_kind, []), do: {0, nil}

  def put_many(kind, entries) when is_list(entries) do
    # insert_all bypasses changesets, so timestamps must be set explicitly;
    # truncation keeps the precision consistent with :utc_datetime_usec.
    now = DateTime.utc_now() |> DateTime.truncate(:microsecond)

    rows =
      Enum.map(entries, fn entry ->
        %{
          id: Ecto.UUID.generate(),
          kind: kind,
          key: Map.fetch!(entry, :key),
          data: Map.fetch!(entry, :data),
          content_type: Map.get(entry, :content_type, "application/json"),
          inserted_at: now,
          updated_at: now
        }
      end)

    Repo.insert_all(CacheEntry, rows,
      conflict_target: [:kind, :key],
      on_conflict: {:replace, [:data, :content_type, :updated_at]}
    )
  end

  @doc """
  Gets a single cache entry by `kind` and `key`.

  Returns `{:ok, %CacheEntry{}}` or `{:error, :not_found}`.
  """
  @spec get(String.t(), String.t()) ::
          {:ok, CacheEntry.t()} | {:error, :not_found}
  def get(kind, key) do
    case Repo.get_by(CacheEntry, kind: kind, key: key) do
      nil -> {:error, :not_found}
      entry -> {:ok, entry}
    end
  end

  @doc """
  Gets all cache entries for the given `kind`.

  Returns a list of `%CacheEntry{}` structs ordered by key.
  """
  @spec get_all(String.t()) :: [CacheEntry.t()]
  def get_all(kind) do
    CacheEntry
    |> where([e], e.kind == ^kind)
    |> order_by([e], asc: e.key)
    |> Repo.all()
  end

  @doc """
  Deletes all cache entries for the given `kind`.

  Returns `{count, nil}` where `count` is the number of deleted rows.
  """
  @spec delete_kind(String.t()) :: {non_neg_integer(), nil}
  def delete_kind(kind) do
    CacheEntry
    |> where([e], e.kind == ^kind)
    |> Repo.delete_all()
  end

  @doc """
  Deletes a specific cache entry by `kind` and `key`.

  Returns `{:ok, %CacheEntry{}}` or `{:error, :not_found}`.

  Raises if the row disappears between lookup and delete (the bang delete
  is intentional: a stale-delete race is a bug worth surfacing).
  """
  @spec delete(String.t(), String.t()) ::
          {:ok, CacheEntry.t()} | {:error, :not_found}
  def delete(kind, key) do
    case Repo.get_by(CacheEntry, kind: kind, key: key) do
      nil -> {:error, :not_found}
      entry -> {:ok, Repo.delete!(entry)}
    end
  end
end
83 changes: 83 additions & 0 deletions lib/lightning/adaptor_data/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
defmodule Lightning.AdaptorData.Cache do
  @moduledoc """
  Cachex-backed read-through cache for adaptor data.

  Read path: Cachex -> DB -> nil
  Write path: DB -> broadcast invalidate -> all nodes clear Cachex
  Next read on any node: cache miss -> DB hit -> Cachex populated
  """

  @cache :adaptor_data

  @doc "Get a cached value. Falls back to DB on miss, populates cache."
  def get(kind, key) do
    case Cachex.get!(@cache, {kind, key}) do
      nil ->
        case Lightning.AdaptorData.get(kind, key) do
          {:error, :not_found} ->
            # Misses are not negatively cached: every lookup for an absent
            # entry hits the DB until the entry exists.
            nil

          {:ok, entry} ->
            value = %{data: entry.data, content_type: entry.content_type}
            Cachex.put!(@cache, {kind, key}, value)
            value
        end

      value ->
        value
    end
  end

  @doc "Get all entries of a kind. Falls back to DB on miss."
  def get_all(kind) do
    # The aggregate list is cached under the reserved key {kind, :__all__}.
    case Cachex.get!(@cache, {kind, :__all__}) do
      nil ->
        case Lightning.AdaptorData.get_all(kind) do
          [] ->
            # An empty kind is not cached, so it is re-queried on each call.
            []

          entries ->
            values =
              Enum.map(entries, fn e ->
                %{key: e.key, data: e.data, content_type: e.content_type}
              end)

            Cachex.put!(@cache, {kind, :__all__}, values)
            values
        end

      cached ->
        cached
    end
  end

  @doc """
  Put a value directly into the cache (does not touch the DB).

  Also drops the `{kind, :__all__}` aggregate so a subsequent `get_all/1`
  cannot serve a stale list that omits (or contradicts) this entry.
  """
  def put(kind, key, value) do
    Cachex.put!(@cache, {kind, key}, value)
    Cachex.del!(@cache, {kind, :__all__})
    :ok
  end

  @doc "Invalidate all cached entries for a kind."
  def invalidate(kind) do
    # Matches both per-key entries {kind, key} and the {kind, :__all__}
    # aggregate, since both are 2-tuples led by `kind`.
    @cache
    |> Cachex.keys!()
    |> Enum.filter(&match?({^kind, _}, &1))
    |> Enum.each(&Cachex.del!(@cache, &1))

    :ok
  end

  @doc "Invalidate all cached entries."
  def invalidate_all do
    Cachex.clear!(@cache)
    :ok
  end

  @doc "Broadcast cache invalidation to all nodes."
  def broadcast_invalidation(kinds) when is_list(kinds) do
    # The originating node is included in the payload; receivers may use it
    # for logging but (per Listener) the sender also clears its own cache.
    Lightning.API.broadcast(
      "adaptor:data",
      {:invalidate_cache, kinds, node()}
    )
  end
end
37 changes: 37 additions & 0 deletions lib/lightning/adaptor_data/cache_entry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Lightning.AdaptorData.CacheEntry do
  @moduledoc """
  Ecto schema for adaptor cache entries persisted in the database.

  An entry is uniquely identified by the pair of `kind` (a category such as
  "registry", "schema", or "icon") and `key` (e.g. an adaptor name or path).
  The raw binary payload lives in `data`, with `content_type` describing
  its format (defaulting to `"application/json"`).
  """
  use Lightning.Schema

  # Fields that must be present for a valid entry vs. those with defaults.
  @required_fields [:kind, :key, :data]
  @optional_fields [:content_type]

  @type t :: %__MODULE__{
          id: Ecto.UUID.t(),
          kind: String.t(),
          key: String.t(),
          data: binary(),
          content_type: String.t(),
          inserted_at: DateTime.t(),
          updated_at: DateTime.t()
        }

  schema "adaptor_cache_entries" do
    field :kind, :string
    field :key, :string
    field :data, :binary
    field :content_type, :string, default: "application/json"

    timestamps(type: :utc_datetime_usec)
  end

  @doc false
  def changeset(entry, attrs) do
    entry
    |> cast(attrs, @required_fields ++ @optional_fields)
    |> validate_required(@required_fields)
    |> unique_constraint([:kind, :key])
  end
end
34 changes: 34 additions & 0 deletions lib/lightning/adaptor_data/listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Lightning.AdaptorData.Listener do
  @moduledoc """
  GenServer that subscribes to PubSub for cache invalidation messages.

  Whenever a node persists fresh adaptor data, it broadcasts
  `{:invalidate_cache, kinds, node()}` on the "adaptor:data" topic. Every
  node — the sender included — evicts those kinds from its local cache, so
  the next read on any node misses, hits the DB, and repopulates the cache.
  """

  use GenServer

  require Logger

  alias Lightning.AdaptorData.Cache

  # Registered under the module name: one listener per node.
  def start_link(opts \\ []),
    do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  @impl GenServer
  def init(_opts) do
    # Subscribe before serving so no invalidation broadcast is missed.
    Lightning.API.subscribe("adaptor:data")
    {:ok, %{}}
  end

  @impl GenServer
  def handle_info({:invalidate_cache, kinds, _origin_node}, state) do
    Logger.info("Invalidating adaptor cache for: #{inspect(kinds)}")

    for kind <- kinds, do: Cache.invalidate(kind)

    {:noreply, state}
  end

  # Ignore any other message delivered to this process.
  def handle_info(_msg, state), do: {:noreply, state}
end
Loading
Loading