
Commit b2aab77

Pull out scalar UDTF code into snippets (#202)
1 parent 18ba1ac commit b2aab77

11 files changed

Lines changed: 786 additions & 633 deletions

docs/geneva/udfs/index.mdx

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ Geneva provides three types of user-defined functions for transforming data. Eac
 | **Refresh** | Incremental | Incremental | Full |
 | **Parallelism** | Fragment-parallel | Fragment-parallel | Partition-parallel |
 | **Inherited columns** | N/A — adds to existing rows | Automatic from query | Independent output schema |
-| **Registration** | `table.add_columns()` | `db.create_materialized_view(udtf=)` | `db.create_udtf_view()` |
+| **Registration** | [`table.add_columns()`](https://lancedb.github.io/geneva/api/table/#geneva.table.Table.add_columns) | [`db.create_scalar_udtf_view()`](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_scalar_udtf_view) | [`db.create_udtf_view()`](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_udtf_view) |
 
 ## UDFs (1:1)

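For quick reference, the renamed registration calls in the updated table row map to Python like this — a minimal sketch grounded in the `PyRegistration*` snippets added later in this commit (`my_udf`, `my_scalar_udtf`, `my_udtf_view`-style names, `my_source`, and `mock_table` are placeholder names carried over from those snippets):

```python
import geneva

db = geneva.connect("/data/mydb")

# UDF (1:1) — adds a computed column to an existing table
mock_table.add_columns({"col": my_udf})

# Scalar UDTF (1:N) — registered via the renamed view API
db.create_scalar_udtf_view("my_scalar_view", source=my_source, scalar_udtf=my_scalar_udtf)

# Batch UDTF — independent output schema
db.create_udtf_view("my_batch_view", source=my_source, udtf=my_udtf)
```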
docs/geneva/udfs/scalar-udtfs.mdx

Lines changed: 55 additions & 133 deletions
@@ -5,6 +5,17 @@ description: Use scalar UDTFs for 1:N row expansion — split videos into clips,
 icon: diagram-subtask
 ---
 
+import {
+  PyScalarUdtfIterator,
+  PyScalarUdtfList,
+  PyScalarUdtfBatch,
+  PyCreateScalarUdtfView,
+  PyAddColumnsScalarUdtf,
+  PyIncrementalRefresh,
+  PyChainingUdtfViews,
+  PyDocumentChunkingFull,
+} from '/snippets/geneva_scalar_udtfs.mdx';
+
 <Badge>Beta — introduced in Geneva 0.11.0</Badge>
 
 Standard UDFs produce exactly **one output value per input row**. Scalar UDTFs enable **1:N row expansion** — each source row can produce multiple output rows. The results are stored as a materialized view with MV-style incremental refresh.
@@ -19,24 +30,11 @@ Standard UDFs produce exactly **one output value per input row**. Scalar UDTFs e
 
 Use the `@scalar_udtf` decorator on a function that **yields** output rows. Geneva infers the output schema from the return type annotation.
 
-```python
-from geneva import scalar_udtf
-from typing import Iterator, NamedTuple
-
-class Clip(NamedTuple):
-    clip_start: float
-    clip_end: float
-    clip_bytes: bytes
-
-@scalar_udtf
-def extract_clips(video_path: str, duration: float) -> Iterator[Clip]:
-    """Yields multiple clips per video."""
-    clip_length = 10.0
-    for start in range(0, int(duration), int(clip_length)):
-        end = min(start + clip_length, duration)
-        clip_data = extract_video_segment(video_path, start, end)
-        yield Clip(clip_start=start, clip_end=end, clip_bytes=clip_data)
-```
+<CodeGroup>
+  <CodeBlock filename="Python" language="python" icon="python">
+    {PyScalarUdtfIterator}
+  </CodeBlock>
+</CodeGroup>
 
 Input parameters are bound to source columns **by name** — the parameter `video_path` binds to source column `video_path`, just like standard UDFs.
 
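As a rough illustration of the schema inference mentioned in the hunk above (an assumption for illustration — Geneva's exact type mapping is not shown in this diff), the `Clip` NamedTuple annotation would plausibly translate to an Arrow schema along these lines:

```python
import pyarrow as pa
from typing import NamedTuple

class Clip(NamedTuple):
    clip_start: float
    clip_end: float
    clip_bytes: bytes

# Plausible inferred output schema (assumption: float -> float64,
# bytes -> binary; the actual mapping is not confirmed by this commit)
inferred_schema = pa.schema([
    pa.field("clip_start", pa.float64()),
    pa.field("clip_end", pa.float64()),
    pa.field("clip_bytes", pa.binary()),
])
```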
@@ -48,61 +46,34 @@ A scalar UDTF can yield **zero rows** for a source row. The source row is still
 
 If you prefer to build the full list in memory rather than yielding, you can return a `list` instead of an `Iterator`:
 
-```python
-@scalar_udtf
-def extract_clips(video_path: str, duration: float) -> list[Clip]:
-    clips = []
-    for start in range(0, int(duration), 10):
-        end = min(start + 10, duration)
-        clips.append(Clip(clip_start=start, clip_end=end, clip_bytes=b"..."))
-    return clips
-```
+<CodeGroup>
+  <CodeBlock filename="Python" language="python" icon="python">
+    {PyScalarUdtfList}
+  </CodeBlock>
+</CodeGroup>
 
 ### Batched scalar UDTF
 
-For vectorized processing, use `batch=True`. The function receives Arrow arrays and returns a `RecordBatch` of expanded rows:
+For vectorized processing, use `batch=True`. The function receives Arrow arrays and returns a `RecordBatch` of expanded rows. Because the return type `pa.RecordBatch` cannot be inferred, you must supply `output_schema` explicitly:
 
-```python
-@scalar_udtf(batch=True)
-def extract_clips(batch: pa.RecordBatch) -> pa.RecordBatch:
-    """Process rows in batches. Same 1:N semantic per row."""
-    ...
-```
+<CodeGroup>
+  <CodeBlock filename="Python" language="python" icon="python">
+    {PyScalarUdtfBatch}
+  </CodeBlock>
+</CodeGroup>
 
 ## Creating a Scalar UDTF View
 
-Scalar UDTFs use the existing `create_materialized_view` API with a `udtf=` parameter:
-
-```python
-import geneva
-
-db = geneva.connect("/data/mydb")
-videos = db.open_table("videos")
+Scalar UDTFs use the `create_scalar_udtf_view` API:
 
-# Create the 1:N materialized view
-clips = db.create_materialized_view(
-    "clips",
-    query=videos.search(None).select(["video_path", "metadata"]),
-    udtf=extract_clips,
-)
-
-# Populate — runs the UDTF on every source row
-clips.refresh()
-```
+<CodeGroup>
+  <CodeBlock filename="Python" language="python" icon="python">
+    {PyCreateScalarUdtfView}
+  </CodeBlock>
+</CodeGroup>
 
 The `query` parameter controls which source columns are inherited. Columns listed in `.select()` are carried into every child row automatically.
 
-### Inheriting source columns
-
-```python
-# Only video_path and metadata are inherited into the clips table
-clips = db.create_materialized_view(
-    "clips",
-    query=videos.search(None).select(["video_path", "metadata"]),
-    udtf=extract_clips,
-)
-```
-
 ## Inherited Columns
 
 Child rows automatically include the parent's columns — no manual join required. The columns available in the child table are determined by the query's `.select()`:
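To make the inheritance concrete: assuming the `clips` view from the snippets above, a query against the child table can select parent and child columns side by side — a hedged sketch mirroring the query shape in the `PyDocumentChunkingFull` snippet later in this diff:

```python
# video_path was inherited from the parent videos table;
# clip_start / clip_end come from the UDTF output
df = clips.search(None).select(["video_path", "clip_start", "clip_end"]).to_pandas()
```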
@@ -131,17 +102,11 @@ The first three rows come from the `/v/a.mp4` source row, the last two from `/v/
 
 Since scalar UDTF views are materialized views, you can add UDF-computed columns to the child table and backfill them:
 
-```python
-@udf(data_type=pa.list_(pa.float32(), 512))
-def clip_embedding(clip_bytes: bytes) -> list[float]:
-    return embed_model.encode(clip_bytes)
-
-# Add an embedding column to the clips table
-clips.add_columns({"embedding": clip_embedding})
-
-# Backfill computes embeddings for all existing clips
-clips.backfill("embedding")
-```
+<CodeGroup>
+  <CodeBlock filename="Python" language="python" icon="python">
+    {PyAddColumnsScalarUdtf}
+  </CodeBlock>
+</CodeGroup>
 
 This is a powerful pattern: expand source rows with a scalar UDTF, then enrich the expanded rows with standard UDFs.
 
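Put together, the expand-then-enrich pattern reads like this — a sketch assembled from the `PyCreateScalarUdtfView` and `PyAddColumnsScalarUdtf` snippets (`embed_model` is an assumed, pre-loaded model, as in the snippet):

```python
# 1:N expansion — one child row per clip
clips = db.create_scalar_udtf_view(
    "clips",
    source=videos.search(None).select(["video_path", "metadata"]),
    scalar_udtf=extract_clips,
)
clips.refresh()

# 1:1 enrichment — embed each expanded row with a standard UDF
clips.add_columns({"embedding": clip_embedding})
clips.backfill("embedding")
```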
@@ -153,77 +118,34 @@ Scalar UDTFs support **incremental refresh**, just like standard materialized vi
 - **Deleted source rows**: Child rows linked to the deleted parent are cascade-deleted.
 - **Updated source rows**: Old children are deleted, UDTF re-runs, new children inserted.
 
-```python
-# Add new videos to the source table
-videos.add(new_video_data)
-
-# Incremental refresh — only processes the new videos
-clips.refresh()
-```
+<CodeGroup>
+  <CodeBlock filename="Python" language="python" icon="python">
+    {PyIncrementalRefresh}
+  </CodeBlock>
+</CodeGroup>
 
 Only the new source rows are processed. Existing clips from previous refreshes are untouched.
 
 ## Chaining UDTF Views
 
 Scalar UDTF views are standard materialized views, so they can serve as the source for further views:
 
-```python
-# videos → clips (1:N)
-clips = db.create_materialized_view(
-    "clips", query=videos.search(None), udtf=extract_clips
-)
-
-# clips → frames (1:N)
-frames = db.create_materialized_view(
-    "frames", query=clips.search(None), udtf=extract_frames
-)
-```
+<CodeGroup>
+  <CodeBlock filename="Python" language="python" icon="python">
+    {PyChainingUdtfViews}
+  </CodeBlock>
+</CodeGroup>
 
 ## Full Example: Document Chunking
 
-```python
-from geneva import connect, scalar_udtf, udf
-from typing import Iterator, NamedTuple
-import pyarrow as pa
-
-class Chunk(NamedTuple):
-    chunk_index: int
-    chunk_text: str
-
-@scalar_udtf
-def chunk_document(text: str) -> Iterator[Chunk]:
-    """Split a document into overlapping chunks."""
-    words = text.split()
-    chunk_size = 500
-    overlap = 50
-    for i, start in enumerate(range(0, len(words), chunk_size - overlap)):
-        chunk_words = words[start:start + chunk_size]
-        yield Chunk(chunk_index=i, chunk_text=" ".join(chunk_words))
-
-db = connect("/data/mydb")
-docs = db.open_table("documents")
-
-# Create chunked view — inherits doc_id, title, etc. from source
-chunks = db.create_materialized_view(
-    "doc_chunks",
-    query=docs.search(None).select(["doc_id", "title", "text"]),
-    udtf=chunk_document,
-)
-chunks.refresh()
-
-# Add embeddings to chunks for semantic search
-@udf(data_type=pa.list_(pa.float32(), 1536))
-def embed_text(chunk_text: str) -> list[float]:
-    return embedding_model.encode(chunk_text)
-
-chunks.add_columns({"embedding": embed_text})
-chunks.backfill("embedding") # Backfills embeddings on all existing chunks
-
-# Query — parent columns available alongside chunk columns
-chunks.search(None).select(["doc_id", "title", "chunk_text", "embedding"]).to_pandas()
-```
+<CodeGroup>
+  <CodeBlock filename="Python" language="python" icon="python">
+    {PyDocumentChunkingFull}
+  </CodeBlock>
+</CodeGroup>
 
 For a comparison of all three function types (UDFs, Scalar UDTFs, Batch UDTFs), see [Understanding Transforms](/geneva/udfs).
 
 Reference:
-* [`create_materialized_view` API](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_materialized_view)
+* [`scalar_udtf` API](https://lancedb.github.io/geneva/api/udtf/#geneva.scalar_udtf)
+* [`create_scalar_udtf_view` API](https://lancedb.github.io/geneva/api/connection/#geneva.db.Connection.create_scalar_udtf_view)
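To make the refresh semantics above concrete, a hedged sketch — assuming Geneva tables expose the same `update(where=..., values=...)` signature that the LanceDB snippets elsewhere in this repo use:

```python
# Editing a parent row invalidates its children on the next refresh:
# old clips for /v/a.mp4 are cascade-deleted, the UDTF re-runs, and
# new child rows are inserted
videos.update(where="video_path = '/v/a.mp4'", values={"duration": 42.0})
clips.refresh()
```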
docs/snippets/geneva_scalar_udtfs.mdx

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+{/* Auto-generated by scripts/mdx_snippets_gen.py. Do not edit manually. */}
+
+export const PyAddColumnsScalarUdtf = "@udf(data_type=pa.list_(pa.float32(), 512))\ndef clip_embedding(clip_bytes: bytes) -> list[float]:\n    return embed_model.encode(clip_bytes)\n\n# Add an embedding column to the clips table\nclips.add_columns({\"embedding\": clip_embedding})\n\n# Backfill computes embeddings for all existing clips\nclips.backfill(\"embedding\")\n";
+
+export const PyChainingUdtfViews = "# videos → clips (1:N)\nclips = db.create_scalar_udtf_view(\n    \"clips\", source=videos.search(None), scalar_udtf=extract_clips\n)\n\n# clips → frames (1:N)\nframes = db.create_scalar_udtf_view(\n    \"frames\", source=clips.search(None), scalar_udtf=extract_frames\n)\n";
+
+export const PyCreateScalarUdtfView = "import geneva\n\ndb = geneva.connect(\"/data/mydb\")\nvideos = db.open_table(\"videos\")\n\n# Create the 1:N materialized view\nclips = db.create_scalar_udtf_view(\n    \"clips\",\n    source=videos.search(None).select([\"video_path\", \"metadata\"]),\n    scalar_udtf=extract_clips,\n)\n\n# Populate — runs the UDTF on every source row\nclips.refresh()\n";
+
+export const PyDocumentChunkingFull = "from geneva import connect, scalar_udtf, udf\nfrom typing import Iterator, NamedTuple\nimport pyarrow as pa\n\nclass Chunk(NamedTuple):\n    chunk_index: int\n    chunk_text: str\n\n@scalar_udtf\ndef chunk_document(text: str) -> Iterator[Chunk]:\n    \"\"\"Split a document into overlapping chunks.\"\"\"\n    words = text.split()\n    chunk_size = 500\n    overlap = 50\n    for i, start in enumerate(range(0, len(words), chunk_size - overlap)):\n        chunk_words = words[start:start + chunk_size]\n        yield Chunk(chunk_index=i, chunk_text=\" \".join(chunk_words))\n\ndb = connect(\"/data/mydb\")\ndocs = db.open_table(\"documents\")\n\n# Create chunked view — inherits doc_id, title, etc. from source\nchunks = db.create_scalar_udtf_view(\n    \"doc_chunks\",\n    source=docs.search(None).select([\"doc_id\", \"title\", \"text\"]),\n    scalar_udtf=chunk_document,\n)\nchunks.refresh()\n\n# Add embeddings to chunks for semantic search\n@udf(data_type=pa.list_(pa.float32(), 1536))\ndef embed_text(chunk_text: str) -> list[float]:\n    return embedding_model.encode(chunk_text)\n\nchunks.add_columns({\"embedding\": embed_text})\nchunks.backfill(\"embedding\") # Backfills embeddings on all existing chunks\n\n# Query — parent columns available alongside chunk columns\nchunks.search(None).select([\"doc_id\", \"title\", \"chunk_text\", \"embedding\"]).to_pandas()\n";
+
+export const PyDocumentChunkingUdtf = "from geneva import scalar_udtf\nfrom typing import Iterator, NamedTuple\n\nclass Chunk(NamedTuple):\n    chunk_index: int\n    chunk_text: str\n\n@scalar_udtf\ndef chunk_document(text: str) -> Iterator[Chunk]:\n    \"\"\"Split a document into overlapping chunks.\"\"\"\n    words = text.split()\n    chunk_size = 500\n    overlap = 50\n    for i, start in enumerate(range(0, len(words), chunk_size - overlap)):\n        chunk_words = words[start:start + chunk_size]\n        yield Chunk(chunk_index=i, chunk_text=\" \".join(chunk_words))\n";
+
+export const PyIncrementalRefresh = "# Add new videos to the source table\nvideos.add(new_video_data)\n\n# Incremental refresh — only processes the new videos\nclips.refresh()\n";
+
+export const PyScalarUdtfBatch = "@scalar_udtf(batch=True, output_schema=clip_schema)\ndef extract_clips(batch: pa.RecordBatch) -> pa.RecordBatch:\n    \"\"\"Process rows in batches. Same 1:N semantic per row.\"\"\"\n    ...\n";
+
+export const PyScalarUdtfIterator = "from geneva import scalar_udtf\nfrom typing import Iterator, NamedTuple\n\nclass Clip(NamedTuple):\n    clip_start: float\n    clip_end: float\n    clip_bytes: bytes\n\n@scalar_udtf\ndef extract_clips(video_path: str, duration: float) -> Iterator[Clip]:\n    \"\"\"Yields multiple clips per video.\"\"\"\n    clip_length = 10.0\n    for start in range(0, int(duration), int(clip_length)):\n        end = min(start + clip_length, duration)\n        clip_data = extract_video_segment(video_path, start, end)\n        yield Clip(clip_start=start, clip_end=end, clip_bytes=clip_data)\n";
+
+export const PyScalarUdtfList = "@scalar_udtf\ndef extract_clips(video_path: str, duration: float) -> list[Clip]:\n    clips = []\n    for start in range(0, int(duration), 10):\n        end = min(start + 10, duration)\n        clips.append(Clip(clip_start=start, clip_end=end, clip_bytes=b\"...\"))\n    return clips\n";
+
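One loose end in the snippets above: `PyScalarUdtfBatch` references a `clip_schema` that no snippet defines. A plausible definition, matching the `Clip` NamedTuple fields used by the non-batch snippets (an illustration, not taken from this commit):

```python
import pyarrow as pa

# Hypothetical explicit output schema for the batch=True variant;
# field types mirror the Clip NamedTuple (float, float, bytes)
clip_schema = pa.schema([
    pa.field("clip_start", pa.float64()),
    pa.field("clip_end", pa.float64()),
    pa.field("clip_bytes", pa.binary()),
])
```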
Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+{/* Auto-generated by scripts/mdx_snippets_gen.py. Do not edit manually. */}
+
+export const PyRegistrationScalarUdtf = "db = geneva.connect(\"/data/mydb\")\ndb.create_scalar_udtf_view(\"my_view\", source=my_source, scalar_udtf=my_scalar_udtf)\n";
+
+export const PyRegistrationUdf = "mock_table.add_columns({\"col\": my_udf})\n";
+
+export const PyRegistrationUdtf = "db = geneva.connect(\"/data/mydb\")\ndb.create_udtf_view(\"my_view\", source=my_source, udtf=my_udtf)\n";
+

docs/snippets/tables.mdx

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,7 @@
 {/* Auto-generated by scripts/mdx_snippets_gen.py. Do not edit manually. */}
 
+export const PyWriteWithConcurrency = "import pyarrow as pa\nfrom lancedb.scannable import Scannable\nfrom random import random\n\nVECTOR_DIM = 4\n\nschema = pa.schema(\n    [\n        pa.field(\"id\", pa.int64()),\n        pa.field(\"vector\", pa.list_(pa.float32(), VECTOR_DIM)),\n    ]\n)\n\ndef make_batch(batch_idx: int, rows_per_batch: int) -> pa.RecordBatch:\n    start = batch_idx * rows_per_batch\n    stop = start + rows_per_batch\n    row_ids = list(range(start, stop))\n    vectors = pa.array(\n        [[random() for _ in range(VECTOR_DIM)] for _ in row_ids],\n        type=pa.list_(pa.float32(), VECTOR_DIM),\n    )\n    return pa.RecordBatch.from_arrays(\n        [\n            pa.array(row_ids, type=pa.int64()),\n            vectors,\n        ],\n        schema=schema,\n    )\n\ndef make_batch_reader(\n    num_batches: int, rows_per_batch: int\n) -> pa.RecordBatchReader:\n    return pa.RecordBatchReader.from_batches(\n        schema,\n        (make_batch(batch_idx, rows_per_batch) for batch_idx in range(num_batches)),\n    )\n\ndef make_large_scannable(num_batches: int, rows_per_batch: int) -> Scannable:\n    total_rows = num_batches * rows_per_batch\n    return Scannable(\n        schema=schema,\n        num_rows=total_rows,\n        reader=lambda: make_batch_reader(num_batches, rows_per_batch),\n    )\n\ndb = tmp_db\ntable = db.create_table(\n    \"bulk_ingest_concurrent\",\n    make_large_scannable(num_batches=1000, rows_per_batch=10000),\n    mode=\"overwrite\",\n)\n";
+
 export const PyAddColumnsCalculated = "# Add a discounted price column (10% discount)\ntable.add_columns({\"discounted_price\": \"cast((price * 0.9) as float)\"})\n";
 
 export const PyAddColumnsDefaultValues = "# Add a stock status column with default value\ntable.add_columns({\"in_stock\": \"cast(true as boolean)\"})\n";
@@ -116,8 +118,6 @@ export const PyVersioningRollback = "table.restore(version_after_mod)\nversions
 
 export const PyVersioningUpdateData = "table.update(where=\"author='Richard'\", values={\"author\": \"Richard Daniel Sanchez\"})\nrows_after_update = table.count_rows(\"author = 'Richard Daniel Sanchez'\")\nprint(f\"Rows updated to Richard Daniel Sanchez: {rows_after_update}\")\n";
 
-export const PyWriteWithConcurrency = "import pyarrow as pa\nfrom lancedb.scannable import Scannable\nfrom random import random\n\nVECTOR_DIM = 4\n\nschema = pa.schema(\n    [\n        pa.field(\"id\", pa.int64()),\n        pa.field(\"vector\", pa.list_(pa.float32(), VECTOR_DIM)),\n    ]\n)\n\ndef make_batch(batch_idx: int, rows_per_batch: int) -> pa.RecordBatch:\n    start = batch_idx * rows_per_batch\n    stop = start + rows_per_batch\n    row_ids = list(range(start, stop))\n    vectors = pa.array(\n        [[random() for _ in range(VECTOR_DIM)] for _ in row_ids],\n        type=pa.list_(pa.float32(), VECTOR_DIM),\n    )\n    return pa.RecordBatch.from_arrays(\n        [\n            pa.array(row_ids, type=pa.int64()),\n            vectors,\n        ],\n        schema=schema,\n    )\n\ndef make_batch_reader(\n    num_batches: int, rows_per_batch: int\n) -> pa.RecordBatchReader:\n    return pa.RecordBatchReader.from_batches(\n        schema,\n        (make_batch(batch_idx, rows_per_batch) for batch_idx in range(num_batches)),\n    )\n\ndef make_large_scannable(num_batches: int, rows_per_batch: int) -> Scannable:\n    total_rows = num_batches * rows_per_batch\n    return Scannable(\n        schema=schema,\n        num_rows=total_rows,\n        reader=lambda: make_batch_reader(num_batches, rows_per_batch),\n    )\n\ndb = tmp_db\ntable = db.create_table(\n    \"bulk_ingest_concurrent\",\n    make_large_scannable(num_batches=1000, rows_per_batch=10000),\n    mode=\"overwrite\",\n)\n";
-
 export const TsAddColumnsCalculated = "// Add a discounted price column (10% discount)\nawait schemaAddTable.addColumns([\n  {\n    name: \"discounted_price\",\n    valueSql: \"cast((price * 0.9) as float)\",\n  },\n]);\n";
 
 export const TsAddColumnsDefaultValues = "// Add a stock status column with default value\nawait schemaAddTable.addColumns([\n  {\n    name: \"in_stock\",\n    valueSql: \"cast(true as boolean)\",\n  },\n]);\n";

pyproject.toml

Lines changed: 1 addition & 5 deletions
@@ -12,10 +12,6 @@ dependencies = [
     "polars>=1.39.2",
     "pytest>=9.0.1",
     "pytest-asyncio>=1.3.0",
-<<<<<<< Updated upstream
-    "Pillow>=11.0.0",
-    "geneva>=0.11.0",
-=======
     "Pillow>=12.1.1",
->>>>>>> Stashed changes
+    "geneva>=0.12.0"
 ]

tests/py/test_geneva_defaults.py

Lines changed: 0 additions & 2 deletions
@@ -8,8 +8,6 @@
 import pytest
 
 
-# TODO: remove skip once geneva 0.12.0 is on PyPI (head node defaults changed to 4 CPU / 8Gi)
-@pytest.mark.skip(reason="requires geneva>=0.12.0")
 def test_head_node_defaults():
     # If these change, update the Head Node table in performance.mdx
     from geneva.cluster.builder import KubeRayClusterBuilder

tests/py/test_geneva_dependency_verification.py

Lines changed: 3 additions & 2 deletions
@@ -40,8 +40,9 @@ def test_env_vars_via_cluster(monkeypatch):
 
 
 def test_pip_manifest(monkeypatch):
-    from unittest.mock import MagicMock
-    mock_conn = MagicMock()
+    import geneva
+    from unittest.mock import MagicMock, create_autospec
+    mock_conn = create_autospec(geneva.db.Connection, instance=True)
     monkeypatch.setattr("geneva.connect", MagicMock(return_value=mock_conn))
 
     # --8<-- [start:pip_manifest]
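The switch from a bare `MagicMock` to `create_autospec` tightens this test: an autospec'd mock only accepts attributes and call signatures that exist on the real `geneva.db.Connection`, so the documented snippet fails loudly if the API drifts. A standalone illustration with a toy class (not the real Connection):

```python
from unittest.mock import MagicMock, create_autospec

class Toy:
    def open_table(self, name: str): ...

loose = MagicMock()
loose.open_tabel("x")  # typo goes unnoticed — MagicMock accepts any attribute

strict = create_autospec(Toy, instance=True)
strict.open_table("x")    # OK: matches the real signature
# strict.open_tabel("x")  # would raise AttributeError: not on the spec
# strict.open_table()     # would raise TypeError: missing argument 'name'
```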
