Skip to content

Commit f70d2d0

Browse files
committed
try separate trhread
1 parent d799f2d commit f70d2d0

5 files changed

Lines changed: 229 additions & 37 deletions

File tree

src/duckdb_py/duckdb_python.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "duckdb/parser/parser.hpp"
66

77
#include "duckdb_python/python_objects.hpp"
8+
#include "duckdb_python/python_log_storage.hpp"
89
#include "duckdb_python/pyconnection/pyconnection.hpp"
910
#include "duckdb_python/pystatement.hpp"
1011
#include "duckdb_python/pyrelation.hpp"
@@ -1135,6 +1136,9 @@ PYBIND11_MODULE(DUCKDB_PYTHON_LIB_NAME, m) { // NOLINT
11351136
"Tokenizes a SQL string, returning a list of (position, type) tuples that can be "
11361137
"used for e.g., syntax highlighting",
11371138
py::arg("query"));
1139+
m.def("_drain_log_forwarding", &PythonLogStorage::DrainForwarder,
1140+
"Block until all engine log entries queued for Python's logging module have been "
1141+
"forwarded. Forwarding is asynchronous; this is a test/synchronization aid.");
11381142
py::enum_<PySQLTokenType>(m, "token_type", py::module_local())
11391143
.value("identifier", PySQLTokenType::PY_SQL_TOKEN_IDENTIFIER)
11401144
.value("numeric_const", PySQLTokenType::PY_SQL_TOKEN_NUMERIC_CONSTANT)

src/duckdb_py/include/duckdb_python/python_log_storage.hpp

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,16 @@ class PythonLogStorageScanState : public LogStorageScanState {
3333
//! 1. forwards it to Python's standard `logging` module (logging.getLogger("duckdb")), and
3434
//! 2. retains it in-memory so `SELECT * FROM duckdb_logs` keeps working.
3535
//!
36-
//! It subclasses BufferingLogStorage with a buffer size of 1 so each entry is flushed (and
37-
//! therefore forwarded to Python) immediately, rather than batched until a 2048-entry buffer
38-
//! fills — engine WARNINGs are sparse and must surface inline to be useful.
36+
//! It subclasses BufferingLogStorage with a buffer size of 1 so each entry is flushed
37+
//! immediately, rather than batched until a 2048-entry buffer fills — engine WARNINGs are
38+
//! sparse and must surface promptly to be useful.
39+
//!
40+
//! Forwarding to Python is ASYNCHRONOUS. The engine calls FlushChunk while holding
41+
//! LogManager::lock (a non-recursive mutex also taken by CreateLogger/WriteLogEntry). Acquiring
42+
//! the GIL there would deadlock against any other thread that holds the GIL and then enters one
43+
//! of those LogManager methods (i.e. ordinary concurrent queries). So FlushChunk only copies
44+
//! (level, message) into a process-global queue, and a single background thread — which holds
45+
//! no engine lock — drains it and forwards to `logging`. See python_log_storage.cpp.
3946
class PythonLogStorage : public BufferingLogStorage {
4047
public:
4148
explicit PythonLogStorage(DatabaseInstance &db);
@@ -45,22 +52,33 @@ class PythonLogStorage : public BufferingLogStorage {
4552
return "python_log_storage";
4653
}
4754

55+
//! Starts the process-global forwarder thread (idempotent). MUST be called with the GIL held
56+
//! and no engine lock held — i.e. from Connect(), never from the engine log-write path.
57+
static void EnsureForwarderStarted();
58+
59+
//! Blocks (releasing the GIL) until every queued entry has been forwarded to `logging`.
60+
//! Forwarding is asynchronous, so callers that need to observe a just-emitted warning on the
61+
//! Python side must drain first. Exposed to Python as `_duckdb._drain_log_forwarding`
62+
//! for deterministic tests; harmless if the forwarder was never started.
63+
static void DrainForwarder();
64+
4865
//! Single-threaded scan interface — mirrors InMemoryLogStorage so duckdb_logs can read us.
4966
bool CanScan(LoggingTargetTable table) override;
5067
unique_ptr<LogStorageScanState> CreateScanState(LoggingTargetTable table) const override;
5168
bool Scan(LogStorageScanState &state, DataChunk &result) const override;
5269
void InitializeScan(LogStorageScanState &state) const override;
5370

5471
protected:
55-
//! Stores the chunk for duckdb_logs and (for LOG_ENTRIES) forwards it to Python.
72+
//! Stores the chunk for duckdb_logs and (for LOG_ENTRIES) queues it for async forwarding.
5673
void FlushChunk(LoggingTargetTable table, DataChunk &chunk) override;
5774
//! Clears the in-memory buffers.
5875
void ResetAllBuffers() override;
5976

6077
private:
6178
ColumnDataCollection &GetBuffer(LoggingTargetTable table) const;
62-
//! Forwards each row of a LOG_ENTRIES chunk to logging.getLogger("duckdb"). Never throws.
63-
void ForwardEntriesToPython(DataChunk &chunk);
79+
//! Copies each row of a LOG_ENTRIES chunk into the global forward queue. Never touches the
80+
//! GIL or calls Python (it runs under LogManager::lock). Never throws.
81+
void EnqueueEntriesForPython(DataChunk &chunk);
6482

6583
map<LoggingTargetTable, unique_ptr<ColumnDataCollection>> log_storage_buffers;
6684
};

src/duckdb_py/pyconnection.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2299,6 +2299,10 @@ shared_ptr<DuckDBPyConnection> DuckDBPyConnection::Connect(const py::object &dat
22992299
log_manager.SetLogStorage(db_instance, "python_log_storage");
23002300
log_manager.SetEnableLogging(true);
23012301
log_manager.SetLogLevel(LogLevel::LOG_WARNING);
2302+
// Start the background thread that forwards queued entries to Python's logging
2303+
// module. We're here with the GIL held and no engine lock taken — the only safe
2304+
// place to do it (the engine log-write path holds LogManager::lock).
2305+
PythonLogStorage::EnsureForwarderStarted();
23022306
}
23032307
}
23042308
return res;

