Skip to content

Commit d799f2d

Browse files
committed
Subclass BufferingLogStorage
1 parent db1737d commit d799f2d

4 files changed

Lines changed: 322 additions & 79 deletions

File tree

src/duckdb_py/include/duckdb_python/python_log_storage.hpp

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,60 @@
99
#pragma once
1010

1111
#include "duckdb/logging/log_storage.hpp"
12-
#include "duckdb/logging/logging.hpp"
12+
#include "duckdb/common/map.hpp"
13+
#include "duckdb/common/unique_ptr.hpp"
1314

1415
namespace duckdb {
1516

16-
class PythonLogStorage : public LogStorage {
17+
class ColumnDataCollection;
18+
class DatabaseInstance;
19+
20+
//! Scan state backing PythonLogStorage's in-memory buffers (so `duckdb_logs` can read them).
21+
//! We define our own rather than reuse the engine's InMemoryLogStorageScanState to avoid
22+
//! depending on whether that type's symbols are exported across platforms.
23+
class PythonLogStorageScanState : public LogStorageScanState {
24+
public:
25+
explicit PythonLogStorageScanState(LoggingTargetTable table) : LogStorageScanState(table) {
26+
}
27+
~PythonLogStorageScanState() override = default;
28+
29+
ColumnDataScanState scan_state;
30+
};
31+
32+
//! A composite log storage that does two things for every engine log entry:
33+
//! 1. forwards it to Python's standard `logging` module (logging.getLogger("duckdb")), and
34+
//! 2. retains it in-memory so `SELECT * FROM duckdb_logs` keeps working.
35+
//!
36+
//! It subclasses BufferingLogStorage with a buffer size of 1 so each entry is flushed (and
37+
//! therefore forwarded to Python) immediately, rather than batched until a 2048-entry buffer
38+
//! fills — engine WARNINGs are sparse and must surface inline to be useful.
39+
class PythonLogStorage : public BufferingLogStorage {
1740
public:
18-
PythonLogStorage() = default;
19-
~PythonLogStorage() override = default;
41+
explicit PythonLogStorage(DatabaseInstance &db);
42+
~PythonLogStorage() override;
2043

2144
const string GetStorageName() override {
2245
return "python_log_storage";
2346
}
2447

25-
void WriteLogEntry(timestamp_t timestamp, LogLevel level, const string &log_type, const string &log_message,
26-
const RegisteredLoggingContext &context) override;
27-
void WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &context) override;
28-
void FlushAll() override {
29-
}
30-
void Flush(LoggingTargetTable table) override {
31-
}
32-
bool IsEnabled(LoggingTargetTable table) override {
33-
return true;
34-
}
48+
//! Single-threaded scan interface — mirrors InMemoryLogStorage so duckdb_logs can read us.
49+
bool CanScan(LoggingTargetTable table) override;
50+
unique_ptr<LogStorageScanState> CreateScanState(LoggingTargetTable table) const override;
51+
bool Scan(LogStorageScanState &state, DataChunk &result) const override;
52+
void InitializeScan(LogStorageScanState &state) const override;
53+
54+
protected:
55+
//! Stores the chunk for duckdb_logs and (for LOG_ENTRIES) forwards it to Python.
56+
void FlushChunk(LoggingTargetTable table, DataChunk &chunk) override;
57+
//! Clears the in-memory buffers.
58+
void ResetAllBuffers() override;
59+
60+
private:
61+
ColumnDataCollection &GetBuffer(LoggingTargetTable table) const;
62+
//! Forwards each row of a LOG_ENTRIES chunk to logging.getLogger("duckdb"). Never throws.
63+
void ForwardEntriesToPython(DataChunk &chunk);
64+
65+
map<LoggingTargetTable, unique_ptr<ColumnDataCollection>> log_storage_buffers;
3566
};
3667

3768
} // namespace duckdb

