Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Use [Ibis](https://ibis-project.org/) to create on-demand databases, upload data, and query with Python expressions — get pandas or Arrow results back without writing SQL.

**Requirements:** Python 3.10+, **ibis-framework** 10.x, **hotdata** ≥0.2.3.
**Requirements:** Python 3.10+, **ibis-framework** ≥10,<11, **hotdata** ≥0.2.3.

## Install

Expand All @@ -25,22 +25,22 @@ con = ibis.hotdata.connect(
)

# 1. Create a database and declare the tables you'll load
con.create_database("sales", schema="public", tables=["orders"])
con.create_database("sales", tables=["orders"])

# 2. Upload a pandas DataFrame (or PyArrow table)
df = pd.DataFrame({
"order_id": [1, 2, 3],
"amount": [9.99, 49.99, 5.00],
"region": ["west", "east", "west"],
})
con.create_table("orders", df, database=("sales", "public"), overwrite=True)
con.create_table("orders", df, database=("sales", "main"), overwrite=True)

# 3. Uploads are async — wait briefly before querying
time.sleep(2)

# 4. Query with Ibis expressions
# Managed tables are always accessed with catalog "default"
t = con.table("orders", database=("default", "public"))
t = con.table("orders", database=("default", "main"))
result = (
t.group_by("region")
.agg(total=t.amount.sum())
Expand All @@ -49,7 +49,7 @@ result = (
)

# 5. Clean up
con.drop_table("orders", database=("sales", "public"))
con.drop_table("orders", database=("sales", "main"))
con.drop_database("sales")
```

Expand All @@ -60,13 +60,26 @@ con = ibis.hotdata.connect(
api_url="https://api.hotdata.dev",
token="YOUR_API_KEY",
workspace_id="ws_...",
# optional
session_id=None, # sandbox id (X-Session-Id header)
timeout=120.0, # per-request HTTP timeout in seconds
verify_ssl=True, # False to skip TLS verification, or path to CA bundle
default_connection=None, # default catalog (connection id); auto-detected if only one exists
default_schema=None, # default schema; auto-detected if only one exists
database_id=None, # bind an existing managed database id at connect time
poll_interval_s=0.25, # polling interval for async queries
poll_timeout_s=600.0, # max time to wait for a query result
)
```

URL-style also works:
URL-style also works, with the same parameters as query string keys:

```python
con = ibis.connect("hotdata://api.hotdata.dev/?token=...&workspace_id=ws_...")
con = ibis.connect(
"hotdata://api.hotdata.dev/"
"?token=...&workspace_id=ws_..."
"&default_connection=my_conn&default_schema=public"
)
```

## Managed databases
Expand All @@ -77,25 +90,33 @@ Managed databases are the primary way to bring data into Hotdata with Ibis. Decl

```python
# Declare the database and all table names up front
con.create_database("analytics", schema="public", tables=["events", "users"])
con.create_database("analytics", tables=["events", "users"])

# Upload from a pandas DataFrame
con.create_table("events", events_df, database=("analytics", "public"), overwrite=True)
con.create_table("events", events_df, database=("analytics", "main"), overwrite=True)

# PyArrow tables also work
import pyarrow as pa
table = pa.table({"id": [1, 2], "name": ["alice", "bob"]})
con.create_table("users", table, database=("analytics", "public"), overwrite=True)
con.create_table("users", table, database=("analytics", "main"), overwrite=True)

# Schema-only (no data): creates an empty table with the declared schema
import ibis.expr.schema as sch
con.create_table(
"staging",
schema=sch.Schema({"id": "int64", "ts": "timestamp"}),
database=("analytics", "main"),
)
```

Table names must be declared when the database is created — you cannot add new table names later without recreating the database.
Table names must be declared when the database is created — you cannot upload to a table name that was not listed in `tables=`.

### Query

When querying, use `"default"` as the catalog:

```python
t = con.table("events", database=("default", "public"))
t = con.table("events", database=("default", "main"))

result = (
t.filter(t.event_type == "click")
Expand All @@ -110,17 +131,22 @@ Or with raw SQL:
```python
result = con.sql(
'SELECT user_id, COUNT(*) AS n '
'FROM "default"."public"."events" '
'FROM "default"."main"."events" '
'WHERE event_type = \'click\' '
'GROUP BY user_id'
).execute()
```

### Delete

Pass `force=True` to silently skip errors when the database or table does not exist:

```python
con.drop_table("events", database=("analytics", "public"))
con.drop_table("events", database=("analytics", "main"))
con.drop_table("events", database=("analytics", "main"), force=True) # no-op if missing

con.drop_database("analytics")
con.drop_database("analytics", force=True) # no-op if missing
```

### Addressing summary
Expand All @@ -135,7 +161,7 @@ con.drop_database("analytics")
### Ibis expressions

```python
t = con.table("orders", database=("default", "public"))
t = con.table("orders", database=("default", "main"))

summary = (
t.filter(t.amount > 10)
Expand All @@ -146,13 +172,13 @@ summary = (
)
```

`.execute()` returns a **pandas DataFrame**. Use `.to_pyarrow()` for an Arrow table or `.to_pyarrow_batches()` to stream batches without materializing the full result.
`.execute()` returns a **pandas DataFrame**. `.to_pyarrow()` returns an Arrow table. `.to_pyarrow_batches()` returns a `RecordBatchReader` — note that Hotdata returns a single Arrow IPC payload per query, so this method downloads the full result first and then splits it into local batches.

### Raw SQL

```python
base = con.sql(
'SELECT * FROM "default"."public"."orders"',
'SELECT * FROM "default"."main"."orders"',
dialect="postgres",
)
result = base.filter(base.amount > 10).execute()
Expand Down Expand Up @@ -189,17 +215,20 @@ con.list_tables(database=("my_postgres", "public")) # tables
| Feature | Status |
|---------|--------|
| `create_database` / `drop_database` (managed) | ✅ |
| `create_table` / `drop_table` (DataFrame or Arrow upload) | ✅ |
| `create_table` from pandas / PyArrow / schema-only | ✅ |
| `drop_table` | ✅ |
| `con.table(...)` with full schema metadata | ✅ |
| Ibis expressions: filter, select, join, group\_by, agg, order\_by, limit | ✅ |
| `con.sql(...)` raw SQL | ✅ |
| `.execute()` → pandas, `.to_pyarrow()`, `.to_pyarrow_batches()` | ✅ |
| `list_catalogs`, `list_databases`, `list_tables` | ✅ |
| Arrow / Parquet column types (timestamp, decimal, list, duration, …) | ✅ |
| Temporary tables | ❌ |
| In-memory tables (`ibis.memtable(...)`) | ❌ |
| Python UDFs | ❌ |
| INSERT / UPDATE / DELETE on external connections | ❌ |

SQL compilation uses Ibis's Postgres dialect. Use `con.sql(...)` as a fallback for expressions that don't compile cleanly.
SQL compilation uses Ibis's Postgres dialect. Column types returned by Hotdata's information schema are resolved via PyArrow's type system, so Parquet-loaded tables with Arrow-native types (timestamps with time zones, decimals, lists, durations) are mapped correctly to Ibis types.

## Development

Expand Down
15 changes: 8 additions & 7 deletions src/ibis_hotdata/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
from ibis.backends.sql import SQLBackend

from ibis_hotdata.http import HotdataAPIError, HotdataClient
from ibis_hotdata.managed import DEFAULT_SCHEMA
from ibis_hotdata.types import dtype_from_hotdata_sql_type

_INFORMATION_SCHEMA_PAGE_SIZE = 500
Expand Down Expand Up @@ -203,7 +202,7 @@ def do_connect(
)

def disconnect(self) -> None:
if getattr(self, "_http", None) is not None:
if hasattr(self, "_http"):
self._http.close()

# --- hierarchy ---------------------------------------------------------
Expand Down Expand Up @@ -253,10 +252,12 @@ def _to_catalog_db_tuple(self, table_loc: sge.Table):
"""Use the compiler SQL dialect when stringifying qualifiers (backend name is not a dialect)."""

dialect = self.dialect
if (sg_cat := table_loc.args["catalog"]) is not None:
sg_cat = table_loc.args["catalog"]
if sg_cat is not None:
sg_cat.args["quoted"] = False
sg_cat = sg_cat.sql(dialect=dialect)
if (sg_db := table_loc.args["db"]) is not None:
sg_db = table_loc.args["db"]
if sg_db is not None:
sg_db.args["quoted"] = False
sg_db = sg_db.sql(dialect=dialect)

Expand Down Expand Up @@ -429,7 +430,7 @@ def _resolve_database_connection_id(self) -> str | None:
db = self._http.get_database(self._database_id)
self._database_connection_id = db.get("default_connection_id")
except HotdataAPIError:
pass
pass # best-effort: if the lookup fails, callers fall back to the catalog name
return self._database_connection_id

# --- schema / sql execution --------------------------------------------
Expand Down Expand Up @@ -575,7 +576,7 @@ def create_database(
/,
*,
catalog: str | None = None,
schema: str = DEFAULT_SCHEMA,
schema: str = "main",
tables: Sequence[str] | None = None,
force: bool = False,
) -> None:
Expand Down Expand Up @@ -722,7 +723,7 @@ def drop_table(
raise _ibis_err_from_hotdata(exc) from exc

def _register_in_memory_table(self, _op: ops.InMemoryTable) -> None:
return
pass # Hotdata has no local in-memory table concept; Ibis calls this hook before execute

@cached_property
def version(self) -> str:
Expand Down
22 changes: 11 additions & 11 deletions src/ibis_hotdata/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import http
import io
import json
import time
Expand Down Expand Up @@ -30,15 +31,14 @@
from hotdata.models.database_default_table_decl import DatabaseDefaultTableDecl
from hotdata.models.load_managed_table_request import LoadManagedTableRequest

from ibis_hotdata.managed import DEFAULT_SCHEMA

T = TypeVar("T")

# Matches Hotdata / runtimedb ``GET /v1/results/{id}`` Arrow responses.
APPLICATION_ARROW_STREAM = "application/vnd.apache.arrow.stream"

# Statuses that mean the query run is still in progress.
_IN_FLIGHT = {"running", "queued", "pending"}
# runtimedb QueryRunStatus only emits "running", "succeeded", "failed".
_IN_FLIGHT = {"running"}


def _sleep_until(deadline: float, interval: float) -> None:
Expand Down Expand Up @@ -197,7 +197,7 @@ def create_managed_database(
self,
description: str | None = None,
*,
schema: str = DEFAULT_SCHEMA,
schema: str = "public",
tables: Sequence[str] = (),
) -> dict[str, Any]:
"""POST ``/v1/databases`` — creates a managed database with an auto-provisioned default catalog."""
Expand Down Expand Up @@ -264,27 +264,27 @@ def _poll_result_arrow(
status = raw.status
ctype = (raw.headers.get("Content-Type") or "").split(";")[0].strip().lower()

if status == 200 and ctype == APPLICATION_ARROW_STREAM.lower():
if status == http.HTTPStatus.OK and ctype == APPLICATION_ARROW_STREAM.lower():
table = _ipc_stream_bytes_to_table(body)
return self._arrow_payload_from_table(table, result_id=result_id)

if status == 202:
if status == http.HTTPStatus.ACCEPTED:
_sleep_until(deadline, poll_interval_s)
continue

if status == 409:
if status == http.HTTPStatus.CONFLICT:
d = _json_utf8(body) if body else {}
raise HotdataAPIError(
d.get("error_message") or "Result failed",
status_code=409,
status_code=http.HTTPStatus.CONFLICT,
body=d,
)

if status == 404:
if status == http.HTTPStatus.NOT_FOUND:
d = _json_utf8(body) if body else {}
raise HotdataAPIError(
d.get("detail") or f"Result {result_id!r} not found",
status_code=404,
status_code=http.HTTPStatus.NOT_FOUND,
body=d,
)

Expand All @@ -304,7 +304,7 @@ def _arrow_payload_from_table(
) -> dict[str, Any]:
sch = table.schema
columns = sch.names
nullable = [sch.field(i).nullable for i in range(len(columns))]
nullable = [field.nullable for field in sch]
return {
"format": "arrow",
"pa_table": table,
Expand Down
5 changes: 0 additions & 5 deletions src/ibis_hotdata/managed.py

This file was deleted.

Loading
Loading