Skip to content

Commit c5815ce

Browse files
eddietejedaclaude
andcommitted
feat(managed-databases): migrate to /v1/databases API and fix query catalog
Managed databases now use the dedicated `/v1/databases` endpoints instead of the legacy `/v1/connections` API. Fixes managed table queries by correctly using `"default"` as the SQL catalog (required by the server) while still routing info-schema API calls through the underlying `connection_id`. Changes: - http.py: add DatabasesApi; migrate create/delete/list/get to /v1/databases; add _IN_FLIGHT guard in execute_query so cancelled/timed-out runs raise immediately instead of spinning until timeout; add database_id kwarg to execute_query for X-Database-Id header support - backend.py: rewrite _resolve_managed_connection to use get_database by id + description fallback; fix _table_location to cache _database_id and _database_connection_id on resolution; add _resolve_database_connection_id helper; fix get_schema to use real connection_id when catalog == "default"; fix create_table to return "default" catalog for managed tables; thread database_id through _safe_raw_sql, to_pyarrow, and _get_schema_using_query; fix _infer_default_schema and _infer_default_connection to check cached values before making API calls; fix create_database/drop_database to use new databases API - managed.py: remove MANAGED_SOURCE_TYPE and build_managed_config (replaced by CreateDatabaseRequest) - README.md: add Managed databases section with complete working example and key points about the "default" catalog, table pre-declaration, and async sync - tests: update to match new API shapes and function signatures Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b3d9d80 commit c5815ce

8 files changed

Lines changed: 280 additions & 146 deletions

File tree

