feat: register python data sources #6936

Open
helmanofer wants to merge 4 commits into Eventual-Inc:main from helmanofer:codex/register-datasource-api-gmail

@helmanofer helmanofer commented May 14, 2026

Summary

  • add a session-scoped Python DataSource registry
  • expose daft.data_sources.register(...) and daft.read_source(...)
  • document the registry flow in the custom connector guide and API refs

Validation

  • .venv/bin/ruff check daft/session.py daft/__init__.py daft/data_sources.py tests/io/test_data_source_registry.py
  • DAFT_RUNNER=native make test EXTRA_ARGS="-q tests/io/test_data_source_registry.py"

helmanofer and others added 2 commits May 14, 2026 12:29
- Skip throwaway construction when constructor requires args
- Wrap any init failure into a clear ValueError
- Hoist _RegisteredDataSource to a lazy module-level class
- Document auto-instantiation in register_data_source docstring

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@github-actions github-actions Bot added the feat label May 14, 2026

greptile-apps Bot commented May 14, 2026

Greptile Summary

This PR adds a session-scoped Python DataSource registry, exposing daft.data_sources.register/unregister/get/list/read and the top-level daft.read_source, so users can give custom data sources a name and look them up later without holding a direct reference to the class.

  • Session gains a _data_sources: dict[str, type[DataSource]] store and four new methods; a lazy _RegisteredDataSource wrapper ensures the registered name is what the engine sees at scan time regardless of how the instance property is implemented.
  • _data_source_registration_name handles three name-resolution cases (explicit name=, instance @property, classmethod/static) with a signature-inspection guard that raises early when a property-based name would require constructing an instance that needs arguments.
  • Tests cover the main happy paths and error paths; docs are updated for the new workflow.
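
The three name-resolution cases described above can be sketched without Daft. This is a minimal illustration, not the PR's code: `resolve_registration_name` and the three toy classes are hypothetical stand-ins, and the real `_data_source_registration_name` may differ in its checks and error messages.

```python
import inspect


def resolve_registration_name(cls, name=None):
    """Hypothetical stand-in for the PR's ``_data_source_registration_name``."""
    # Case 1: an explicit name= wins and never constructs the class.
    if name is not None:
        return name

    # Look at the raw class attribute so a property is not triggered by accident.
    attr = inspect.getattr_static(cls, "name", None)

    # Case 2: ``name`` is an instance @property, so an instance is needed to read it.
    if isinstance(attr, property):
        required = [
            p.name
            for p in inspect.signature(cls).parameters.values()
            if p.default is inspect.Parameter.empty
            and p.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
        ]
        if required:
            # Guard: fail early rather than attempting cls() with missing arguments.
            raise ValueError(f"{cls.__name__} needs {required}; pass name= explicitly")
        try:
            return cls().name
        except Exception as e:
            # Any constructor failure is wrapped into a single, clear error type.
            raise ValueError(f"could not construct {cls.__name__} to read its name") from e

    # Case 3: classmethod, staticmethod, or plain class attribute readable from the class.
    value = getattr(cls, "name", None)
    if callable(value):
        value = value()
    if not isinstance(value, str):
        raise ValueError(f"{cls.__name__} does not declare a usable name")
    return value


class PropertySource:
    @property
    def name(self):
        return "prop_source"


class NeedsArgs:
    def __init__(self, token):
        self.token = token

    @property
    def name(self):
        return "needs_args"


class ClassLevel:
    @classmethod
    def name(cls):
        return "class_level"
```

With these stand-ins, `resolve_registration_name(NeedsArgs)` raises `ValueError`, while `resolve_registration_name(NeedsArgs, name="gmail")` returns the explicit name without constructing anything.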

Confidence Score: 4/5

Safe to merge; the new code is well-tested, additive, and isolated to the registry module and Session class.

The core registration and read path works correctly and is well-tested. The two findings are both style-level: inline imports in data_sources.py violate the team's import-placement rule, and naming a module function list shadows the builtin. Neither affects correctness.

daft/data_sources.py — inline imports and the list naming collision are worth cleaning up before the API stabilises.

Important Files Changed

| Filename | Overview |
| --- | --- |
| daft/data_sources.py | New module exposing session-scoped DataSource registry; five thin wrapper functions each repeat an inline import and list shadows the Python builtin. |
| daft/session.py | Adds _data_sources dict to Session, registration/read helpers, and a lazy _RegisteredDataSource wrapper class; inline import in register_data_source is justified by the TYPE_CHECKING guard. |
| daft/__init__.py | Re-exports read_source and data_sources module; changes are mechanical and correct. |
| tests/io/test_data_source_registry.py | Good coverage of registration, duplicate detection, property-name auto-detection, required-arg error, failing-init wrapping, unregister, and session scoping. |
| docs/connectors/custom.md | Updates custom connector guide to reflect async API, RecordBatch, and the new register/read_source workflow. |

Sequence Diagram

sequenceDiagram
    participant User
    participant ds as daft.data_sources
    participant sess as Session
    participant reg as _data_sources dict
    participant src as DataSource class
    participant wrap as _RegisteredDataSource

    User->>ds: "register(MySource, name=my_source)"
    ds->>sess: current_session().register_data_source(MySource)
    sess->>sess: _data_source_registration_name(MySource)
    sess->>reg: "_data_sources[my_source] = MySource"

    User->>ds: "read_source(my_source, **options)"
    ds->>sess: "_session().read_source(my_source, **options)"
    sess->>reg: get_data_source(my_source) returns MySource class
    sess->>src: "MySource(**options) returns instance"
    sess->>wrap: _RegisteredDataSource(my_source, instance)
    wrap->>wrap: .read() via ScanOperatorHandle
    wrap-->>User: DataFrame
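
The read path in the diagram (look up the class, construct it from the options, wrap it so the registered name is reported) can be traced with a small Daft-free sketch. Everything here is a hypothetical stand-in: `RegisteredSource`, `MySource`, and this `read_source` only mimic the shape of the flow, and the toy `read()` returns a list rather than a DataFrame.

```python
class RegisteredSource:
    """Hypothetical wrapper: reports the registered name, delegates the read."""

    def __init__(self, registered_name, wrapped):
        self._registered_name = registered_name
        self._wrapped = wrapped

    @property
    def name(self):
        return self._registered_name

    def read(self):
        return self._wrapped.read()


class MySource:
    """Toy source whose own name differs from the name it is registered under."""

    def __init__(self, limit=2):
        self.limit = limit

    @property
    def name(self):
        return "MySource(internal)"

    def read(self):
        return list(range(self.limit))


def read_source(registry, name, **options):
    source_cls = registry[name]        # look up the class by its registered name
    instance = source_cls(**options)   # the options become constructor keyword arguments
    wrapped = RegisteredSource(name, instance)
    return wrapped.name, wrapped.read()


registry = {"my_source": MySource}
result = read_source(registry, "my_source", limit=3)
```

The point of the wrapper in this sketch is that the caller sees `"my_source"`, not whatever the instance's own `name` property happens to return.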

Reviews (1): Last reviewed commit: "refactor(session): polish data source re..."

Comment thread daft/data_sources.py
Comment on lines +1 to +43
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from daft.session import current_session

if TYPE_CHECKING:
    from daft.dataframe import DataFrame
    from daft.io.source import DataSource


def register(data_source: type[DataSource], *, name: str | None = None, replace: bool = False) -> None:
    """Register a Python ``DataSource`` class on the current session."""
    current_session().register_data_source(data_source, name=name, replace=replace)


def unregister(name: str) -> None:
    """Remove a registered Python ``DataSource`` from the current session."""
    current_session().unregister_data_source(name)


def get(name: str) -> type[DataSource]:
    """Return a registered Python ``DataSource`` class."""
    return current_session().get_data_source(name)


def list_sources() -> list[str]:
    """List registered Python ``DataSource`` names."""
    return current_session().list_data_sources()


def read(name: str, **options: Any) -> DataFrame:
    """Read a registered Python ``DataSource`` by name."""
    return current_session().read_source(name, **options)


__all__ = [
    "get",
    "list_sources",
    "read",
    "register",
    "unregister",
]

We can delete this file, only use the session methods.

Comment thread daft/session.py
Comment on lines +764 to +804
    def register_data_source(
        self,
        data_source: type[DataSource],
        *,
        name: str | None = None,
        replace: bool = False,
    ) -> None:
        """Register a Python ``DataSource`` class with this session.

        If ``name`` is omitted and the class declares ``name`` as an instance
        ``@property``, the class is instantiated with no arguments to read it.
        Pass ``name=`` explicitly to skip that construction (e.g. when the
        constructor has side effects or requires arguments).
        """
        from daft.io.source import DataSource

        if not isinstance(data_source, type) or not issubclass(data_source, DataSource):
            raise TypeError(f"Expected a DataSource class, got {data_source!r}")

        source_name = _data_source_registration_name(data_source, name)
        if not replace and source_name in self._data_sources:
            raise ValueError(f"DataSource {source_name!r} is already registered")
        self._data_sources[source_name] = data_source

    def unregister_data_source(self, name: str) -> None:
        """Remove a registered Python ``DataSource`` from this session."""
        try:
            del self._data_sources[name]
        except KeyError as e:
            raise ValueError(f"DataSource {name!r} is not registered") from e

    def get_data_source(self, name: str) -> type[DataSource]:
        """Return a registered Python ``DataSource`` class."""
        try:
            return self._data_sources[name]
        except KeyError as e:
            raise ValueError(f"DataSource {name!r} is not registered") from e

    def list_data_sources(self) -> list[str]:
        """List registered Python ``DataSource`` names."""
        return sorted(self._data_sources)
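
The behaviour these methods promise (duplicate detection, `replace=True`, sorted listing, per-session isolation) can be exercised with a Daft-free stand-in. `MiniSession`, `Gmail`, and `Slack` are hypothetical and exist only for this sketch; the stand-in requires an explicit `name` and skips the `DataSource` type check.

```python
class MiniSession:
    """Stand-in holding only the registry dict; not Daft's Session."""

    def __init__(self):
        self._data_sources = {}

    def register_data_source(self, data_source, *, name, replace=False):
        if not replace and name in self._data_sources:
            raise ValueError(f"DataSource {name!r} is already registered")
        self._data_sources[name] = data_source

    def list_data_sources(self):
        return sorted(self._data_sources)


class Gmail:
    pass


class Slack:
    pass


first, second = MiniSession(), MiniSession()
first.register_data_source(Slack, name="slack")
first.register_data_source(Gmail, name="gmail")

listed = first.list_data_sources()     # names come back sorted, not in insertion order
isolated = second.list_data_sources()  # the second session never saw these registrations

try:
    first.register_data_source(Slack, name="gmail")
    duplicate_error = None
except ValueError as e:
    duplicate_error = str(e)

# replace=True overwrites the existing entry instead of raising.
first.register_data_source(Slack, name="gmail", replace=True)
replaced = first._data_sources["gmail"]
```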

Please match the existing conventions of attach/detach

Comment thread daft/session.py
Comment on lines +126 to +160
def _registered_data_source_cls() -> type[DataSource]:
    global _RegisteredDataSourceCls
    if _RegisteredDataSourceCls is not None:
        return _RegisteredDataSourceCls

    from daft.io.source import DataSource

    class _RegisteredDataSource(DataSource):
        def __init__(self, registered_name: str, wrapped: DataSource) -> None:
            self._registered_name = registered_name
            self._wrapped = wrapped

        @property
        def name(self) -> str:
            return self._registered_name

        @property
        def schema(self) -> Schema:
            return self._wrapped.schema

        def get_partition_fields(self) -> list[PartitionField]:
            return self._wrapped.get_partition_fields()

        async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]:
            async for task in self._wrapped.get_tasks(pushdowns):
                yield task

    _RegisteredDataSourceCls = _RegisteredDataSource
    return _RegisteredDataSourceCls


def _read_registered_data_source(name: str, inner: DataSource) -> DataFrame:
    registered_data_source_cls: Any = _registered_data_source_cls()
    return registered_data_source_cls(name, inner).read()
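
The lazy module-level class pattern used here, where the subclass is built on first use and cached in a global so the base-class import can be deferred, can be shown in isolation. This is a sketch under assumptions: `_load_base` is a hypothetical stand-in for the deferred `from daft.io.source import DataSource`, and the toy `read()` returns a string.

```python
_WRAPPER_CLS = None  # filled in on first use, then reused


def _load_base():
    """Stand-in for the deferred import of the real base class."""

    class Base:
        def read(self):
            return f"reading {self.name}"

    return Base


def wrapper_cls():
    global _WRAPPER_CLS
    if _WRAPPER_CLS is not None:
        return _WRAPPER_CLS  # fast path: the class was already built

    base = _load_base()  # only runs the first time

    class Wrapper(base):
        def __init__(self, registered_name, wrapped):
            self._registered_name = registered_name
            self._wrapped = wrapped

        @property
        def name(self):
            return self._registered_name

    _WRAPPER_CLS = Wrapper
    return _WRAPPER_CLS


instance = wrapper_cls()("gmail", object())
```

Because the class object is cached, repeated calls return the identical class, so `isinstance` checks against `wrapper_cls()` stay stable across reads.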

All of this should be in the rust session, and we shouldn't have any registration code in python
