Skip to content

Commit 38600b9

Browse files
committed
Limit streaming scans to the binary driver
The PR originally carried coroutine-based HTTP streaming and the\nvendored minicoro dependency even after the binary path moved to\nthe new clickhouse-cpp pull API. That made the diff much larger\nthan the final design and left the PR description out of date.\n\nDrop the unused HTTP streaming implementation, remove minicoro\nfrom the build and submodules, and restore the buffered HTTP\npath. Keep only the binary scan changes: the FDW streaming hook,\nthe block-backed binary row reader, and the fetch_size plumbing.\nAlso restore the buffered binary cancel callback and remove the\nnew C-linkage warning from binary_internal.hh.
1 parent 5889363 commit 38600b9

13 files changed

Lines changed: 157 additions & 684 deletions

File tree

.gitmodules

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
11
[submodule "clickhouse-cpp"]
22
path = vendor/clickhouse-cpp
33
url = git@github.com:iskakaushik/clickhouse-cpp.git
4-
[submodule "vendor/minicoro"]
5-
path = vendor/minicoro
6-
url = https://github.com/edubart/minicoro.git

Makefile

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,8 @@ ifndef CH_BUILD
3333
endif
3434

3535
# clickhouse-cpp source and build directories.
36-
VENDOR_DIR = vendor
37-
CH_CPP_DIR = $(VENDOR_DIR)/clickhouse-cpp
38-
CH_CPP_BUILD_DIR = $(VENDOR_DIR)/_build/$(OS)-$(ARCH)-$(CH_BUILD)-$(shell git submodule status $(CH_CPP_DIR) | awk '{print substr($$1, 0, 7)}')
36+
CH_CPP_DIR = vendor/clickhouse-cpp
37+
CH_CPP_BUILD_DIR = vendor/_build/$(OS)-$(ARCH)-$(CH_BUILD)-$(shell git submodule status $(CH_CPP_DIR) | awk '{print substr($$1, 0, 7)}')
3938

4039
# List the clickhouse-cpp libraries we require.
4140
CH_CPP_LIB = $(CH_CPP_BUILD_DIR)/clickhouse/libclickhouse-cpp-lib$(DLSUFFIX)
@@ -57,7 +56,7 @@ else
5756
endif
5857

5958
# Add include directories.
60-
PG_CPPFLAGS = -I./src/include -I$(CH_CPP_DIR) -I$(CH_CPP_DIR)/contrib/absl -I$(VENDOR_DIR)/minicoro
59+
PG_CPPFLAGS = -I./src/include -I$(CH_CPP_DIR) -I$(CH_CPP_DIR)/contrib/absl
6160

6261
# Include other libraries compiled into clickhouse-cpp.
6362
PG_LDFLAGS = -lstdc++ -lssl -lcrypto $(shell $(CURL_CONFIG) --libs)
@@ -76,7 +75,7 @@ endif
7675
# Clean up the clickhouse-cpp build directory and generated files.
7776
EXTRA_CLEAN = sql/$(EXTENSION)--$(EXTVERSION).sql src/fdw.c compile_commands.json test/schedule $(EXTENSION)-$(DISTVERSION).zip
7877
ifndef NO_VENDOR_CLEAN
79-
EXTRA_CLEAN += $(VENDOR_DIR)
78+
EXTRA_CLEAN += $(CH_CPP_BUILD_DIR)
8079
endif
8180

8281
# Import PGXS.
@@ -102,12 +101,8 @@ $(shlib): $(CH_CPP_LIB) $(OBJS)
102101
$(CH_CPP_DIR)/CMakeLists.txt:
103102
git submodule update --init
104103

105-
# Clone minicoro submodule.
106-
$(VENDOR_DIR)/minicoro/minicoro.h:
107-
git submodule update --init
108-
109-
# Require the vendored libraries.
110-
$(OBJS): $(CH_CPP_DIR)/CMakeLists.txt $(VENDOR_DIR)/minicoro/minicoro.h
104+
# Require the vendored clickhouse-cpp.
105+
$(OBJS): $(CH_CPP_DIR)/CMakeLists.txt
111106

112107
# Build clickhouse-cpp.
113108
$(CH_CPP_LIB): export CXXFLAGS=-fPIC
@@ -117,7 +112,7 @@ $(CH_CPP_LIB): $(CH_CPP_DIR)/CMakeLists.txt # Sync with "Reset Vendor Timestamp"
117112
cmake --build $(CH_CPP_BUILD_DIR) --parallel $$(nproc) --target all
118113

119114
# Require the versioned C source and SQL script.
120-
all: sql/$(EXTENSION)--$(EXTVERSION).sql src/fdw.c $(VENDOR_DIR)/minicoro/minicoro.h
115+
all: sql/$(EXTENSION)--$(EXTVERSION).sql src/fdw.c
121116

122117
# Versioned SQL script.
123118
sql/$(EXTENSION)--$(EXTVERSION).sql: sql/$(EXTENSION).sql