README.md

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ con = ibis.connect(
5252

5353
**Mapping:** Ibis **catalog** = Hotdata connection id; **database** = remote schema; **table** = table name. SQL references look like `connection.schema.table`. With a single connection and schema, defaults are inferred; otherwise set `default_connection` / `default_schema` or qualify `con.table(..., database=(conn_id, schema))`.
5454

55+
> **Managed databases:** SQL and Ibis expressions against managed database tables use `"default"` as the catalog rather than the connection id. The backend resolves this automatically — see [Managed databases](#managed-databases) below.
56+
5557
**Execution:** SQL is compiled with Ibis’s **Postgres** SQLGlot compiler. The client submits queries asynchronously with `POST /v1/query`, polls `GET /v1/query-runs/{id}`, then downloads ready results as Arrow IPC from `GET /v1/results/{id}`. Tuning: `poll_interval_s`, `poll_timeout_s` on `connect()`.
5658

5759
**Types:** Typed tables come from Hotdata’s information schema. `con.sql(...)` types are inferred from a small preview query and Arrow schema; see [Hotdata SQL](https://www.hotdata.dev/docs/sql) for server behavior.
@@ -68,8 +70,8 @@ Supported today:
6870
- **SQL-backed expressions:** Ibis expressions compile with the Postgres SQLGlot compiler and execute through Hotdata. Common `SELECT` workloads such as projection, filtering, joins, grouping, aggregation, ordering, limits, scalar expressions, and `con.sql(...)` work when the generated SQL is accepted by Hotdata.
6971
- **Result materialization:** `.execute()` returns pandas objects. `.to_pyarrow()` and `.to_pyarrow_batches()` use the Arrow IPC result data exposed by Hotdata without converting through JSON rows; batches are split locally after the result is downloaded.
7072
- **Raw SQL escape hatch:** `con.sql("SELECT ...", dialect="postgres")` is the most reliable way to use Hotdata-specific federated table names or SQL that Ibis does not model directly.
71-
- **Managed database lifecycle:** `create_database("sales", schema="public", tables=["orders"])` registers a managed connection (Ibis catalog). `create_table("orders", pandas_df, database=("sales", "public"))` uploads Parquet and loads it with replace mode. Query as `sales.public.orders` in SQL. `drop_table` clears a managed table; `drop_database` deletes the connection.
72-
- **Parquet uploads:** `create_table` accepts pandas DataFrames, PyArrow tables, or schema-only empty tables. Tables must live in a managed connection — declare them with `create_database(..., tables=[...])` first. Loads always use replace mode; pass `overwrite=True` to replace an existing synced table (the default `overwrite=False` raises if the table already exists).
73+
- **Managed database lifecycle:** `create_database("sales", schema="public", tables=["orders"])` provisions a managed connection (Ibis catalog). `create_table("orders", pandas_df, database=("sales", "public"))` uploads Parquet and loads it. Query using `database=("default", "public")` or the `"default"."public"."orders"` SQL prefix. `drop_table` clears a managed table; `drop_database` deletes the connection. See [Managed databases](#managed-databases) for a complete example.
74+
- **Parquet uploads:** `create_table` accepts pandas DataFrames, PyArrow tables, or schema-only empty tables. Tables must live in a managed connection — declare them with `create_database(..., tables=[...])` first. Loads are asynchronous; poll `_managed_table_synced(conn_id, schema, table)` if you need to query immediately. Loads always use replace mode; pass `overwrite=True` to replace an existing synced table (the default `overwrite=False` raises if the table already exists).
7375

7476
Not supported as full Ibis backend features:
7577

@@ -81,6 +83,58 @@ Not supported as full Ibis backend features:
8183
- **Complete Ibis compliance:** The backend is experimental and has focused test coverage for connection, discovery, schema mapping, execution, uploads, and Arrow results. It has not yet been validated against the full Ibis backend test suite.
8284
- **Hotdata platform APIs beyond SQL and managed databases:** embeddings, indexes, query history management, sandbox lifecycle management, and other Hotdata-specific APIs are outside the Ibis backend surface.
8385

86+
## Managed databases
87+
88+
Managed databases are temporary, workspace-owned connections for uploading and querying your own data. Tables must be declared at creation time, loads are asynchronous, and SQL uses `"default"` as the catalog (not the raw connection id).
89+
90+
```python
91+
import time
92+
import ibis
93+
import pandas as pd
94+
95+
con = ibis.hotdata.connect(
96+
api_url="https://api.hotdata.dev",
97+
token="YOUR_API_TOKEN",
98+
workspace_id="ws_…",
99+
)
100+
101+
# 1. Create the managed database and declare tables upfront.
102+
# Tables must be declared here — load_managed_table rejects undeclared names.
103+
con.create_database("my-dataset", schema="public", tables=["orders"])
104+
105+
# 2. Resolve the database id + underlying connection id.
106+
db = con._resolve_managed_connection("my-dataset")
107+
db_id = db["id"] # "dbid…"
108+
conn_id = db["default_connection_id"] # "conn…"
109+
110+
# 3. Upload data (pandas DataFrame or PyArrow table).
111+
df = pd.DataFrame({"order_id": [1, 2, 3], "amount": [9.99, 49.99, 5.00]})
112+
con.create_table("orders", df, database=(db_id, "public"), overwrite=True)
113+
114+
# 4. Loads are async — wait for the table to sync before querying.
115+
while not con._managed_table_synced(conn_id, "public", "orders"):
116+
time.sleep(1)
117+
118+
# 5. Query with Ibis expressions.
119+
# Use database=("default", schema) — managed databases require "default" as the
120+
# SQL catalog; the backend resolves the underlying connection automatically.
121+
t = con.table("orders", database=("default", "public"))
122+
result = t.filter(t.amount > 10).order_by("amount").execute()
123+
124+
# 6. Or with raw SQL (same "default" catalog prefix).
125+
result = con.sql('SELECT sum(amount) AS total FROM "default"."public"."orders"').execute()
126+
127+
# 7. Clean up.
128+
con.drop_database("my-dataset")
129+
```
130+
131+
**Key points:**
132+
- `create_database(..., tables=[...])` — table names must be listed here before uploading.
133+
- `create_table(..., database=(db_id, schema))` — pass the managed database id (from `_resolve_managed_connection`) as the first element of the tuple, not the connection id.
134+
- SQL catalog is `"default"`, not the connection id — `"default"."schema"."table"` is the correct form.
135+
- After `create_table`, ibis table references automatically use `database=("default", schema)`; use the same form for subsequent `con.table(...)` calls.
136+
- Loads are asynchronous. Poll `_managed_table_synced(conn_id, schema, table)` or add a small sleep before querying.
137+
84138
## Development
85139

86140
```bash

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ classifiers = [
2424
]
2525
dependencies = [
2626
"ibis-framework>=10.0,<11",
27-
"hotdata>=0.2.0",
27+
"hotdata>=0.2.3",
2828
"pyarrow>=15",
2929
"pyarrow-hotfix>=0.6",
3030
"pandas>=2",

src/ibis_hotdata/backend.py

Lines changed: 73 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
from ibis.backends.sql import SQLBackend
4444

4545
from ibis_hotdata.http import HotdataAPIError, HotdataClient
46-
from ibis_hotdata.managed import DEFAULT_SCHEMA, MANAGED_SOURCE_TYPE
46+
from ibis_hotdata.managed import DEFAULT_SCHEMA
4747
from ibis_hotdata.types import dtype_from_hotdata_sql_type
4848

4949
_INFORMATION_SCHEMA_PAGE_SIZE = 500
@@ -144,6 +144,7 @@ def do_connect(
144144
verify_ssl: bool | str = True,
145145
default_connection: str | None = None,
146146
default_schema: str | None = None,
147+
database_id: str | None = None,
147148
poll_interval_s: float = 0.25,
148149
poll_timeout_s: float = 600.0,
149150
) -> None:
@@ -181,6 +182,9 @@ def do_connect(
181182
self.disconnect()
182183
self._default_connection = default_connection
183184
self._default_schema = default_schema
185+
self._database_id = database_id
186+
# Resolved lazily: the actual connection_id behind _database_id (for info schema API).
187+
self._database_connection_id: str | None = None
184188
self._poll_interval_s = poll_interval_s
185189
self._poll_timeout_s = poll_timeout_s
186190

@@ -200,9 +204,9 @@ def disconnect(self) -> None:
200204
# --- hierarchy ---------------------------------------------------------
201205

202206
def _infer_default_connection(self) -> str:
203-
ids = self._connection_ids()
204207
if self._default_connection is not None:
205208
return self._default_connection
209+
ids = self._connection_ids()
206210
if len(ids) == 1:
207211
self._default_connection = ids[0]
208212
return self._default_connection
@@ -212,6 +216,8 @@ def _infer_default_connection(self) -> str:
212216
)
213217

214218
def _infer_default_schema(self, connection_id: str) -> str:
219+
if self._default_schema is not None:
220+
return self._default_schema
215221
schemas = sorted(
216222
{
217223
row["schema"]
@@ -220,12 +226,6 @@ def _infer_default_schema(self, connection_id: str) -> str:
220226
)
221227
}
222228
)
223-
if self._default_schema is not None:
224-
if self._default_schema not in schemas:
225-
raise com.IbisInputError(
226-
f"Unknown schema {self._default_schema!r} for connection {connection_id!r}"
227-
)
228-
return self._default_schema
229229
if len(schemas) == 1:
230230
self._default_schema = schemas[0]
231231
return self._default_schema
@@ -331,13 +331,22 @@ def _resolve_connection(self, name_or_id: str) -> dict[str, Any]:
331331
raise com.IbisError(f"Unknown Hotdata connection {name_or_id!r}")
332332

333333
def _resolve_managed_connection(self, name_or_id: str) -> dict[str, Any]:
334-
conn = self._resolve_connection(name_or_id)
335-
if conn.get("source_type") != MANAGED_SOURCE_TYPE:
336-
raise com.IbisInputError(
337-
f"{name_or_id!r} is not a managed database "
338-
f"(source_type={conn.get('source_type')!r})"
339-
)
340-
return conn
334+
"""Resolve a managed database by id or description, returning its detail dict."""
335+
# Try direct ID lookup first
336+
try:
337+
return self._http.get_database(name_or_id)
338+
except HotdataAPIError as exc:
339+
if exc.status_code != 404:
340+
raise _ibis_err_from_hotdata(exc) from exc
341+
# Fall back to description scan
342+
data = self._http.list_databases()
343+
for db in data.get("databases", []):
344+
if db.get("description") == name_or_id:
345+
try:
346+
return self._http.get_database(db["id"])
347+
except HotdataAPIError as exc:
348+
raise _ibis_err_from_hotdata(exc) from exc
349+
raise com.IbisError(f"Unknown managed database {name_or_id!r}")
341350

342351
def _managed_table_synced(
343352
self,
@@ -380,8 +389,30 @@ def _table_location(
380389
raise com.IbisInputError(
381390
"create_table with database=schema requires default_connection or current catalog"
382391
)
383-
conn_record = self._resolve_managed_connection(conn)
384-
return conn_record["id"], schema
392+
db_record = self._resolve_managed_connection(conn)
393+
conn_id = db_record["default_connection_id"]
394+
# Keep the cached mapping in sync so get_schema can use the real connection_id
395+
# when the SQL catalog is "default" (the prefix managed databases require).
396+
self._database_id = self._database_id or db_record["id"]
397+
self._database_connection_id = conn_id
398+
return conn_id, schema
399+
400+
def _resolve_database_connection_id(self) -> str | None:
401+
"""Return the actual connection_id for the current managed database context.
402+
403+
Managed database SQL uses ``"default"`` as the catalog, but the information
404+
schema REST API still needs the real ``connection_id``. This method resolves
405+
that mapping lazily and caches the result.
406+
"""
407+
if self._database_id is None:
408+
return None
409+
if self._database_connection_id is None:
410+
try:
411+
db = self._http.get_database(self._database_id)
412+
self._database_connection_id = db.get("default_connection_id")
413+
except HotdataAPIError:
414+
pass
415+
return self._database_connection_id
385416

386417
# --- schema / sql execution --------------------------------------------
387418

@@ -394,9 +425,16 @@ def get_schema(
394425
) -> sch.Schema:
395426
conn = catalog or self.current_catalog
396427
schema_name = database or self.current_database
428+
# Managed database tables use "default" as the SQL catalog but the info
429+
# schema REST API needs the real connection_id.
430+
api_conn = (
431+
self._resolve_database_connection_id() or conn
432+
if conn == "default"
433+
else conn
434+
)
397435
matches: list[dict[str, Any]] = []
398436
for row in self._iterate_information_schema(
399-
{"connection_id": conn, "schema": schema_name, "table": table_name},
437+
{"connection_id": api_conn, "schema": schema_name, "table": table_name},
400438
include_columns=True,
401439
):
402440
if row["table"] == table_name and row["schema"] == schema_name:
@@ -420,6 +458,7 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
420458
try:
421459
data = self._http.execute_query(
422460
preview_sql,
461+
database_id=self._database_id,
423462
poll_interval_s=self._poll_interval_s,
424463
poll_timeout_s=self._poll_timeout_s,
425464
)
@@ -441,6 +480,7 @@ def _safe_raw_sql(
441480
try:
442481
payload = self._http.execute_query(
443482
query,
483+
database_id=self._database_id,
444484
poll_interval_s=self._poll_interval_s,
445485
poll_timeout_s=self._poll_timeout_s,
446486
)
@@ -470,6 +510,7 @@ def to_pyarrow(
470510
try:
471511
payload = self._http.execute_query(
472512
sql,
513+
database_id=self._database_id,
473514
poll_interval_s=self._poll_interval_s,
474515
poll_timeout_s=self._poll_timeout_s,
475516
)
@@ -525,21 +566,18 @@ def create_database(
525566
raise com.UnsupportedOperationError(
526567
"Hotdata create_database creates a managed connection (catalog); catalog= is not supported"
527568
)
569+
# Check if a database with this description already exists
570+
existing = None
528571
try:
529-
existing = self._resolve_connection(name)
572+
existing = self._resolve_managed_connection(name)
530573
except com.IbisError:
531-
existing = None
574+
pass
532575
if existing is not None:
533576
if not force:
534577
raise com.IbisInputError(f"Managed database {name!r} already exists")
535-
if existing.get("source_type") != MANAGED_SOURCE_TYPE:
536-
raise com.IbisInputError(
537-
f"{name!r} is not a managed database "
538-
f"(source_type={existing.get('source_type')!r})"
539-
)
540578
return
541579
try:
542-
self._http.create_managed_database(name, schema=schema, tables=list(tables or ()))
580+
self._http.create_managed_database(description=name, schema=schema, tables=list(tables or ()))
543581
except HotdataAPIError as exc:
544582
raise _ibis_err_from_hotdata(exc) from exc
545583

@@ -557,15 +595,15 @@ def drop_database(
557595
"Hotdata drop_database deletes a managed connection (catalog); catalog= is not supported"
558596
)
559597
try:
560-
conn = self._resolve_managed_connection(name)
598+
db = self._resolve_managed_connection(name)
561599
except com.IbisInputError:
562600
raise
563601
except com.IbisError:
564602
if force:
565603
return
566604
raise
567605
try:
568-
self._http.delete_connection(conn["id"])
606+
self._http.delete_database(db["id"])
569607
except HotdataAPIError as exc:
570608
if force and exc.status_code == 404:
571609
return
@@ -622,6 +660,9 @@ def create_table(
622660

623661
data = self._local_table_to_parquet(obj, schema)
624662
connection_id, schema_name = self._table_location(database)
663+
# Cache the resolved connection_id so get_schema can use it for info schema
664+
# API calls when the "default" catalog is used in managed database contexts.
665+
self._database_connection_id = connection_id
625666
if not overwrite and self._managed_table_synced(connection_id, schema_name, name):
626667
raise com.IbisInputError(
627668
f"Table {name!r} already exists; pass overwrite=True to replace"
@@ -636,7 +677,10 @@ def create_table(
636677
)
637678
except HotdataAPIError as exc:
638679
raise _ibis_err_from_hotdata(exc) from exc
639-
return self.table(name, database=(connection_id, schema_name))
680+
# Managed database SQL requires "default" as the catalog prefix, not the
681+
# raw connection_id. _table_location always sets _database_id when resolving
682+
# a managed connection, so we can always use the "default" catalog here.
683+
return self.table(name, database=("default", schema_name))
640684

641685
def drop_table(
642686
self,

0 commit comments

Comments
 (0)