|
1 | 1 | // SPDX-License-Identifier: Apache-2.0 |
2 | 2 | // SPDX-FileCopyrightText: Copyright the Vortex contributors |
3 | | - |
4 | | -#include "duckdb_vx.h" |
5 | 3 | #include "duckdb_vx/data.hpp" |
6 | 4 | #include "duckdb_vx/error.hpp" |
7 | | - |
| 5 | +#include "duckdb_vx/table_function.h" |
| 6 | +#include "vortex.h" |
8 | 7 | #include "duckdb/function/copy_function.hpp" |
9 | 8 | #include "duckdb/main/capi/capi_internal.hpp" |
10 | 9 | #include "duckdb/main/client_context.hpp" |
11 | 10 | #include "duckdb/main/connection.hpp" |
12 | 11 | #include "duckdb/parser/parsed_data/create_copy_function_info.hpp" |
13 | 12 |
|
14 | 13 | using namespace duckdb; |
| 14 | +using vortex::CData; |
| 15 | +using vortex::IntoErrString; |
15 | 16 |
|
16 | | -namespace vortex { |
17 | | - |
18 | | -struct CCopyBindData final : TableFunctionData { |
19 | | - CCopyBindData(const duckdb_vx_copy_func_vtab_t vtab_p, unique_ptr<CData> ffi_data_p) |
20 | | - : vtab(vtab_p), ffi_data(std::move(ffi_data_p)) { |
| 17 | +struct CopyBindData final : TableFunctionData { |
| 18 | + CopyBindData(unique_ptr<CData> ffi_data) : ffi_data(std::move(ffi_data)) { |
21 | 19 | } |
22 | | - |
23 | | - const duckdb_vx_copy_func_vtab_t vtab; |
24 | 20 | unique_ptr<CData> ffi_data; |
25 | 21 | }; |
26 | 22 |
|
27 | | -struct CCopyGlobalData final : GlobalFunctionData { |
28 | | - explicit CCopyGlobalData(unique_ptr<CData> ffi_data_p) : ffi_data(std::move(ffi_data_p)) { |
| 23 | +struct CopyGlobalData final : GlobalFunctionData { |
| 24 | + CopyGlobalData(unique_ptr<CData> ffi_data) : ffi_data(std::move(ffi_data)) { |
29 | 25 | } |
30 | 26 |
|
31 | 27 | unique_ptr<CData> ffi_data; |
32 | 28 | }; |
33 | 29 |
|
34 | | -struct CCopyLocalData final : LocalFunctionData { |
35 | | - explicit CCopyLocalData(unique_ptr<CData> ffi_data_p) : ffi_data(std::move(ffi_data_p)) { |
| 30 | +unique_ptr<FunctionData> copy_to_bind(ClientContext &, |
| 31 | + CopyFunctionBindInput &, |
| 32 | + const vector<string> &column_names, |
| 33 | + const vector<LogicalType> &column_types) { |
| 34 | + vector<const char *> ffi_column_names(column_names.size()); |
| 35 | + for (size_t i = 0; i < column_names.size(); ++i) { |
| 36 | + ffi_column_names[i] = column_names[i].c_str(); |
36 | 37 | } |
37 | 38 |
|
38 | | - unique_ptr<CData> ffi_data; |
39 | | -}; |
40 | | - |
41 | | -static duckdb_vx_copy_func_vtab_t copy_vtab_one; |
42 | | - |
43 | | -unique_ptr<FunctionData> c_bind_one(ClientContext & /*context*/, |
44 | | - CopyFunctionBindInput &info, |
45 | | - const vector<string> &column_names, |
46 | | - const vector<LogicalType> &column_types) { |
47 | | - |
48 | | - auto c_column_names = vector<char *>(); |
49 | | - c_column_names.reserve(column_names.size()); |
50 | | - for (const auto &col_id : column_names) { |
51 | | - c_column_names.push_back(const_cast<char *>(col_id.c_str())); |
52 | | - } |
53 | | - |
54 | | - auto c_column_types = vector<duckdb_logical_type>(); |
55 | | - c_column_types.reserve(c_column_types.size()); |
56 | | - for (auto &col_type : column_types) { |
57 | | - c_column_types.push_back(reinterpret_cast<duckdb_logical_type>(const_cast<LogicalType *>(&col_type))); |
| 39 | + vector<duckdb_logical_type> ffi_column_types(column_types.size()); |
| 40 | + for (size_t i = 0; i < column_types.size(); ++i) { |
| 41 | + // duckdb C api doesn't allow passing const LogicalTypes. We never |
| 42 | + // modify input in copy function. |
| 43 | + ffi_column_types[i] = |
| 44 | + reinterpret_cast<duckdb_logical_type>(const_cast<LogicalType *>(&column_types[i])); |
58 | 45 | } |
59 | 46 |
|
60 | 47 | duckdb_vx_error error_out = nullptr; |
61 | | - // TODO(myrrc): do we pass ownership of c_column_names in bind? |
62 | | - // If yes, it's a UB as we'd double delete on function return |
63 | | - auto ffi_bind_data = copy_vtab_one.bind(reinterpret_cast<duckdb_vx_copy_func_bind_input>(&info), |
64 | | - c_column_names.data(), |
65 | | - c_column_names.size(), |
66 | | - c_column_types.data(), |
67 | | - c_column_types.size(), |
68 | | - &error_out); |
| 48 | + const duckdb_vx_data ffi_bind_data = duckdb_copy_function_copy_to_bind(ffi_column_names.data(), |
| 49 | + ffi_column_names.size(), |
| 50 | + ffi_column_types.data(), |
| 51 | + ffi_column_types.size(), |
| 52 | + &error_out); |
69 | 53 | if (error_out) { |
70 | 54 | throw BinderException(IntoErrString(error_out)); |
71 | 55 | } |
72 | | - |
73 | | - return make_uniq<CCopyBindData>( |
74 | | - // This should only be filled out once |
75 | | - copy_vtab_one, |
76 | | - unique_ptr<CData>(reinterpret_cast<CData *>(ffi_bind_data))); |
| 56 | + auto cdata = unique_ptr<CData>(reinterpret_cast<CData *>(ffi_bind_data)); |
| 57 | + return make_uniq<CopyBindData>(std::move(cdata)); |
77 | 58 | } |
78 | 59 |
|
79 | 60 | unique_ptr<GlobalFunctionData> |
80 | | -c_init_global(ClientContext &context, FunctionData &bind_data, const string &file_path) { |
81 | | - auto &bind = bind_data.Cast<CCopyBindData>(); |
82 | | - duckdb_vx_error error_out = nullptr; |
83 | | - auto global_data = bind.vtab.init_global(reinterpret_cast<duckdb_client_context>(&context), |
84 | | - bind.ffi_data->DataPtr(), |
85 | | - file_path.c_str(), |
86 | | - &error_out); |
87 | | - if (error_out) { |
88 | | - throw ExecutorException(IntoErrString(error_out)); |
89 | | - } |
| 61 | +copy_to_initialize_global(ClientContext &context, FunctionData &bind_data, const string &file_path) { |
| 62 | + void *const ffi_bind = bind_data.Cast<CopyBindData>().ffi_data->DataPtr(); |
| 63 | + const auto ffi_ctx = reinterpret_cast<duckdb_client_context>(&context); |
90 | 64 |
|
91 | | - return make_uniq<CCopyGlobalData>(unique_ptr<CData>(reinterpret_cast<CData *>(global_data))); |
92 | | -} |
93 | | - |
94 | | -unique_ptr<LocalFunctionData> c_init_local(ExecutionContext & /*context*/, FunctionData &bind_data) { |
95 | | - auto &bind = bind_data.Cast<CCopyBindData>(); |
96 | 65 | duckdb_vx_error error_out = nullptr; |
97 | | - auto data = bind.vtab.init_local(bind.ffi_data->DataPtr(), &error_out); |
| 66 | + const duckdb_vx_data ffi_global = |
| 67 | + duckdb_copy_function_copy_to_initialize_global(ffi_ctx, ffi_bind, file_path.c_str(), &error_out); |
98 | 68 | if (error_out) { |
99 | 69 | throw ExecutorException(IntoErrString(error_out)); |
100 | 70 | } |
101 | 71 |
|
102 | | - return make_uniq<CCopyLocalData>(unique_ptr<CData>(reinterpret_cast<CData *>(data))); |
| 72 | + auto cdata = unique_ptr<CData>(reinterpret_cast<CData *>(ffi_global)); |
| 73 | + return make_uniq<CopyGlobalData>(std::move(cdata)); |
103 | 74 | } |
104 | 75 |
|
105 | | -void c_copy_to_sink(ExecutionContext & /*context*/, |
106 | | - FunctionData &bind_data, |
107 | | - GlobalFunctionData &gstate, |
108 | | - LocalFunctionData &lstate, |
109 | | - DataChunk &input) { |
110 | | - auto &bind = bind_data.Cast<CCopyBindData>(); |
111 | | - auto &global = gstate.Cast<CCopyGlobalData>(); |
112 | | - auto &local = lstate.Cast<CCopyLocalData>(); |
| 76 | +void copy_to_sink(ExecutionContext &, |
| 77 | + FunctionData &bind_data, |
| 78 | + GlobalFunctionData &gstate, |
| 79 | + LocalFunctionData &, |
| 80 | + DataChunk &input) { |
| 81 | + void *const ffi_bind = bind_data.Cast<CopyBindData>().ffi_data->DataPtr(); |
| 82 | + void *const ffi_global = gstate.Cast<CopyGlobalData>().ffi_data->DataPtr(); |
| 83 | + auto ffi_chunk = reinterpret_cast<duckdb_data_chunk>(&input); |
113 | 84 | duckdb_vx_error error_out = nullptr; |
114 | | - bind.vtab.copy_to_sink(bind.ffi_data->DataPtr(), |
115 | | - global.ffi_data->DataPtr(), |
116 | | - local.ffi_data->DataPtr(), |
117 | | - reinterpret_cast<duckdb_data_chunk>(&input), |
118 | | - &error_out); |
| 85 | + duckdb_copy_function_copy_to_sink(ffi_bind, ffi_global, ffi_chunk, &error_out); |
119 | 86 | if (error_out) { |
120 | 87 | throw ExecutorException(IntoErrString(error_out)); |
121 | 88 | } |
122 | 89 | } |
123 | 90 |
|
124 | | -void copy_to_finalize(ClientContext & /*context*/, FunctionData &bind_data, GlobalFunctionData &gstate) { |
125 | | - auto &bind = bind_data.Cast<CCopyBindData>(); |
126 | | - auto &global = gstate.Cast<CCopyGlobalData>(); |
| 91 | +void copy_to_finalize(ClientContext &, FunctionData &, GlobalFunctionData &gstate) { |
| 92 | + void *const ffi_global = gstate.Cast<CopyGlobalData>().ffi_data->DataPtr(); |
127 | 93 | duckdb_vx_error error_out = nullptr; |
128 | | - bind.vtab.copy_to_finalize(bind.ffi_data->DataPtr(), global.ffi_data->DataPtr(), &error_out); |
| 94 | + duckdb_copy_function_copy_to_finalize(ffi_global, &error_out); |
129 | 95 | if (error_out) { |
130 | 96 | throw ExecutorException(IntoErrString(error_out)); |
131 | 97 | } |
132 | 98 | } |
133 | 99 |
|
134 | | -extern "C" duckdb_vx_copy_func_vtab_t *get_vtab_one() { |
135 | | - return ©_vtab_one; |
136 | | -} |
| 100 | +extern "C" duckdb_state duckdb_vx_register_copy_function(duckdb_database ffi_db) { |
| 101 | + D_ASSERT(ffi_db); |
| 102 | + const DatabaseWrapper &wrapper = *reinterpret_cast<DatabaseWrapper *>(ffi_db); |
| 103 | + DatabaseInstance &db = *wrapper.database->instance; |
137 | 104 |
|
138 | | -extern "C" duckdb_state duckdb_vx_copy_func_register_vtab_one(duckdb_database ffi_db) { |
139 | | - if (!ffi_db) { |
140 | | - return DuckDBError; |
141 | | - } |
142 | | - |
143 | | - auto wrapper = reinterpret_cast<duckdb::DatabaseWrapper *>(ffi_db); |
144 | | - auto db = wrapper->database->instance; |
145 | | - auto copy_function = CopyFunction(copy_vtab_one.name); |
146 | | - |
147 | | - copy_function.copy_to_bind = c_bind_one; |
148 | | - copy_function.copy_to_initialize_global = c_init_global; |
149 | | - copy_function.copy_to_initialize_local = c_init_local; |
150 | | - |
151 | | - copy_function.copy_to_sink = c_copy_to_sink; |
152 | | - copy_function.copy_to_finalize = copy_to_finalize; |
153 | | - copy_function.extension = copy_vtab_one.extension; |
| 105 | + CopyFunction fn("vortex"); |
| 106 | + fn.copy_to_bind = copy_to_bind; |
| 107 | + fn.copy_to_initialize_global = copy_to_initialize_global; |
| 108 | + fn.copy_to_initialize_local = [](auto &, auto &) { |
| 109 | + return make_uniq<LocalFunctionData>(); |
| 110 | + }; |
| 111 | + fn.copy_to_sink = copy_to_sink; |
| 112 | + fn.copy_to_finalize = copy_to_finalize; |
| 113 | + fn.extension = "vortex"; |
154 | 114 |
|
155 | 115 | // TODO(joe): expose this via c our api |
156 | | - copy_function.execution_mode = [](bool /*preserve_insertion_order*/, bool /*supports_batch_index*/) { |
| 116 | + fn.execution_mode = [](bool, bool) { |
157 | 117 | return CopyFunctionExecutionMode::REGULAR_COPY_TO_FILE; |
158 | 118 | }; |
159 | 119 | // TODO(joe): handle parameters as in table_function |
160 | 120 |
|
161 | 121 | try { |
162 | | - auto &system_catalog = Catalog::GetSystemCatalog(*db); |
163 | | - auto data = CatalogTransaction::GetSystemTransaction(*db); |
164 | | - CreateCopyFunctionInfo copy_info(std::move(copy_function)); |
| 122 | + Catalog &system_catalog = Catalog::GetSystemCatalog(db); |
| 123 | + CatalogTransaction data = CatalogTransaction::GetSystemTransaction(db); |
| 124 | + CreateCopyFunctionInfo copy_info(std::move(fn)); |
165 | 125 | system_catalog.CreateCopyFunction(data, copy_info); |
166 | | - } catch (...) { |
| 126 | + } catch (const std::exception &e) { |
| 127 | + ErrorData data(e); |
| 128 | + DUCKDB_LOG_ERROR(db, "Failed to create Vortex copy function:\t" + data.Message()); |
167 | 129 | return DuckDBError; |
168 | 130 | } |
169 | 131 | return DuckDBSuccess; |
170 | 132 | } |
171 | | - |
172 | | -} // namespace vortex |
0 commit comments