src/binary.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,12 @@ extern "C"
233233
client->Select(clickhouse::Query(query->sql)
234234
.SetQuerySettings(ch_binary_settings(query))
235235
.SetParams(ch_binary_params(query))
236+
.OnProgress(
237+
[&check_cancel](const Progress &)
238+
{
239+
if (check_cancel && check_cancel())
240+
throw std::runtime_error("query was canceled");
241+
})
236242
.OnDataCancelable(
237243
[&resp, &values, &check_cancel](const Block &block)
238244
{

src/binary_streaming.cpp

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -35,50 +35,26 @@ struct ch_binary_streaming_state
3535
std::unique_ptr<bool[]> nulls;
3636
size_t columns_count = 0;
3737

38-
const char *error = nullptr;
39-
char *owned_error = nullptr;
38+
std::optional<std::string> error;
4039
bool (*check_cancel) (void) = nullptr;
4140

4241
Client *client = nullptr;
4342
std::string sql;
4443
QuerySettings settings;
4544
QueryParams params;
4645

47-
~ch_binary_streaming_state()
48-
{
49-
free(owned_error);
50-
}
51-
5246
void
53-
SetBorrowedError(const char *message)
47+
SetError(const char *message)
5448
{
5549
if (error)
5650
return;
57-
error = message;
51+
error.emplace(message ? message : kBinaryStreamingOom);
5852
}
5953

60-
void
61-
SetOwnedError(const char *message)
54+
const char *
55+
GetError() const
6256
{
63-
char *copy;
64-
65-
if (error)
66-
return;
67-
if (!message)
68-
{
69-
SetBorrowedError(kBinaryStreamingOom);
70-
return;
71-
}
72-
73-
copy = strdup(message);
74-
if (!copy)
75-
{
76-
SetBorrowedError(kBinaryStreamingOom);
77-
return;
78-
}
79-
80-
owned_error = copy;
81-
error = owned_error;
57+
return error ? error->c_str() : nullptr;
8258
}
8359
};
8460

@@ -93,7 +69,7 @@ binary_streaming_fill_block(ch_binary_streaming_state * st)
9369
{
9470
if (st->check_cancel && st->check_cancel())
9571
{
96-
st->SetBorrowedError(kBinaryStreamingCanceled);
72+
st->SetError(kBinaryStreamingCanceled);
9773
st->done = true;
9874
return false;
9975
}
@@ -102,7 +78,7 @@ binary_streaming_fill_block(ch_binary_streaming_state * st)
10278
}
10379
catch (const std::exception & e)
10480
{
105-
st->SetOwnedError(e.what());
81+
st->SetError(e.what());
10682
st->done = true;
10783
return false;
10884
}
@@ -115,7 +91,7 @@ binary_streaming_fill_block(ch_binary_streaming_state * st)
11591
}
11692
catch (const std::exception & e)
11793
{
118-
st->SetOwnedError(e.what());
94+
st->SetError(e.what());
11995
}
12096
st->current_block.reset();
12197
st->have_block = false;
@@ -130,7 +106,7 @@ binary_streaming_fill_block(ch_binary_streaming_state * st)
130106
if (st->columns_count != 0 &&
131107
block->GetColumnCount() != st->columns_count)
132108
{
133-
st->SetBorrowedError("columns mismatch in blocks");
109+
st->SetError("columns mismatch in blocks");
134110
st->done = true;
135111
return false;
136112
}
@@ -171,7 +147,7 @@ extern "C"
171147
}
172148
catch (const std::exception & e)
173149
{
174-
st->SetOwnedError(e.what());
150+
st->SetError(e.what());
175151
st->done = true;
176152
return st;
177153
}
@@ -216,7 +192,7 @@ extern "C"
216192
st->nulls.reset(new (std::nothrow) bool[st->columns_count]);
217193
if (!st->coltypes || !st->values || !st->nulls)
218194
{
219-
st->SetBorrowedError(kBinaryStreamingOom);
195+
st->SetError(kBinaryStreamingOom);
220196
return false;
221197
}
222198
}
@@ -231,7 +207,7 @@ extern "C"
231207
}
232208
catch (const std::exception & e)
233209
{
234-
st->SetOwnedError(e.what());
210+
st->SetError(e.what());
235211
return false;
236212
}
237213

@@ -264,7 +240,7 @@ extern "C"
264240
const char *
265241
ch_binary_streaming_error(ch_binary_streaming_state * st)
266242
{
267-
return st ? st->error : NULL;
243+
return st ? st->GetError() : NULL;
268244
}
269245

270246
void

src/fdw.c.in

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,9 @@ clickhouseIterateForeignScan(ForeignScanState * node)
956956
TupleDesc tupdesc;
957957
ch_query query = new_query(fsstate->query, fsstate->numParams, fsstate->param_values);
958958

959+
/* Allow query cancel (e.g. Ctrl+C) between tuple fetches. */
960+
CHECK_FOR_INTERRUPTS();
961+
959962
/* make query if needed */
960963
if (fsstate->ch_cursor == NULL)
961964
{

0 commit comments

Comments
 (0)