Python Table UDFs#99
Conversation
| python_import_cache.cpp | ||
| python_replacement_scan.cpp | ||
| python_udf.cpp | ||
| python_tvf.cpp |
There was a problem hiding this comment.
python_table_udf.cpp has my preference
There was a problem hiding this comment.
For types like PythonTVFType, would you prefer PythonTUDF, or PythonUDTF, or PythonTableUDF?
For what it's worth, SnowFlake and DataBrix have gone with UDTF: https://docs.snowflake.com/en/developer-guide/udf/sql/udf-sql-tabular-functions and https://docs.databricks.com/aws/en/udf/python-udtf
I don't like TUDF as an abbreviation, but UDTF or TableUDF both sound good to me.
There was a problem hiding this comment.
My preference is table udf
So a search for "udf" finds both versions
There was a problem hiding this comment.
Done: renamed files to "table_udf" and in code to TableUDF
| throw InvalidInputException("Invalid schema format: expected [name, type] pairs, got string '%s'", | ||
| py::str(item).cast<std::string>()); | ||
| } | ||
| if (!py::hasattr(item, "__getitem__") || py::len(item) < 2) { |
There was a problem hiding this comment.
This ignores cases where >2 items are given
But I don't get why we are taking schemas as [[name, type], [name, type]] instead of {name: type, name: type} ?
There was a problem hiding this comment.
At initial design, I wasn't sure if I'd need any other attributes other than Name and Type, so left it as a List of Tuples (or List of Lists).
But, looking back, a mapping makes more sense.
Will do.
There was a problem hiding this comment.
Done:
- Modified to schema={"x": sqltypes.BIGINT, "y": sqltypes.BIGINT, "name": sqltypes.VARCHAR}.
- Updated PR examples and test cases to match.
| switch (type) { | ||
| case PythonTVFType::TUPLES: | ||
| tf = | ||
| duckdb::TableFunction(name, {}, +PyTVFTuplesScanFunction, +PyTVFTuplesBindFunction, +PyTVFTuplesInitGlobal); |
There was a problem hiding this comment.
What is this syntax? +PyTVFTuplesScanFunction ?
I've seen &PyTVFTuplesScanFunction, which makes sense because you're taking the address of the function, but even that is redundant, you can use PyTVFTuplesScanFunction directly afaik
There was a problem hiding this comment.
This was a holdover from a fight with the linter, it's unnecessary / will remove.
There was a problem hiding this comment.
|
|
||
| connection_module.def("create_table_function", &DuckDBPyConnection::RegisterTableFunction, | ||
| "Register a table valued function via Callable", py::arg("name"), py::arg("callable"), | ||
| py::arg("parameters") = py::none(), py::arg("schema") = py::none(), |
There was a problem hiding this comment.
parameters should be a keyword-only argument
making the python signature equivalent to: create_table_function(name, callable, schema, type, *, parameters)
We can even infer the parameters of the function, we have similar logic for scalar udfs
I also feel like type can be a keyword-only argument, defaulting to TUPLES if omitted
There was a problem hiding this comment.
Done:
- kwargs for type & parameters
- default for type is TUPLES
- parameters is optional
I did not (yet?) do the parameter inference yet. I want to think about that a little bit. Added to TODO in PR Comment
connection_module.def("create_table_function", &DuckDBPyConnection::RegisterTableFunction,
"Register a table user defined function via Callable", py::arg("name"), py::arg("callable"),
py::arg("schema"), py::kw_only(), py::arg("type") = PythonTableUDFType::TUPLES,
py::arg("parameters") = py::none());| throw InvalidInputException("Invalid schema format: each schema item must be a [name, type] pair"); | ||
| } | ||
| names.emplace_back(py::str(item[py::int_(0)])); | ||
| types.emplace_back(TransformStringToLogicalType(py::str(item[py::int_(1)]))); |
There was a problem hiding this comment.
This can accept a DuckDBPyType instead, we can extract the logical type from that
see this for example:
case PythonObjectType::Value: {
// Extract the internal object and the type from the Value instance
auto object = ele.attr("object");
auto type = ele.attr("type");
shared_ptr<DuckDBPyType> internal_type;
if (!py::try_cast<shared_ptr<DuckDBPyType>>(type, internal_type)) {
string actual_type = py::str(type.get_type());
throw InvalidInputException("The 'type' of a Value should be of type DuckDBPyType, not '%s'",
actual_type);
}
return TransformPythonValue(object, internal_type->Type());
}There was a problem hiding this comment.
Done:
- schema is a mapping of str=>DuckDBPyType
| } | ||
| }; | ||
|
|
||
| struct PyTVFTuplesGlobalState : public GlobalTableFunctionState { |
There was a problem hiding this comment.
Keep in mind that this only supports single threaded execution, because the virtual function MaxThreads returns 1 by default.
You are using the global state directly so I think you're aware of this, but just double checking.
I think it's correct though, because enabling multi-threaded execution for a Python table UDF sounds like it wouldn't help much. As all time is spent in Python, so the GIL would make it essentially single threaded anyways.
There was a problem hiding this comment.
because the virtual function MaxThreads returns 1 by default.
Do you think we should make this explicit in table_udf, or is it fine to rely on the default?
enabling multi-threaded execution for a Python table UDF sounds like it wouldn't help much
Agree. Although, with free-threading, the GIL constraint goes away... but I think such complex cases (that need multi-threaded consumption of a Python callable) are best handled in Python-land.
| named_parameter_map_t kwargs; | ||
| vector<LogicalType> return_types; | ||
| vector<string> return_names; | ||
| PythonObjectContainer python_objects; // Holds the callable |
There was a problem hiding this comment.
I would like a test where we create a table function, create a view that uses the table function, then unregister the table function, and make sure to exit the scope where the python table function was created.
Then execute the view.
Just to make sure that this is enough to keep the python callable alive
(This message is misplaced, it's related to the callable of the PyTVFInfo)
There was a problem hiding this comment.
Done:
- tests/fast/table_udf/test_tuples.py::test_callable_lifetime_in_view
| static unique_ptr<PyTVFBindData> PyTVFBindInternal(ClientContext &context, TableFunctionBindInput &in, | ||
| vector<LogicalType> &return_types, vector<string> &return_names) { | ||
| // Disable progress bar to prevent GIL deadlock with Jupyter | ||
| // TODO: Decide if this is still needed - was a problem when fully materializing, but switched to streaming |
There was a problem hiding this comment.
Done:
- disabling progress bar was not needed.
This was a holdover from my first implementation, which materialized everything at init time and never released the GIL.
…use a dict[str,duckdb.sqltype] for schema, kwargs for create_table_function, clean up tests
|
Note: There's an unrelated issue with some CTE tests - the tests are also failing on main. |
|
I'm closing so I can take a step back and revisit this and re-evaluate the performance characteristics of this approach. While this approached worked, there's a few opportunities I found to improve this. In particular, using arrow_c_stream instead of requiring Arrow... which would unlock arrow/polars/pandas inputs... redoing how to handle schema, and rethinking the single-threaded implication. |
Per discussion #84, this PR implements Python Table Valued Functions. (aka: User Defined Table Functions)*.
Table Valued Functions* allow Python callables to be registered as DuckDB Table Functions, returning either (
Iterator[Sequence]) or an Arrow table.This is implemented primarily in
python_tvf.cpp. Tuple TVFs scan the py::iter from the Callable. Arrow TVFs delegate to ArrowTableFunction.* While the main code is ready for review, I'll need a pointer on how to properly add and regenerate the bindings. I added them manually in pyconnection.cpp. As I know some work was being done in the stubs and didn't want to conflict.
** Getting the GIL and reference counting part right took a bit of work, especially around destruction. I found PythonObjectContainer late (after trying other approaches), so please let me know if this is the right approach.
Edits
Changes in this PR
case_insensitive_map_t<unique_ptr<ExternalDependency>> registered_table_functions;to PyConnectionRegistration
This implementation adds two new functions:
Parameters
Parameters are declared as a list of parameter names, such as
parameters = ["col1", "col2"].parameters=Nonestill allows positional parameters to be called.myfunction(count:=10)Schema
The schema of a Table Function must be declared at bind time. This is done by capturing the return type at registration time. *
Schemas are defined as
List[Tuple[str,str]], where each tuple is a pair of Column Name and Data Type.* It's possible, but not implemented, to infer the schema for Arrow Tables rather than requiring it. Or, perhaps, to be more lenient.
Test Failures:
There's two test failures:
Tuples Example with positional args
Example with Parameters=None
Arrow Example
Discussion
Feature Name
What to call this feature?
Some databases refer to the function as Table Valued Functions, and others as User Defined Table Function. Either name works for me... I started with TVF but I think I'm not leaning towards UDTFs.
Deleting / Unregistering
There doesn't appear to be a way to truly "delete" or "unregister" table functions from a connection... so does it make sense to even have an unregister? I chose to require an explicit unregister prior to registering a different function with same name, but this is somewhat arbitrary.
Materializing vs Streaming the Iterator
I didn't notice any significant performance difference between streaming the iterator vs fully materializing it. The benefits outweighed the added complexity.
There is perhaps some room for checking the callable result to see if it has a Length to set the Cardinality initially which may help the optimizer.
Other Callable Types
This PR supports TUPLES and ARROW_TABLES. The TUPLE implementation supports streaming, but the ARROW_TABLES are fully materialized.
ARROW_BATCHED_READER would make sense as a next type. I am assuming this is not required in an initial implementation.
(future) Passing a cursor
For "table in" situations, I'd think we could add a "pass_connection" option to create_table_function. If enabled, a cursor to the current connection would be passed to the TVF as a kwarg.
* I left it out of this initial PR to keep it simple.
Something like:
TODO