-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathduck_flock.cpp
More file actions
130 lines (108 loc) · 4.24 KB
/
duck_flock.cpp
File metadata and controls
130 lines (108 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#ifndef DUCK_FLOCK_H
#define DUCK_FLOCK_H
#include "chsql_extension.hpp"
namespace duckdb {
/// Bind-time state shared between DuckFlockBind and DuckFlockImplementation.
///
/// DuckFlockBind appends to `conn` and `results` in lock-step, so the entry at
/// index i of `results` was produced by the connection at index i of `conn`.
struct DuckFlockData : FunctionData {
    /// Connections opened during bind; one per stored result.
    /// NOTE(review): presumably kept so each connection outlives its result — confirm.
    vector<unique_ptr<Connection>> conn;

    /// Query results collected during bind and fetched from during the scan.
    vector<unique_ptr<QueryResult>> results;

    /// Copying the bind data is not supported.
    /// @throws std::runtime_error always.
    unique_ptr<FunctionData> Copy() const override {
        throw std::runtime_error("not implemented");
    }

    /// Comparing bind data is not supported.
    /// @throws std::runtime_error always.
    bool Equals(const FunctionData & /*other*/) const override {
        throw std::runtime_error("not implemented");
    }
};
/// Bind callback for `url_flock(query, urls)`.
///
/// For every non-NULL, non-empty entry of `urls` a fresh Connection is opened on
/// the current database, the extension auto-load/auto-install settings are
/// enabled, and `query` is sent through `read_json` to
/// `<url>/?default_format=JSONEachRow&query=<url-encoded query>`.
/// Endpoints that fail at any step (settings, prepare, execute, or an exception)
/// are skipped silently: the function is best-effort by design.
///
/// The output schema is taken from the first successful result. Later results
/// whose column types differ from it are skipped as well, because the scan
/// appends every fetched chunk into an output chunk typed after that schema.
///
/// @param context      Client context; its database is used to open the connections.
/// @param input        Bind input; `inputs[0]` is the query text, `inputs[1]` the URL list.
/// @param return_types Receives the output column types.
/// @param names        Receives the output column names.
/// @return A DuckFlockData holding the collected connections and results. When no
///         endpoint produced a usable result it is empty and the schema falls back
///         to a single VARCHAR column named "result".
unique_ptr<FunctionData> DuckFlockBind(ClientContext &context, TableFunctionBindInput &input,
                                       vector<LogicalType> &return_types, vector<string> &names) {
    auto data = make_uniq<DuckFlockData>();

    // Fallback schema, used whenever no endpoint yields a usable result.
    return_types = {LogicalType::VARCHAR};
    names = {"result"};

    // Missing or NULL arguments: nothing to query, keep the fallback schema.
    if (input.inputs.size() < 2 || input.inputs[0].IsNull() || input.inputs[1].IsNull()) {
        return std::move(data);
    }

    const auto strQuery = input.inputs[0].GetValue<string>();
    if (strQuery.empty()) {
        return std::move(data);
    }

    // An empty URL list simply skips the loop and leaves the fallback schema in place.
    for (const auto &duck : ListValue::GetChildren(input.inputs[1])) {
        if (duck.IsNull()) {
            continue;
        }
        const auto url = duck.ToString();
        if (url.empty()) {
            continue;
        }

        try {
            auto conn = make_uniq<Connection>(*context.db);

            auto settingResult = conn->Query("SET autoload_known_extensions=1;SET autoinstall_known_extensions=1;");
            if (settingResult->HasError()) {
                continue;
            }

            // Query text and URL are bound as parameters, never spliced into the SQL.
            auto req = conn->Prepare("SELECT * FROM read_json($2 || '/?default_format=JSONEachRow&query=' || url_encode($1::VARCHAR))");
            if (req->HasError()) {
                continue;
            }

            auto queryResult = req->Execute(strQuery.c_str(), url);
            if (!queryResult || queryResult->HasError()) {
                continue;
            }

            // Only keep results that are layout-compatible with the first one;
            // its schema is the one reported to the planner below.
            if (!data->results.empty() && queryResult->types != data->results[0]->types) {
                continue;
            }

            data->conn.push_back(std::move(conn));
            data->results.push_back(std::move(queryResult));
        } catch (...) {
            // Best-effort: an endpoint that throws is treated like one that errored.
            continue;
        }
    }

    // Every stored result is error-free, so the first one can define the schema.
    if (!data->results.empty()) {
        return_types = data->results[0]->types;
        names = data->results[0]->names;
    }

    return std::move(data);
}
/// Scan callback for `url_flock`.
///
/// Walks the results collected at bind time in order and appends the first
/// non-empty chunk it can fetch to `output`, then returns. Results that are
/// null, fail to fetch, throw, or yield no rows are skipped. When no result
/// yields rows, `output` is left untouched (presumably the caller treats an
/// empty output chunk as end of scan — confirm against the table function API).
///
/// @param context Client context (unused here).
/// @param data_p  Table function input; its bind data must be a DuckFlockData.
/// @param output  Chunk that receives at most one fetched chunk per call.
void DuckFlockImplementation(ClientContext &context, TableFunctionInput &data_p,
                             DataChunk &output) {
    auto &data = data_p.bind_data->Cast<DuckFlockData>();

    for (const auto &res : data.results) {
        if (!res) {
            continue;
        }

        ErrorData error_data;
        // TryFetch assigns the fetched chunk into this pointer, so nothing is
        // allocated up front.
        unique_ptr<DataChunk> data_chunk;
        try {
            const bool fetched = res->TryFetch(data_chunk, error_data);
            if (fetched && data_chunk && data_chunk->size() != 0) {
                output.Append(*data_chunk);
                return;
            }
        } catch (...) {
            // Best-effort: a result that throws is skipped, the next one is tried.
            continue;
        }
    }
}
/// Builds the `url_flock` table function descriptor.
///
/// The function takes a VARCHAR and a LIST(VARCHAR) argument, uses
/// DuckFlockImplementation as its scan function and DuckFlockBind as its bind
/// function. The two trailing constructor arguments are passed as nullptr
/// (presumably the optional init callbacks — confirm against TableFunction).
///
/// @return The configured TableFunction.
TableFunction DuckFlockTableFunction() {
    const auto url_list_type = LogicalType::LIST(LogicalType::VARCHAR);
    return TableFunction("url_flock", {LogicalType::VARCHAR, url_list_type},
                         DuckFlockImplementation, DuckFlockBind,
                         nullptr, nullptr);
}
}
#endif