-
Notifications
You must be signed in to change notification settings - Fork 151
feat: add CatalogProviderList support #1363
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
50974af
c9551ba
afd9d92
a617ada
eeffb72
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,13 +38,61 @@ | |
|
|
||
| __all__ = [ | ||
| "Catalog", | ||
| "CatalogList", | ||
| "CatalogProvider", | ||
| "CatalogProviderList", | ||
| "Schema", | ||
| "SchemaProvider", | ||
| "Table", | ||
| ] | ||
|
|
||
|
|
||
| class CatalogList: | ||
| """DataFusion data catalog list.""" | ||
|
|
||
| def __init__(self, catalog_list: df_internal.catalog.RawCatalogList) -> None: | ||
| """This constructor is not typically called by the end user.""" | ||
| self.catalog_list = catalog_list | ||
|
|
||
| def __repr__(self) -> str: | ||
| """Print a string representation of the catalog list.""" | ||
| return self.catalog_list.__repr__() | ||
|
|
||
| def names(self) -> set[str]: | ||
| """This is an alias for `catalog_names`.""" | ||
| return self.catalog_names() | ||
|
|
||
| def catalog_names(self) -> set[str]: | ||
| """Returns the list of schemas in this catalog.""" | ||
| return self.catalog_list.catalog_names() | ||
|
|
||
| @staticmethod | ||
| def memory_catalog(ctx: SessionContext | None = None) -> CatalogList: | ||
| """Create an in-memory catalog provider list.""" | ||
| catalog_list = df_internal.catalog.RawCatalogList.memory_catalog(ctx) | ||
| return CatalogList(catalog_list) | ||
|
|
||
| def catalog(self, name: str = "datafusion") -> Schema: | ||
| """Returns the catalog with the given ``name`` from this catalog.""" | ||
| catalog = self.catalog_list.catalog(name) | ||
|
|
||
| return ( | ||
| Catalog(catalog) | ||
| if isinstance(catalog, df_internal.catalog.RawCatalog) | ||
| else catalog | ||
| ) | ||
|
|
||
| def register_catalog( | ||
| self, | ||
| name: str, | ||
| catalog: Catalog | CatalogProvider | CatalogProviderExportable, | ||
| ) -> Catalog | None: | ||
| """Register a catalog with this catalog list.""" | ||
| if isinstance(catalog, Catalog): | ||
| return self.catalog_list.register_catalog(name, catalog.catalog) | ||
| return self.catalog_list.register_catalog(name, catalog) | ||
|
|
||
|
|
||
| class Catalog: | ||
| """DataFusion data catalog.""" | ||
|
|
||
|
|
@@ -195,6 +243,38 @@ def kind(self) -> str: | |
| return self._inner.kind | ||
|
|
||
|
|
||
| class CatalogProviderList(ABC): | ||
| """Abstract class for defining a Python based Catalog Provider List.""" | ||
|
|
||
| @abstractmethod | ||
| def catalog_names(self) -> set[str]: | ||
| """Set of the names of all catalogs in this catalog list.""" | ||
| ... | ||
|
|
||
| @abstractmethod | ||
| def catalog(self, name: str) -> Catalog | None: | ||
| """Retrieve a specific catalog from this catalog list.""" | ||
| ... | ||
|
|
||
| def register_catalog( # noqa: B027 | ||
| self, name: str, catalog: CatalogProviderExportable | CatalogProvider | Catalog | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also is the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, especially if they want to register an in-memory catalog they created with the static method. |
||
| ) -> None: | ||
| """Add a catalog to this catalog list. | ||
|
|
||
| This method is optional. If your catalog provides a fixed list of catalogs, you | ||
| do not need to implement this method. | ||
| """ | ||
|
|
||
|
|
||
| class CatalogProviderListExportable(Protocol): | ||
| """Type hint for object that has __datafusion_catalog_provider_list__ PyCapsule. | ||
|
|
||
| https://docs.rs/datafusion/latest/datafusion/catalog/trait.CatalogProviderList.html | ||
| """ | ||
|
|
||
| def __datafusion_catalog_provider_list__(self, session: Any) -> object: ... | ||
|
|
||
|
|
||
| class CatalogProvider(ABC): | ||
| """Abstract class for defining a Python based Catalog Provider.""" | ||
|
|
||
|
|
@@ -229,6 +309,15 @@ def deregister_schema(self, name: str, cascade: bool) -> None: # noqa: B027 | |
| """ | ||
|
|
||
|
|
||
| class CatalogProviderExportable(Protocol): | ||
| """Type hint for object that has __datafusion_catalog_provider__ PyCapsule. | ||
|
|
||
| https://docs.rs/datafusion/latest/datafusion/catalog/trait.CatalogProvider.html | ||
| """ | ||
|
|
||
| def __datafusion_catalog_provider__(self, session: Any) -> object: ... | ||
|
|
||
|
|
||
| class SchemaProvider(ABC): | ||
| """Abstract class for defining a Python based Schema Provider.""" | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,11 +16,16 @@ | |
| # under the License. | ||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| import datafusion as dfn | ||
| import pyarrow as pa | ||
| import pyarrow.dataset as ds | ||
| import pytest | ||
| from datafusion import SessionContext, Table, udtf | ||
| from datafusion import Catalog, SessionContext, Table, udtf | ||
|
|
||
| if TYPE_CHECKING: | ||
| from datafusion.catalog import CatalogProvider, CatalogProviderExportable | ||
|
|
||
|
|
||
| # Note we take in `database` as a variable even though we don't use | ||
|
|
@@ -93,6 +98,34 @@ def deregister_schema(self, name, cascade: bool): | |
| del self.schemas[name] | ||
|
|
||
|
|
||
| class CustomCatalogProviderList(dfn.catalog.CatalogProviderList): | ||
| def __init__(self): | ||
| self.catalogs = {"my_catalog": CustomCatalogProvider()} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this could have an additional catalog just to show it supports multiple ones.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a different catalog type in the register test to follow. |
||
|
|
||
| def catalog_names(self) -> set[str]: | ||
| return set(self.catalogs.keys()) | ||
|
|
||
| def catalog(self, name: str) -> Catalog | None: | ||
| return self.catalogs[name] | ||
|
|
||
| def register_catalog( | ||
| self, name: str, catalog: CatalogProviderExportable | CatalogProvider | Catalog | ||
| ) -> None: | ||
| self.catalogs[name] = catalog | ||
|
|
||
|
|
||
| def test_python_catalog_provider_list(ctx: SessionContext): | ||
| ctx.register_catalog_provider_list(CustomCatalogProviderList()) | ||
|
|
||
| # Ensure `datafusion` catalog does not exist since | ||
| # we replaced the catalog list | ||
| assert ctx.catalog_names() == {"my_catalog"} | ||
|
|
||
| # Ensure registering works | ||
| ctx.register_catalog_provider("second_catalog", CustomCatalogProvider()) | ||
| assert ctx.catalog_names() == {"my_catalog", "second_catalog"} | ||
|
|
||
|
|
||
| def test_python_catalog_provider(ctx: SessionContext): | ||
| ctx.register_catalog_provider("my_catalog", CustomCatalogProvider()) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the return type here be
CatalogProvider?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! This should be any of the 3 returnable types, including
Catalog.