Skip to content

Commit 85202ba

Browse files
Pierre-Luc Gagnéclaude
andcommitted
feat: add server-side cursor for streaming result sets
Add server_cursor<RowType> class that uses MySQL server-side cursors (CURSOR_TYPE_READ_ONLY) to fetch rows one-at-a-time instead of loading the entire result set into memory. Created via conn.open_cursor<Row>() with optional prefetch_rows hint. Includes integration tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent bc12844 commit 85202ba

4 files changed

Lines changed: 378 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Versioning follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
1919
- `async_query(pool, stmt)` / `async_execute(pool, stmt)` — pool-based async query execution via `std::async`; returns `std::future` holding the result; multiple calls run concurrently on different connections from the pool
2020
- Composable FK action syntax — two new styles alongside the existing flat types: `fk_attr::on_delete(fk_attr::cascade)` (intermediate) and `fk_attr::on(fk_attr::delete_(fk_attr::cascade))` (fully composed, reads like SQL `ON DELETE CASCADE`); action tags: `cascade`, `restrict_`, `set_null`, `no_action`; all three styles produce identical attribute types
2121
- `set_case(col, case_when_builder)` — CASE/WHEN expressions in UPDATE SET clauses via `.set_case(col{}, case_when(cond, val).else_(val))` on `update_builder` and `update_set_builder`; chainable with `.set()`, `.where()`, `.order_by()`, `.limit()`
22+
- `server_cursor<RowType>` — streaming result set via MySQL server-side cursors; created via `conn.open_cursor<RowType>(sql, params...)` or with `prefetch_rows{n}` hint; `.fetch()` returns one row at a time without loading the full result set into memory
2223

2324
---
2425

lib/include/ds_mysql/mysql_connection.hpp

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "ds_mysql/host_name.hpp"
2121
#include "ds_mysql/port_number.hpp"
2222
#include "ds_mysql/prepared_statement.hpp"
23+
#include "ds_mysql/server_cursor.hpp"
2324
#include "ds_mysql/sql_ddl.hpp"
2425
#include "ds_mysql/sql_dml.hpp"
2526
#include "ds_mysql/sql_dql.hpp"
@@ -415,6 +416,48 @@ class mysql_connection {
415416
return prepare(stmt.build_sql());
416417
}
417418

419+
// ---------------------------------------------------------------
420+
// Server-side cursors
421+
// ---------------------------------------------------------------
422+
423+
// Open a server-side cursor for streaming large result sets.
424+
// Fetches rows one at a time (or in prefetch batches) instead of
425+
// loading the entire result set into memory.
426+
//
427+
// auto cursor = conn.open_cursor<RowType>("SELECT * FROM t WHERE x = ?", 42u);
428+
// while (auto row = cursor->fetch()) { ... }
429+
//
430+
// With prefetch hint:
431+
// auto cursor = conn.open_cursor<RowType>(sql, prefetch_rows{100}, 42u);
432+
template <typename RowType, typename... Params>
433+
[[nodiscard]] std::expected<server_cursor<RowType>, std::string> open_cursor(std::string_view sql,
434+
Params const&... params) const {
435+
return open_cursor_impl<RowType>(sql, prefetch_rows{1}, params...);
436+
}
437+
438+
// Open a server-side cursor with a prefetch_rows hint.
439+
template <typename RowType, typename... Params>
440+
[[nodiscard]] std::expected<server_cursor<RowType>, std::string> open_cursor(std::string_view sql,
441+
prefetch_rows hint,
442+
Params const&... params) const {
443+
return open_cursor_impl<RowType>(sql, hint, params...);
444+
}
445+
446+
// Open a server-side cursor from a typed query builder.
447+
template <typename RowType, SqlBuilder Stmt, typename... Params>
448+
[[nodiscard]] std::expected<server_cursor<RowType>, std::string> open_cursor(Stmt const& stmt,
449+
Params const&... params) const {
450+
return open_cursor_impl<RowType>(stmt.build_sql(), prefetch_rows{1}, params...);
451+
}
452+
453+
// Open a server-side cursor from a typed query builder with prefetch hint.
454+
template <typename RowType, SqlBuilder Stmt, typename... Params>
455+
[[nodiscard]] std::expected<server_cursor<RowType>, std::string> open_cursor(Stmt const& stmt,
456+
prefetch_rows hint,
457+
Params const&... params) const {
458+
return open_cursor_impl<RowType>(stmt.build_sql(), hint, params...);
459+
}
460+
418461
// Validate that the C++ table struct T matches the live schema in the database.
419462
//
420463
// Runs DESCRIBE <table> and checks:
@@ -618,6 +661,60 @@ class mysql_connection {
618661
return std::string(mysql_error(connection_.get()));
619662
}
620663

