Recotem discovers DataSource plugins via Python entry points. A plugin is any installed package that registers in the recotem.datasources group.
The examples/plugins/echo-source/ directory in this repository is a minimal, runnable reference implementation.
A plugin must provide a class with four class-level attributes (type_name,
Config, extras_required, no_expand_fields) and one required method (fetch);
__init__ and the optional probe are described below.
from __future__ import annotations

import random
from typing import ClassVar

import pandas as pd
from pydantic import BaseModel, Field

from recotem.datasource.base import DataSourceError, FetchContext


class EchoSource:
    """Returns a synthetic DataFrame, useful for testing and CI."""

    # 1. type_name: discriminator value matched against the recipe YAML
    #    `source.type` field. Must be a non-empty string and unique across
    #    all installed plugins. By convention use a short lower-case slug.
    type_name: ClassVar[str] = "echo"

    # 2. Config: pydantic BaseModel describing the recipe sub-fields for this
    #    source. All fields appear under `source:` in the YAML alongside the
    #    `type:` discriminator. The loader passes the entire `source:` mapping
    #    (including `type`) to `Config.model_validate(...)`, so either declare
    #    `type` as a field on Config (the builtin convention; see below) or
    #    rely on pydantic's default `extra="ignore"` to drop it. Combining
    #    `extra="forbid"` with no `type` field will fail recipe load with an
    #    "unexpected key" error.
    class Config(BaseModel):
        n_users: int = Field(default=10, ge=1)
        n_items: int = Field(default=20, ge=1)
        n_rows: int = Field(default=100, ge=1)
        seed: int = Field(default=42)

    # 3. extras_required: pip extras to suggest when optional dependencies
    #    are missing. Leave empty if the plugin has no optional deps.
    extras_required: ClassVar[list[str]] = []

    # 4. no_expand_fields: frozenset of field names inside the source config
    #    whose string values must NEVER receive ${RECOTEM_RECIPE_*} env-var
    #    expansion. List any fields that carry raw SQL, query parameters, or
    #    other content where ${} should be treated as literals.
    #    Use frozenset() (empty) when no fields need protection beyond the
    #    global baseline (query, query_parameters) that is always guarded.
    #    This attribute is REQUIRED: validate_plugin_contract enforces its
    #    presence and its type (frozenset). A missing or wrong-type attribute
    #    raises DataSourceError at plugin discovery with a pointer to this doc.
    no_expand_fields: ClassVar[frozenset[str]] = frozenset()

    def __init__(self, config: "EchoSource.Config") -> None:
        self._config = config

    def fetch(self, ctx: FetchContext) -> pd.DataFrame:
        """Return a DataFrame whose columns include those named in
        the recipe `schema` block (user_column, item_column, optional
        time_column).

        Returns a DataFrame with columns: user_id (str), item_id (str),
        timestamp (int epoch seconds).
        """
        cfg = self._config
        max_possible = cfg.n_users * cfg.n_items
        if cfg.n_rows > max_possible:
            raise DataSourceError(
                f"EchoSource: n_rows ({cfg.n_rows}) exceeds n_users * n_items "
                f"({max_possible}). Reduce n_rows or increase n_users/n_items."
            )
        rng = random.Random(cfg.seed)
        users = [f"user_{i}" for i in range(cfg.n_users)]
        items = [f"item_{j}" for j in range(cfg.n_items)]
        all_pairs = [(u, v) for u in users for v in items]
        sampled = rng.sample(all_pairs, cfg.n_rows)
        base_ts = 1_700_000_000
        rows = [
            {"user_id": u, "item_id": v, "timestamp": base_ts + idx}
            for idx, (u, v) in enumerate(sampled)
        ]
        return pd.DataFrame(rows, columns=["user_id", "item_id", "timestamp"])

    def probe(self) -> None:
        """Optional. Called by recotem validate to test connectivity.

        Should be cheap; never load full data.
        Raise DataSourceError on failure.
        Return value is ignored by recotem (Protocol declares -> None).
        """
        cfg = self._config
        max_possible = cfg.n_users * cfg.n_items
        if cfg.n_rows > max_possible:
            raise DataSourceError(
                f"EchoSource: n_rows ({cfg.n_rows}) exceeds n_users * n_items "
                f"({max_possible})."
            )
        # discarded by recotem validate; kept here for illustration only
        return {"status": "ok", "rows_to_emit": cfg.n_rows, "items": cfg.n_items}  # type: ignore[return-value]
- type_name is the discriminator value. It appears as source.type: echo in the recipe. The registry validates that it is a non-empty string and unique across all loaded plugins; duplicate type_name values cause both recotem train and recotem serve to fail at startup with a DataSourceError (exit code 3) listing the conflicting fully-qualified class names.
- Config is a pydantic BaseModel. Fields are validated at recipe load. Use pydantic validators for constraints. Required fields without defaults cause a RecipeError when missing from the recipe.
- extras_required is purely documentation. The registry only validates that it is a list[str]; recotem never auto-installs or auto-checks these extras. Surface a helpful message yourself in __init__ (see Deferred imports) and cite the extras named in this attribute there.
- no_expand_fields is required and must be a frozenset[str]. It names every field in the source Config whose string values must never receive ${RECOTEM_RECIPE_*} environment-variable expansion. validate_plugin_contract checks that this attribute is present and is a frozenset; a missing or wrong-type declaration raises DataSourceError at plugin-discovery time with a pointer to this doc.
  - For most plugins, declare no_expand_fields: ClassVar[frozenset[str]] = frozenset(). The global baseline (query, query_parameters) is already guarded unconditionally by the recipe loader.
  - For plugins with SQL or parameterised-query fields, list them explicitly: no_expand_fields: ClassVar[frozenset[str]] = frozenset({"sql", "bind_params"}). This provides defence-in-depth and documents the security intent for future maintainers.
- fetch(ctx) must return a pandas.DataFrame. The DataFrame must contain at least the columns referenced in recipe.schema (user_column, item_column, and optionally time_column). The training pipeline accesses those columns by name immediately after fetch, so a missing column surfaces as a KeyError and exits the train run.
- fetch() must raise DataSourceError for any external or transient failure (auth errors, network errors, query errors, empty results). DataSourceError is mapped to exit code 3. Any other exception surfaces as exit code 1. Wrap third-party exceptions explicitly:

      def fetch(self, ctx: FetchContext) -> pd.DataFrame:
          try:
              return self._do_fetch()
          except SomeLibraryError as exc:
              raise DataSourceError(str(exc)) from exc

- Deferred imports. Do not import optional dependencies at module top-level. Defer to __init__ or fetch():

      def __init__(self, config: "MySource.Config") -> None:
          try:
              import my_optional_dep  # noqa: F401
          except ImportError as exc:
              raise DataSourceError(
                  "MySource requires 'recotem[myextra]'. "
                  "Install with: pip install 'recotem[myextra]'"
              ) from exc
          self.config = config

  This ensures missing extras produce a clear DataSourceError mentioning the required extra by name, rather than an ImportError with exit code 1.
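The attribute rules above can be checked locally before publishing a plugin. The following is a minimal sketch, not the body of recotem's validate_plugin_contract: contract_problems, GoodSource and BadSource are hypothetical names, and the checks only restate the rules listed in this section.

```python
from typing import Any


def contract_problems(cls: type) -> list[str]:
    """Return human-readable contract violations for a candidate plugin class.

    Hypothetical helper: it restates the documented rules and is not
    recotem's own validation code.
    """
    problems: list[str] = []

    type_name: Any = getattr(cls, "type_name", None)
    if not isinstance(type_name, str) or not type_name:
        problems.append("type_name must be a non-empty string")

    if not isinstance(getattr(cls, "Config", None), type):
        problems.append("Config must be a class (a pydantic BaseModel in practice)")

    extras: Any = getattr(cls, "extras_required", None)
    if not isinstance(extras, list) or not all(isinstance(e, str) for e in extras):
        problems.append("extras_required must be a list[str]")

    if not isinstance(getattr(cls, "no_expand_fields", None), frozenset):
        problems.append("no_expand_fields must be a frozenset")

    if not callable(getattr(cls, "fetch", None)):
        problems.append("fetch(ctx) must be defined")

    return problems


class GoodSource:
    type_name = "good"
    extras_required: list[str] = []
    no_expand_fields: frozenset[str] = frozenset()

    class Config:  # stand-in; a real plugin subclasses pydantic.BaseModel
        pass

    def fetch(self, ctx: object) -> None:
        return None


class BadSource:
    type_name = ""                  # empty string
    extras_required = ("myextra",)  # tuple, not list
    no_expand_fields = {"sql"}      # set, not frozenset
    # Config and fetch are missing entirely


print(contract_problems(GoodSource))
print(contract_problems(BadSource))
```

Running a check like this in the plugin's own test suite catches a wrong-type no_expand_fields before recotem rejects the plugin at discovery time.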
The reference plugin under examples/plugins/echo-source/ uses this layout:
recotem-echo-source/
├── pyproject.toml
└── src/
    └── recotem_echo/
        ├── __init__.py   # re-exports EchoSource so "recotem_echo:EchoSource" resolves
        └── source.py     # EchoSource class definition
A flatter recotem_echo/__init__.py containing the class directly also works
— what matters is that the entry-point string <module>:<class> resolves.
pyproject.toml:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "recotem-echo-source"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = ["recotem>=2.0,<3", "pandas>=2.2,<4"]
[project.entry-points."recotem.datasources"]
echo = "recotem_echo:EchoSource"
[tool.hatch.build.targets.wheel]
packages = ["src/recotem_echo"]

The entry-point key (echo) is the name reported in registry log/error
messages but is not used as the discriminator; Recotem uses the loaded
class's type_name attribute. By convention, keep them the same.
uv pip install -e examples/plugins/echo-source/

Verify discovery by running recotem validate against a recipe that uses the
plugin. The loader resolves source.type through the entry-point registry
and will report Unknown DataSource type 'echo' if the plugin is not
installed in the same environment as recotem.
Note:
recotem schema builds the JSON Schema at runtime by constructing a discriminated union of every registered DataSource Config class (including plugin-provided ones) and substituting it into the Recipe model. Plugin Config schemas do appear in the output; this is what makes IDE autocompletion work for source.* fields. The union is assembled via build_source_config_union() at invocation time, so the plugin must be installed in the same Python environment as recotem.
Recipe:
name: echo_test
source:
  type: echo
  n_users: 50
  n_items: 100
  n_rows: 500
  seed: 42  # optional; omit to use the default seed
schema:
  user_column: user_id
  item_column: item_id
  time_column: timestamp  # EchoSource emits integer epoch-second timestamps
training:
  algorithms: [TopPop]
  metric: ndcg
  cutoff: 10
  n_trials: 1
output:
  path: ./artifacts/echo_test.recotem

Train:

recotem train recipe.yaml

FetchContext carries metadata that fetch() can optionally use:

@dataclass
class FetchContext:
    recipe_name: str  # the recipe's name field
    run_id: str  # unique ID for this training run (UUID)
    extra: dict[str, Any] = field(default_factory=dict)  # reserved for future use

Most plugins ignore ctx. It is useful for logging and for idempotency keys when fetching from write-heavy sources.
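One way to build an idempotency key from the context is to hash the recipe name, the run ID and the query text together. The sketch below is an illustration under assumptions: the FetchContext class is a local stand-in that copies the fields shown above so the example runs without recotem, and idempotency_key is a hypothetical helper.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FetchContext:  # local stand-in mirroring the fields shown above
    recipe_name: str
    run_id: str
    extra: dict[str, Any] = field(default_factory=dict)


def idempotency_key(ctx: FetchContext, query: str) -> str:
    """Derive a stable key from the recipe, the run, and the query text."""
    # A separator that cannot appear in normal names avoids ambiguous joins.
    payload = "\x1f".join([ctx.recipe_name, ctx.run_id, query])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:32]


ctx = FetchContext(recipe_name="echo_test", run_id="abc")
key = idempotency_key(ctx, "SELECT user_id, item_id FROM events")
print(key)
```

The same run retried with the same query yields the same key, while a new run_id yields a different one, which is the property a snapshot or export job on the source side would need.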
- Synchronous, returning a single pandas.DataFrame. Generators, Iterator[DataFrame], and async def are not supported; the training pipeline calls fetch(ctx) directly and reads .columns immediately.
- Whole-DataFrame in memory. Recotem trains on the full result set (irspack constructs a sparse matrix from it). For larger-than-memory sources, do the chunking and aggregation inside fetch() and return a pre-aggregated DataFrame (e.g. counts of (user, item) pairs).
- Credentials never come via FetchContext.extra (it is reserved). Read them from environment variables (preferred; this works with K8s Secrets, systemd EnvironmentFile, Docker --env-file) or from recipe-declared Config fields (but never accept secrets in YAML; reference an env var via ${RECOTEM_RECIPE_*} instead).
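The pre-aggregation advice for larger-than-memory sources can be sketched with the standard library alone. The names here are hypothetical: read_chunks stands in for whatever paged reader the source offers, and the count column is an illustrative choice rather than a column recotem requires.

```python
from collections import Counter
from collections.abc import Iterable, Iterator


def aggregate_pairs(
    chunks: Iterable[Iterable[tuple[str, str]]],
) -> list[dict[str, object]]:
    """Collapse chunks of (user, item) events into one row per distinct pair."""
    counts: Counter[tuple[str, str]] = Counter()
    for chunk in chunks:
        counts.update(chunk)  # only the running counts stay in memory
    return [
        {"user_id": user, "item_id": item, "count": n}
        for (user, item), n in sorted(counts.items())
    ]


def read_chunks() -> Iterator[list[tuple[str, str]]]:
    """Hypothetical chunked reader; a real one would page through the source."""
    yield [("u1", "i1"), ("u2", "i1")]
    yield [("u1", "i1"), ("u1", "i2")]


rows = aggregate_pairs(read_chunks())
print(rows)
```

Inside fetch(), the resulting rows could be passed to pd.DataFrame(rows, columns=["user_id", "item_id", "count"]) so that the method still returns a single in-memory DataFrame.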
If your plugin's recipe uses item_metadata, the metadata is loaded by
recotem.metadata.loader.load_item_metadata. Failures surface as
MetadataError (not DataSourceError) so they are distinguishable from
source-fetch failures. The exception carries a .cause attribute indicating
the failure origin:
| .cause | Meaning |
|---|---|
| "http_fetch" | HTTP/HTTPS fetch failed (SSRF guard, byte cap, sha256 mismatch). __cause__ is HttpFetchError. |
| "parse" | File could not be parsed as the declared type (CSV/Parquet). |
| "field_missing" | A required field is absent and on_field_missing="error". |
| "io" | Local or object-store read failed. |
| "unknown" | Catch-all for unexpected failures. |
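Callers that invoke the loader themselves may want to branch on .cause. The sketch below uses a local stand-in for MetadataError so that it runs without recotem; the stand-in's constructor signature, the HINTS wording and the remediation_hint helper are all assumptions for illustration, and only the .cause values come from the table above.

```python
class MetadataError(Exception):
    """Local stand-in; real code would import the class from recotem."""

    def __init__(self, message: str, cause: str = "unknown") -> None:
        super().__init__(message)
        self.cause = cause


# Illustrative wording only, not messages emitted by recotem.
HINTS = {
    "http_fetch": "check the URL, the byte cap, and the expected sha256",
    "parse": "check that the file matches the declared type",
    "field_missing": "add the field to the metadata or review on_field_missing",
    "io": "check the path and the storage credentials",
}


def remediation_hint(exc: MetadataError) -> str:
    """Map a failure origin to a short operator-facing hint."""
    return HINTS.get(exc.cause, "inspect the chained exception for details")


try:
    raise MetadataError("could not parse items.csv", cause="parse")
except MetadataError as exc:
    print(f"{exc} ({exc.cause}): {remediation_hint(exc)}")
```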
The loader accepts an optional recipe_name= keyword argument. When provided,
the recipe name is threaded into HTTP fetcher log context so that redirect and
byte-cap log events (e.g. metadata_source_redirect) are correlated with the
recipe that triggered the load. This is set automatically by the watcher; you
only need it when calling load_item_metadata directly (e.g. in tests).
The plugin contract is part of the recotem 2.x public surface. Pin
recotem>=2.0,<3 in your plugin's pyproject.toml — the type_name /
Config / fetch(ctx) shape is stable within a major version. The
probe() hook may gain optional parameters in a future minor release;
use **kwargs: Any if you want to be future-proof.
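A probe that tolerates future keyword arguments might look like the following. This is a sketch under assumptions: DataSourceError is a local stand-in for the class in recotem.datasource.base, MySource and its reachable flag are hypothetical, and timeout is a made-up parameter used only to show that unknown keywords are accepted.

```python
from typing import Any


class DataSourceError(Exception):
    """Local stand-in for recotem.datasource.base.DataSourceError."""


class MySource:  # hypothetical plugin; only the probe hook is shown
    def __init__(self, reachable: bool = True) -> None:
        self._reachable = reachable

    def probe(self, **kwargs: Any) -> None:
        """Cheap connectivity check that ignores keyword arguments it does not know."""
        if not self._reachable:
            raise DataSourceError("MySource: backend is not reachable")


MySource().probe()
MySource().probe(timeout=5)  # "timeout" is a made-up future parameter
```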
The entry-point key in [project.entry-points."recotem.datasources"] is
informational only (used in error messages); the discriminator is the
class's type_name. If two installed plugins both declare
type_name = "csv", both recotem train and recotem serve exit 3 at
startup with both fully-qualified class names — uninstall one or rename
its type_name.
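When tracking down such a clash, it can help to group candidate classes by type_name yourself. The helper below is a hypothetical sketch, not recotem's registry code; CsvA, CsvB and Echo are placeholder classes standing in for loaded plugin classes.

```python
from collections import defaultdict


def find_type_name_conflicts(classes: list[type]) -> dict[str, list[str]]:
    """Group fully-qualified class names by type_name and keep only the clashes."""
    by_name: dict[str, list[str]] = defaultdict(list)
    for cls in classes:
        owner = f"{cls.__module__}.{cls.__qualname__}"
        by_name[getattr(cls, "type_name")].append(owner)
    return {name: sorted(owners) for name, owners in by_name.items() if len(owners) > 1}


class CsvA:
    type_name = "csv"


class CsvB:
    type_name = "csv"


class Echo:
    type_name = "echo"


conflicts = find_type_name_conflicts([CsvA, CsvB, Echo])
print(conflicts)
```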
recotem validate recipes/my_recipe.yaml instantiates the source class
(which exercises the __init__ deferred-import / extras check) but does
not call fetch(). If the source defines an optional probe() method,
recotem validate calls it for a lightweight connectivity / auth check:
def probe(self) -> None:
    """Optional. Called by recotem validate to test connectivity.

    Should be cheap (LIMIT 1, dry-run, fs.exists, ...); never load full data.
    Raise DataSourceError on failure. The return value is ignored by
    recotem validate (the Protocol declares -> None).
    """
    ...

When probe() is defined, recotem validate reports DataSource: probe OK (<type_name>); when it is not, it reports DataSource: extras OK (<type_name>, no probe defined). The builtin CSVSource / ParquetSource
use fsspec exists(), and BigQuerySource uses a dry-run query job.
Test fetch() directly without the CLI:
from recotem_echo import EchoSource
from recotem.datasource.base import FetchContext
source = EchoSource(EchoSource.Config(n_users=20, n_items=50, n_rows=200))
ctx = FetchContext(recipe_name="test", run_id="abc")
df = source.fetch(ctx)
assert {"user_id", "item_id", "timestamp"}.issubset(df.columns)
assert len(df) == 200

Use recotem.recipe.load_recipe in integration tests to confirm the full
YAML → Recipe → DataSource path. recipe.source is an instance of the
plugin's Config model:
from recotem.recipe import load_recipe
from recotem_echo import EchoSource
recipe = load_recipe("tests/fixtures/echo_recipe.yaml")
assert isinstance(recipe.source, EchoSource.Config)