src/duckdb_py/python_log_storage.cpp

Lines changed: 147 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
#include "duckdb/common/types/data_chunk.hpp"
88
#include "duckdb/common/types/vector.hpp"
99

10+
#include <condition_variable>
11+
#include <mutex>
12+
#include <thread>
13+
1014
namespace duckdb {
1115

1216
// Maps the engine's textual log level (stored as VARCHAR in the LOG_ENTRIES chunk) to the
@@ -30,6 +34,129 @@ static int LevelStringToPython(const string &level_str) {
3034
return 30;
3135
}
3236

37+
//===--------------------------------------------------------------------===//
38+
// Asynchronous forwarder
39+
//
40+
// The engine invokes FlushChunk while holding LogManager::lock — a non-recursive mutex that is
41+
// also taken by LogManager::CreateLogger / WriteLogEntry / Flush. Acquiring the GIL from inside
42+
// that lock deadlocks: a worker thread holding the lock blocks on the GIL, while another thread
43+
// holding the GIL blocks on the lock (e.g. via CreateLogger at the start of a concurrent query).
44+
// We observed exactly this with two threads each running execute() on one database.
45+
//
46+
// So forwarding is decoupled. FlushChunk only copies plain (level, message) data into this
47+
// process-global queue (no GIL, no Python). A single background thread drains the queue and
48+
// forwards to logging.getLogger("duckdb") with the GIL held but NO engine lock held — breaking
49+
// the lock-ordering cycle. One global thread (not one per DatabaseInstance) avoids spawning a
50+
// thread per connection. The queue holds owned copies, so it is independent of any storage's
51+
// lifetime.
52+
//===--------------------------------------------------------------------===//
53+
namespace {
54+
55+
struct PendingLogEntry {
56+
int level;
57+
string message;
58+
};
59+
60+
struct LogForwarder {
61+
std::mutex mutex; // guards the fields below; NEVER held while acquiring the GIL
62+
std::condition_variable cv; // forwarder waits here for work
63+
std::condition_variable idle_cv; // drainers wait here for the queue to empty
64+
vector<PendingLogEntry> queue;
65+
bool stop = false;
66+
bool started = false;
67+
bool busy = false; // a batch has been dequeued but not yet forwarded
68+
std::thread thread;
69+
};
70+
71+
LogForwarder &GetForwarder() {
72+
static LogForwarder forwarder;
73+
return forwarder;
74+
}
75+
76+
void ForwarderLoop() {
77+
auto &fwd = GetForwarder();
78+
while (true) {
79+
vector<PendingLogEntry> batch;
80+
{
81+
std::unique_lock<std::mutex> lck(fwd.mutex);
82+
fwd.cv.wait(lck, [&fwd] { return fwd.stop || !fwd.queue.empty(); });
83+
if (fwd.stop && fwd.queue.empty()) {
84+
return;
85+
}
86+
batch.swap(fwd.queue);
87+
fwd.busy = true; // queue is empty again, but this batch isn't delivered yet
88+
}
89+
// No engine lock and no forwarder lock held here, so acquiring the GIL cannot deadlock.
90+
if (Py_IsInitialized()) { // else interpreter is finalizing — acquiring the GIL would crash
91+
try {
92+
py::gil_scoped_acquire gil;
93+
auto logging = py::module::import("logging");
94+
auto logger = logging.attr("getLogger")("duckdb");
95+
for (auto &entry : batch) {
96+
logger.attr("log")(entry.level, entry.message);
97+
}
98+
} catch (...) {
99+
// Logging must never disrupt anything.
100+
}
101+
}
102+
{
103+
std::unique_lock<std::mutex> lck(fwd.mutex);
104+
fwd.busy = false;
105+
fwd.idle_cv.notify_all(); // wake any DrainForwarder() waiters
106+
}
107+
}
108+
}
109+
110+
// atexit callback: stop and join the forwarder while the interpreter is still alive. Runs on the
111+
// main thread with the GIL held; the GIL is released around join() because the forwarder may be
112+
// parked in take_gil and could not otherwise wake to observe `stop`.
113+
void StopForwarder() {
114+
auto &fwd = GetForwarder();
115+
{
116+
std::unique_lock<std::mutex> lck(fwd.mutex);
117+
if (!fwd.started) {
118+
return;
119+
}
120+
fwd.stop = true;
121+
}
122+
fwd.cv.notify_all();
123+
if (fwd.thread.joinable()) {
124+
py::gil_scoped_release release;
125+
fwd.thread.join();
126+
}
127+
}
128+
129+
} // namespace
130+
131+
void PythonLogStorage::EnsureForwarderStarted() {
132+
// Called from Connect() with the GIL held and no engine lock held.
133+
auto &fwd = GetForwarder();
134+
{
135+
std::unique_lock<std::mutex> lck(fwd.mutex);
136+
if (fwd.started) {
137+
return;
138+
}
139+
fwd.started = true;
140+
fwd.thread = std::thread(ForwarderLoop);
141+
}
142+
// Stop+join before interpreter finalization. Joining a GIL-blocked thread after Py_Finalize
143+
// would crash, so we hook atexit (which runs while the interpreter is still valid).
144+
try {
145+
auto atexit = py::module::import("atexit");
146+
atexit.attr("register")(py::cpp_function([]() { StopForwarder(); }));
147+
} catch (...) {
148+
}
149+
}
150+
151+
void PythonLogStorage::DrainForwarder() {
152+
auto &fwd = GetForwarder();
153+
// Release the GIL while waiting: the forwarder thread needs it to finish its in-flight batch
154+
// and signal idle. Holding it here would deadlock the very thread we're waiting on.
155+
py::gil_scoped_release release;
156+
std::unique_lock<std::mutex> lck(fwd.mutex);
157+
fwd.idle_cv.wait(lck, [&fwd] { return fwd.queue.empty() && !fwd.busy; });
158+
}
159+
33160
PythonLogStorage::PythonLogStorage(DatabaseInstance &db) : BufferingLogStorage(db, 1, true) {
34161
log_storage_buffers[LoggingTargetTable::LOG_ENTRIES] =
35162
make_uniq<ColumnDataCollection>(Allocator::DefaultAllocator(), GetSchema(LoggingTargetTable::LOG_ENTRIES));
@@ -48,43 +175,38 @@ ColumnDataCollection &PythonLogStorage::GetBuffer(LoggingTargetTable table) cons
48175
return *res->second;
49176
}
50177

51-
void PythonLogStorage::ForwardEntriesToPython(DataChunk &chunk) {
52-
// This fires from engine worker threads with the GIL released, and from within both the
53-
// LogManager lock and this storage's lock. It runs arbitrary user Python (logging
54-
// handlers) and MUST NOT let an exception escape: the engine calls the write path with no
55-
// try/catch, directly from query binding/execution, so a raising handler would otherwise
56-
// fail the user's query. Hence we swallow everything here.
178+
void PythonLogStorage::EnqueueEntriesForPython(DataChunk &chunk) {
179+
// Runs under LogManager::lock (and our scan lock). It MUST NOT touch the GIL or call Python:
180+
// doing so here would deadlock against any thread that holds the GIL and then enters a
181+
// LogManager method that needs the same lock (CreateLogger / WriteLogEntry / Flush). So we
182+
// only copy plain data into the global queue; the forwarder thread does the Python work
183+
// lock-free. The strings are deep-copied (GetString), so they outlive this chunk.
57184
//
58-
// Caveat: because a lock is held across this call, a handler that re-enters DuckDB on the
59-
// same thread and emits another log entry can self-deadlock on the non-recursive lock.
60-
// That is outside our control (and matches the engine's own contract for log storages).
61-
if (!Py_IsInitialized()) {
62-
return; // interpreter is finalizing — acquiring the GIL would crash
63-
}
64-
try {
65-
py::gil_scoped_acquire gil;
66-
auto logging = py::module::import("logging");
67-
auto logger = logging.attr("getLogger")("duckdb");
68-
// LOG_ENTRIES schema: context_id, timestamp, type, log_level (idx 3), message (idx 4).
69-
// log_level and message are both VARCHAR; the buffer chunk is flat.
70-
auto level_data = FlatVector::GetData<string_t>(chunk.data[3]);
71-
auto message_data = FlatVector::GetData<string_t>(chunk.data[4]);
185+
// A side benefit of decoupling: a user logging handler that raises now runs on the forwarder
186+
// thread, where the exception is swallowed — it can never reach the engine's query path.
187+
//
188+
// LOG_ENTRIES schema: context_id, timestamp, type, log_level (idx 3), message (idx 4).
189+
// log_level and message are both VARCHAR; the buffer chunk is flat.
190+
auto level_data = FlatVector::GetData<string_t>(chunk.data[3]);
191+
auto message_data = FlatVector::GetData<string_t>(chunk.data[4]);
192+
auto &fwd = GetForwarder();
193+
{
194+
std::unique_lock<std::mutex> lck(fwd.mutex);
72195
for (idx_t i = 0; i < chunk.size(); i++) {
73-
logger.attr("log")(LevelStringToPython(level_data[i].GetString()), message_data[i].GetString());
196+
fwd.queue.push_back({LevelStringToPython(level_data[i].GetString()), message_data[i].GetString()});
74197
}
75-
} catch (...) {
76-
// Logging must never disrupt query execution.
77198
}
199+
fwd.cv.notify_one();
78200
}
79201

80202
void PythonLogStorage::FlushChunk(LoggingTargetTable table, DataChunk &chunk) {
81203
D_ASSERT(table == LoggingTargetTable::LOG_ENTRIES || table == LoggingTargetTable::LOG_CONTEXTS);
82204
// Retain the entry for duckdb_logs FIRST, so a misbehaving Python handler can never cost
83205
// us a stored row.
84206
log_storage_buffers[table]->Append(chunk);
85-
// Forward only real log entries (not context metadata) to Python's logging module.
207+
// Queue only real log entries (not context metadata) for async forwarding to logging.
86208
if (table == LoggingTargetTable::LOG_ENTRIES) {
87-
ForwardEntriesToPython(chunk);
209+
EnqueueEntriesForPython(chunk);
88210
}
89211
}
90212

0 commit comments

Comments
 (0)