src/duckdb_py/pyconnection.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2288,7 +2288,7 @@ shared_ptr<DuckDBPyConnection> DuckDBPyConnection::Connect(const py::object &dat
22882288
{
22892289
auto &db_instance = *res->con.GetDatabase().instance;
22902290
auto &log_manager = db_instance.GetLogManager();
2291-
auto storage = make_shared_ptr<PythonLogStorage>();
2291+
auto storage = make_shared_ptr<PythonLogStorage>(db_instance);
22922292
shared_ptr<LogStorage> storage_base = storage;
22932293
// RegisterLogStorage returns false if the name is already registered on this
22942294
// DatabaseInstance. Instances are cached and shared across connections/cursors, so
Lines changed: 78 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,92 +1,120 @@
11
#include "duckdb_python/python_log_storage.hpp"
22
#include "duckdb_python/pybind11/pybind_wrapper.hpp"
3+
4+
#include "duckdb/common/allocator.hpp"
5+
#include "duckdb/common/exception.hpp"
6+
#include "duckdb/common/types/column/column_data_collection.hpp"
37
#include "duckdb/common/types/data_chunk.hpp"
48
#include "duckdb/common/types/vector.hpp"
5-
#include "duckdb/logging/logging.hpp"
69

710
namespace duckdb {
811

9-
static int LogLevelToPython(LogLevel level) {
10-
switch (level) {
11-
case LogLevel::LOG_TRACE:
12-
case LogLevel::LOG_DEBUG:
13-
return 10; // logging.DEBUG
14-
case LogLevel::LOG_INFO:
15-
return 20; // logging.INFO
16-
case LogLevel::LOG_WARNING:
17-
return 30; // logging.WARNING
18-
case LogLevel::LOG_ERROR:
19-
return 40; // logging.ERROR
20-
case LogLevel::LOG_FATAL:
21-
return 50; // logging.CRITICAL
22-
default:
23-
return 30;
24-
}
25-
}
26-
12+
// Maps the engine's textual log level (stored as VARCHAR in the LOG_ENTRIES chunk) to the
13+
// integer levels of Python's logging module.
2714
static int LevelStringToPython(const string &level_str) {
	// Engine level name -> numeric level of Python's logging module.
	struct LevelMapping {
		const char *engine_name;
		int python_level;
	};
	static const LevelMapping LEVEL_TABLE[] = {
	    {"TRACE", 10},   // logging.DEBUG (Python has no TRACE level)
	    {"DEBUG", 10},   // logging.DEBUG
	    {"INFO", 20},    // logging.INFO
	    {"WARNING", 30}, // logging.WARNING
	    {"ERROR", 40},   // logging.ERROR
	    {"FATAL", 50},   // logging.CRITICAL
	};
	for (const auto &mapping : LEVEL_TABLE) {
		if (level_str == mapping.engine_name) {
			return mapping.python_level;
		}
	}
	// Anything unrecognised (the match is case-sensitive) is reported at WARNING level.
	return 30;
}
4532

46-
// Both write methods run on engine worker threads and invoke arbitrary user Python (the
47-
// handlers installed on the "duckdb" logger). The engine calls these directly from query
48-
// binding/execution with NO surrounding try/catch (see LogManager::WriteLogEntry), so an
49-
// exception escaping here would fail the user's query. Logging is a side effect — it must
50-
// never do that. Hence every body swallows all exceptions.
51-
//
52-
// Note also that the engine holds LogManager::lock (a non-recursive mutex) across this call.
53-
// A handler that re-enters DuckDB on the same thread and emits another log entry would
54-
// self-deadlock on that lock — outside our control, but worth knowing.
33+
// Configures the buffering base class and creates one empty in-memory collection per logging
// target table, each using that table's schema and the default allocator.
// NOTE(review): the base-class arguments `1, true` are presumably the buffer size (flush after
// every entry) and a flag selecting the split entries/contexts layout — confirm against
// BufferingLogStorage's constructor.
PythonLogStorage::PythonLogStorage(DatabaseInstance &db) : BufferingLogStorage(db, 1, true) {
	const LoggingTargetTable retained_tables[] = {LoggingTargetTable::LOG_ENTRIES, LoggingTargetTable::LOG_CONTEXTS};
	for (const auto target : retained_tables) {
		log_storage_buffers[target] =
		    make_uniq<ColumnDataCollection>(Allocator::DefaultAllocator(), GetSchema(target));
	}
}
5539

56-
void PythonLogStorage::WriteLogEntry(timestamp_t, LogLevel level, const string &, const string &log_message,
57-
const RegisteredLoggingContext &) {
58-
if (!Py_IsInitialized()) {
59-
return; // interpreter is finalizing — acquiring the GIL would crash
60-
}
61-
try {
62-
py::gil_scoped_acquire gil;
63-
auto logging = py::module::import("logging");
64-
auto logger = logging.attr("getLogger")("duckdb");
65-
logger.attr("log")(LogLevelToPython(level), log_message);
66-
} catch (...) {
67-
// Logging must not disrupt query execution.
40+
// Nothing to release by hand: the buffers are owned through unique_ptr. The definition lives in
// this translation unit, where ColumnDataCollection is a complete type.
PythonLogStorage::~PythonLogStorage() = default;
42+
43+
// Returns the in-memory collection registered for `table`.
// Throws InternalException when no collection exists for that table.
ColumnDataCollection &PythonLogStorage::GetBuffer(LoggingTargetTable table) const {
	const auto entry = log_storage_buffers.find(table);
	if (entry != log_storage_buffers.end()) {
		return *entry->second;
	}
	throw InternalException("PythonLogStorage: failed to find buffer for logging target table");
}
7050

71-
void PythonLogStorage::WriteLogEntries(DataChunk &chunk, const RegisteredLoggingContext &) {
51+
// Emits every row of a LOG_ENTRIES chunk through logging.getLogger("duckdb").
//
// Per the engine's contract for log storages, this is invoked from engine worker threads with the
// GIL released, while both the LogManager lock and this storage's lock are held. It executes
// arbitrary user Python (whatever handlers are installed on the logger). The engine's write path
// has no try/catch and is reached straight from query binding/execution, so an exception leaving
// this function would fail the user's query — therefore nothing is allowed to escape.
//
// Caveat: since a lock is held for the duration of the call, a handler that re-enters DuckDB on
// the same thread and produces another log entry can self-deadlock on the non-recursive lock.
// That cannot be prevented from here.
void PythonLogStorage::ForwardEntriesToPython(DataChunk &chunk) {
	if (!Py_IsInitialized()) {
		// The interpreter is finalizing — trying to take the GIL now would crash.
		return;
	}
	try {
		py::gil_scoped_acquire gil;
		auto logging_module = py::module::import("logging");
		auto duckdb_logger = logging_module.attr("getLogger")("duckdb");
		// LOG_ENTRIES columns: context_id, timestamp, type, log_level (idx 3), message (idx 4).
		// Both log_level and message are VARCHAR, and the buffer chunk is flat.
		auto levels = FlatVector::GetData<string_t>(chunk.data[3]);
		auto messages = FlatVector::GetData<string_t>(chunk.data[4]);
		const idx_t row_count = chunk.size();
		for (idx_t row = 0; row < row_count; row++) {
			const int python_level = LevelStringToPython(levels[row].GetString());
			duckdb_logger.attr("log")(python_level, messages[row].GetString());
		}
	} catch (...) {
		// Deliberately ignored: logging must never disrupt query execution.
	}
}
9179

80+
// Receives one flushed chunk for `table`: retains it in the matching in-memory collection (so
// `duckdb_logs` can read it) and, for LOG_ENTRIES only, forwards each row to Python's logging.
//
// `table` must be LOG_ENTRIES or LOG_CONTEXTS. The buffer is resolved through GetBuffer rather
// than map::operator[]: operator[] would silently insert a null entry for an unexpected table
// when the assertion is compiled out, which would be dereferenced here and again by
// ResetAllBuffers. GetBuffer throws InternalException instead and leaves the map untouched.
void PythonLogStorage::FlushChunk(LoggingTargetTable table, DataChunk &chunk) {
	D_ASSERT(table == LoggingTargetTable::LOG_ENTRIES || table == LoggingTargetTable::LOG_CONTEXTS);
	// Retain the entry for duckdb_logs FIRST, so a misbehaving Python handler can never cost
	// us a stored row.
	GetBuffer(table).Append(chunk);
	// Forward only real log entries (not context metadata) to Python's logging module.
	if (table == LoggingTargetTable::LOG_ENTRIES) {
		ForwardEntriesToPython(chunk);
	}
}
90+
91+
void PythonLogStorage::ResetAllBuffers() {
92+
BufferingLogStorage::ResetAllBuffers();
93+
for (const auto &buffer : log_storage_buffers) {
94+
buffer.second->Reset();
95+
}
96+
}
97+
98+
// Reports whether `table` can be scanned, i.e. whether it is enabled; the check is made while
// holding the storage lock.
bool PythonLogStorage::CanScan(LoggingTargetTable table) {
	unique_lock<mutex> guard(lock);
	const bool table_enabled = IsEnabledInternal(table);
	return table_enabled;
}
102+
103+
// Allocates a fresh scan state bound to `table`. The state still has to be passed to
// InitializeScan before it is used with Scan.
unique_ptr<LogStorageScanState> PythonLogStorage::CreateScanState(LoggingTargetTable table) const {
	unique_ptr<LogStorageScanState> new_state = make_uniq<PythonLogStorageScanState>(table);
	return new_state;
}
106+
107+
// Reads the next chunk of the table selected by `state` into `result`, under the storage lock.
// Returns whatever ColumnDataCollection::Scan returns for that cursor.
// `state` must be a PythonLogStorageScanState.
bool PythonLogStorage::Scan(LogStorageScanState &state, DataChunk &result) const {
	unique_lock<mutex> guard(lock);
	auto &cursor = state.Cast<PythonLogStorageScanState>();
	auto &collection = GetBuffer(cursor.table);
	return collection.Scan(cursor.scan_state, result);
}
112+
113+
// Positions `state` at the start of the collection backing its table, under the storage lock.
// The scan is set up with DISALLOW_ZERO_COPY.
// `state` must be a PythonLogStorageScanState.
void PythonLogStorage::InitializeScan(LogStorageScanState &state) const {
	unique_lock<mutex> guard(lock);
	auto &cursor = state.Cast<PythonLogStorageScanState>();
	auto &collection = GetBuffer(cursor.table);
	collection.InitializeScan(cursor.scan_state, ColumnDataScanProperties::DISALLOW_ZERO_COPY);
}
119+
92120
} // namespace duckdb

0 commit comments

Comments
 (0)