664+
template <typename RowType, typename... Params>
665+
[[nodiscard]] std::expected<server_cursor<RowType>, std::string> open_cursor_impl(std::string_view sql,
666+
prefetch_rows hint,
667+
Params const&... params) const {
668+
using cursor_t = server_cursor<RowType>;
669+
using deleter_t = typename cursor_t::stmt_deleter;
670+
671+
auto stmt = std::unique_ptr<MYSQL_STMT, deleter_t>(mysql_stmt_init(connection_.get()));
672+
if (!stmt) {
673+
return std::unexpected(last_error());
674+
}
675+
676+
if (mysql_stmt_prepare(stmt.get(), sql.data(), static_cast<unsigned long>(sql.size())) != 0) {
677+
return std::unexpected(std::string(mysql_stmt_error(stmt.get())));
678+
}
679+
680+
// Enable server-side cursor.
681+
unsigned long const cursor_type = CURSOR_TYPE_READ_ONLY;
682+
if (mysql_stmt_attr_set(stmt.get(), STMT_ATTR_CURSOR_TYPE, &cursor_type)) {
683+
return std::unexpected(std::string(mysql_stmt_error(stmt.get())));
684+
}
685+
686+
// Set prefetch rows hint.
687+
unsigned long const prefetch = hint.value;
688+
if (mysql_stmt_attr_set(stmt.get(), STMT_ATTR_PREFETCH_ROWS, &prefetch)) {
689+
return std::unexpected(std::string(mysql_stmt_error(stmt.get())));
690+
}
691+
692+
// Bind input parameters.
693+
constexpr auto N = sizeof...(Params);
694+
if constexpr (N > 0) {
695+
std::array<MYSQL_BIND, N> binds{};
696+
std::array<bool, N> null_flags{};
697+
auto params_tuple = std::tie(params...);
698+
[&]<std::size_t... Is>(std::index_sequence<Is...>) {
699+
(stmt_detail::bind_input(binds[Is], std::get<Is>(params_tuple), null_flags[Is]), ...);
700+
}(std::make_index_sequence<N>{});
701+
702+
if (mysql_stmt_bind_param(stmt.get(), binds.data())) {
703+
return std::unexpected(std::string(mysql_stmt_error(stmt.get())));
704+
}
705+
}
706+
707+
if (mysql_stmt_execute(stmt.get()) != 0) {
708+
return std::unexpected(std::string(mysql_stmt_error(stmt.get())));
709+
}
710+
711+
auto cursor = cursor_t{std::move(stmt)};
712+
if (auto r = cursor.bind_results(); !r) {
713+
return std::unexpected(r.error());
714+
}
715+
return cursor;
716+
}
717+
621718
std::unique_ptr<MYSQL, decltype(&mysql_close)> connection_;
622719
};
623720

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
#pragma once
2+
3+
#include <mysql.h>
4+
5+
#include <cstring>
6+
#include <expected>
7+
#include <memory>
8+
#include <optional>
9+
#include <string>
10+
#include <tuple>
11+
#include <vector>
12+
13+
#include "ds_mysql/prepared_statement.hpp"
14+
15+
namespace ds_mysql {
16+
17+
// ===================================================================
18+
// server_cursor — streaming result set via MySQL server-side cursors.
19+
//
20+
// Unlike prepared_statement::query() which loads all rows into memory,
21+
// a server_cursor fetches rows one-at-a-time (or in prefetch batches)
22+
// from the server. Ideal for large result sets.
23+
//
24+
// Created via mysql_connection::open_cursor():
25+
// auto cursor = conn.open_cursor<RowType>("SELECT * FROM t WHERE x = ?", 42u);
26+
// while (auto row = cursor->fetch()) {
27+
// process(*row);
28+
// }
29+
//
30+
// Prefetch hint (number of rows to buffer at once):
31+
// auto cursor = conn.open_cursor<RowType>(sql, prefetch_rows{100}, 42u);
32+
// ===================================================================
33+
34+
struct prefetch_rows {
35+
unsigned long value = 1;
36+
};
37+
38+
template <typename RowType>
39+
class server_cursor {
40+
public:
41+
server_cursor(server_cursor const&) = delete;
42+
server_cursor& operator=(server_cursor const&) = delete;
43+
44+
server_cursor(server_cursor&& other) noexcept
45+
: stmt_(std::move(other.stmt_)),
46+
result_binds_(std::move(other.result_binds_)),
47+
lengths_(std::move(other.lengths_)),
48+
nulls_(std::move(other.nulls_)),
49+
errors_(std::move(other.errors_)),
50+
string_bufs_(std::move(other.string_bufs_)),
51+
done_(other.done_) {
52+
other.done_ = true;
53+
}
54+
55+
server_cursor& operator=(server_cursor&& other) noexcept {
56+
if (this != &other) {
57+
close();
58+
stmt_ = std::move(other.stmt_);
59+
result_binds_ = std::move(other.result_binds_);
60+
lengths_ = std::move(other.lengths_);
61+
nulls_ = std::move(other.nulls_);
62+
errors_ = std::move(other.errors_);
63+
string_bufs_ = std::move(other.string_bufs_);
64+
done_ = other.done_;
65+
other.done_ = true;
66+
}
67+
return *this;
68+
}
69+
70+
~server_cursor() {
71+
close();
72+
}
73+
74+
/// Fetch the next row. Returns std::nullopt when no more rows.
75+
[[nodiscard]] std::expected<std::optional<RowType>, std::string> fetch() {
76+
if (done_) {
77+
return std::optional<RowType>{std::nullopt};
78+
}
79+
80+
int const status = mysql_stmt_fetch(stmt_.get());
81+
if (status == MYSQL_NO_DATA) {
82+
done_ = true;
83+
return std::optional<RowType>{std::nullopt};
84+
}
85+
if (status != 0 && status != MYSQL_DATA_TRUNCATED) {
86+
return std::unexpected(std::string(mysql_stmt_error(stmt_.get())));
87+
}
88+
89+
return extract_row(std::make_index_sequence<std::tuple_size_v<RowType>>{}, status == MYSQL_DATA_TRUNCATED);
90+
}
91+
92+
/// True when all rows have been fetched.
93+
[[nodiscard]] bool done() const noexcept {
94+
return done_;
95+
}
96+
97+
private:
98+
friend class mysql_connection;
99+
100+
struct stmt_deleter {
101+
void operator()(MYSQL_STMT* s) const noexcept {
102+
if (s)
103+
mysql_stmt_close(s);
104+
}
105+
};
106+
107+
static constexpr auto N = std::tuple_size_v<RowType>;
108+
static constexpr std::size_t initial_string_buf_size = 256;
109+
110+
explicit server_cursor(std::unique_ptr<MYSQL_STMT, stmt_deleter> stmt)
111+
: stmt_(std::move(stmt)),
112+
result_binds_(N),
113+
lengths_(N),
114+
nulls_(std::make_unique<bool[]>(N)),
115+
errors_(std::make_unique<bool[]>(N)),
116+
string_bufs_(N) {
117+
std::fill_n(nulls_.get(), N, false);
118+
std::fill_n(errors_.get(), N, false);
119+
}
120+
121+
void close() {
122+
if (stmt_) {
123+
mysql_stmt_free_result(stmt_.get());
124+
}
125+
}
126+
127+
std::expected<void, std::string> bind_results() {
128+
[this]<std::size_t... Is>(std::index_sequence<Is...>) {
129+
(setup_bind<std::tuple_element_t<Is, RowType>>(Is), ...);
130+
}(std::make_index_sequence<N>{});
131+
132+
if (mysql_stmt_bind_result(stmt_.get(), result_binds_.data())) {
133+
return std::unexpected(std::string(mysql_stmt_error(stmt_.get())));
134+
}
135+
return {};
136+
}
137+
138+
template <typename T>
139+
void setup_bind(std::size_t idx) {
140+
auto& bind = result_binds_[idx];
141+
std::memset(&bind, 0, sizeof(MYSQL_BIND));
142+
bind.length = &lengths_[idx];
143+
bind.is_null = &nulls_.get()[idx];
144+
bind.error = &errors_.get()[idx];
145+
146+
using raw = stmt_detail::unwrap_param_type_t<T>;
147+
if constexpr (std::same_as<raw, std::string>) {
148+
string_bufs_[idx].resize(initial_string_buf_size);
149+
bind.buffer_type = MYSQL_TYPE_STRING;
150+
bind.buffer = string_bufs_[idx].data();
151+
bind.buffer_length = static_cast<unsigned long>(string_bufs_[idx].size());
152+
} else {
153+
bind.buffer_type = stmt_detail::mysql_type_traits<raw>::field_type;
154+
bind.is_unsigned = stmt_detail::mysql_type_traits<raw>::is_unsigned;
155+
string_bufs_[idx].resize(sizeof(raw));
156+
bind.buffer = string_bufs_[idx].data();
157+
bind.buffer_length = sizeof(raw);
158+
}
159+
}
160+
161+
template <std::size_t... Is>
162+
std::expected<std::optional<RowType>, std::string> extract_row(std::index_sequence<Is...>, bool truncated) {
163+
if (truncated) {
164+
(refetch_truncated<std::tuple_element_t<Is, RowType>>(Is), ...);
165+
}
166+
return std::optional<RowType>{
167+
RowType{extract_value<std::tuple_element_t<Is, RowType>>(Is)...}};
168+
}
169+
170+
template <typename T>
171+
void refetch_truncated(std::size_t idx) {
172+
using raw = stmt_detail::unwrap_param_type_t<T>;
173+
if constexpr (std::same_as<raw, std::string>) {
174+
if (lengths_[idx] > result_binds_[idx].buffer_length) {
175+
string_bufs_[idx].resize(lengths_[idx]);
176+
result_binds_[idx].buffer = string_bufs_[idx].data();
177+
result_binds_[idx].buffer_length = static_cast<unsigned long>(string_bufs_[idx].size());
178+
mysql_stmt_fetch_column(stmt_.get(), &result_binds_[idx], static_cast<unsigned int>(idx), 0);
179+
}
180+
}
181+
}
182+
183+
template <typename T>
184+
T extract_value(std::size_t idx) const {
185+
if constexpr (is_optional_v<T>) {
186+
if (nulls_.get()[idx])
187+
return std::nullopt;
188+
using inner = unwrap_optional_t<T>;
189+
return extract_value<inner>(idx);
190+
} else if constexpr (std::same_as<stmt_detail::unwrap_param_type_t<T>, std::string>) {
191+
return std::string(string_bufs_[idx].data(), lengths_[idx]);
192+
} else {
193+
using raw = stmt_detail::unwrap_param_type_t<T>;
194+
raw val{};
195+
std::memcpy(&val, string_bufs_[idx].data(), sizeof(raw));
196+
return static_cast<T>(val);
197+
}
198+
}
199+
200+
std::unique_ptr<MYSQL_STMT, stmt_deleter> stmt_;
201+
std::vector<MYSQL_BIND> result_binds_;
202+
std::vector<unsigned long> lengths_;
203+
std::unique_ptr<bool[]> nulls_;
204+
std::unique_ptr<bool[]> errors_;
205+
std::vector<std::vector<char>> string_bufs_;
206+
bool done_ = false;
207+
};
208+
209+
} // namespace ds_mysql

0 commit comments

Comments
 (0)