Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/export/exporter_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ class StatsExporter {
virtual ~StatsExporter() = default;
};

// Bridge functions for PG logging from files that cannot include postgres.h
// (e.g. otel_exporter.cc conflicts with libintl.h via gRPC headers).
// LogNegativeValue reports a negative value seen for a column that the OTel
// export path does not support; LogExporterWarning emits a generic
// WARNING-level report with a context prefix and a detail message.
void LogNegativeValue(const std::string& column_name, int64_t value);
void LogExporterWarning(const char* context, const char* message);

// Expected usage:
// void ProcessBatch(StatsExporter *exporter) {
Expand Down
133 changes: 61 additions & 72 deletions src/export/otel_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ class OTelExporter : public StatsExporter {
++exported_count;
}

opentelemetry::sdk::resource::Resource CreateResource();
void InitMetricsPipeline(const std::string& endpoint,
const opentelemetry::sdk::resource::Resource& resource);
void InitLogPipeline(const std::string& endpoint,
const opentelemetry::sdk::resource::Resource& resource);

// =====================================================================
// Column implementation classes (translate OTel concepts to CH Columns)
// =====================================================================
Expand Down Expand Up @@ -373,94 +379,77 @@ class OTelExporter : public StatsExporter {
std::vector<shared_ptr<BasicColumn>> columns;
};

opentelemetry::sdk::resource::Resource OTelExporter::CreateResource() {
  // Identifying attributes ("ID card") attached to every exported metric and
  // log record: service name/version plus the resolved host name.
  opentelemetry::sdk::resource::ResourceAttributes attrs{
      {"service.name", "pg_stat_ch"},
      {"service.version", std::string(PG_STAT_CH_VERSION)},
      {"host.name", GetAHostname("postgres-primary")},
  };
  return opentelemetry::sdk::resource::Resource::Create(attrs);
}

// Set up the metrics export pipeline: gRPC OTLP exporter -> periodic reader
// -> meter provider tagged with `resource`. The reader exports on a
// background thread, so the bgworker is never blocked by metric export.
void OTelExporter::InitMetricsPipeline(const std::string& endpoint,
                                       const opentelemetry::sdk::resource::Resource& resource) {
  otlp::OtlpGrpcMetricExporterOptions exporter_opts;
  exporter_opts.endpoint = endpoint;

  // Export every interval; allow half the interval as the per-export timeout
  // so a slow collector cannot eat the whole next cycle.
  metrics_sdk::PeriodicExportingMetricReaderOptions periodic_opts;
  periodic_opts.export_interval_millis =
      std::chrono::milliseconds(psch_otel_metric_interval_ms);
  periodic_opts.export_timeout_millis =
      std::chrono::milliseconds(psch_otel_metric_interval_ms / 2);

  auto grpc_exporter = otlp::OtlpGrpcMetricExporterFactory::Create(exporter_opts);
  metrics_reader = metrics_sdk::PeriodicExportingMetricReaderFactory::Create(
      std::move(grpc_exporter), periodic_opts);

  // Default view registry; the provider owns the resource association.
  metrics_provider = std::make_shared<metrics_sdk::MeterProvider>(
      std::make_unique<metrics_sdk::ViewRegistry>(), resource);
  metrics_provider->AddMetricReader(metrics_reader);
}

// Set up the log export pipeline: gRPC OTLP exporter wrapped in a batch
// processor, owned by a LoggerProvider tagged with `resource`.
//
// The batch size is capped by two limits, whichever is smaller:
//   - psch_otel_log_batch_size (record count), and
//   - psch_otel_log_max_bytes / kOtelMinBytesPerRecord (byte budget), to stay
//     under the gRPC 4 MiB default message size. DequeueEvents already
//     enforces the byte budget on the producer side; this is a second line
//     of defence inside the SDK.
void OTelExporter::InitLogPipeline(const std::string& endpoint,
                                   const opentelemetry::sdk::resource::Resource& resource) {
  otlp::OtlpGrpcLogRecordExporterOptions log_opts;
  log_opts.endpoint = endpoint;

  // Fixed per-record overhead only; variable fields come on top of this.
  static constexpr size_t kOtelMinBytesPerRecord = 1200;
  const size_t batch_size_by_bytes =
      static_cast<size_t>(psch_otel_log_max_bytes) / kOtelMinBytesPerRecord;
  const size_t batch_size_by_count = static_cast<size_t>(psch_otel_log_batch_size);

  logs_sdk::BatchLogRecordProcessorOptions batch_opts;
  batch_opts.max_queue_size = psch_otel_log_queue_size;
  // Explicit comparison instead of std::min: this file cannot safely rely on
  // <algorithm> arriving via transitive includes (flagged in review), and we
  // cannot add the include here without risking the libintl.h conflict dance.
  batch_opts.max_export_batch_size =
      batch_size_by_count < batch_size_by_bytes ? batch_size_by_count : batch_size_by_bytes;
  batch_opts.schedule_delay_millis = std::chrono::milliseconds(psch_otel_log_delay_ms);

  log_provider = std::make_shared<logs_sdk::LoggerProvider>(
      logs_sdk::BatchLogRecordProcessorFactory::Create(
          otlp::OtlpGrpcLogRecordExporterFactory::Create(log_opts), batch_opts),
      resource);
}

bool OTelExporter::EstablishNewConnection() {
try {
const std::string hostname = GetAHostname("postgres-primary");
const std::string endpoint =
(psch_otel_endpoint && *psch_otel_endpoint) ? psch_otel_endpoint : "localhost:4317";
const std::string pgch_version = PG_STAT_CH_VERSION;

// Resource (The "ID Card" for our service)
auto resource_attributes = opentelemetry::sdk::resource::ResourceAttributes{
{"service.name", "pg_stat_ch"},
{"service.version", pgch_version},
{"host.name", hostname} // Ideally fetch real hostname
};
auto resource = opentelemetry::sdk::resource::Resource::Create(resource_attributes);

// Configure Metrics
// -------------------------------------------------------------------------
otlp::OtlpGrpcMetricExporterOptions metric_opts;
metric_opts.endpoint = endpoint;

// Configure Reader (async periodic export — does not block bgworker)
metrics_sdk::PeriodicExportingMetricReaderOptions reader_opts;
reader_opts.export_interval_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms);
reader_opts.export_timeout_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms / 2);

metrics_reader = metrics_sdk::PeriodicExportingMetricReaderFactory::Create(
otlp::OtlpGrpcMetricExporterFactory::Create(metric_opts), reader_opts);

// Create the Provider with our Resource and add our Reader
// Note: We use the ViewRegistry (default)
metrics_provider = std::make_shared<metrics_sdk::MeterProvider>(
std::make_unique<metrics_sdk::ViewRegistry>(), resource);
metrics_provider->AddMetricReader(metrics_reader);

// Configure Logs
// -------------------------------------------------------------------------
otlp::OtlpGrpcLogRecordExporterOptions log_opts;
log_opts.endpoint = endpoint;

// Create Logger Provider with batch processor for throughput.
// Cap max_export_batch_size by the byte budget: even at the minimum variable-field
// size the fixed overhead alone can push a large batch over the gRPC 4 MiB default.
// DequeueEvents already enforces the byte budget on the producer side; this caps
// the SDK's internal batch size as a second line of defence.
static constexpr size_t kOtelMinBytesPerRecord = 1200; // fixed overhead only
size_t batch_size_by_bytes =
static_cast<size_t>(psch_otel_log_max_bytes) / kOtelMinBytesPerRecord;
logs_sdk::BatchLogRecordProcessorOptions batch_opts;
batch_opts.max_queue_size = psch_otel_log_queue_size;
batch_opts.max_export_batch_size =
std::min(static_cast<size_t>(psch_otel_log_batch_size), batch_size_by_bytes);
batch_opts.schedule_delay_millis = std::chrono::milliseconds(psch_otel_log_delay_ms);

log_provider = std::make_shared<logs_sdk::LoggerProvider>(
logs_sdk::BatchLogRecordProcessorFactory::Create(
otlp::OtlpGrpcLogRecordExporterFactory::Create(log_opts), batch_opts),
resource);

// Get Instruments
// -------------------------------------------------------------------------
auto resource = CreateResource();
InitMetricsPipeline(endpoint, resource);
InitLogPipeline(endpoint, resource);

const std::string pgch_version = PG_STAT_CH_VERSION;
meter = metrics_provider->GetMeter("pg_stat_ch", pgch_version);
logger = log_provider->GetLogger("pg_stat_ch", "pg_stat_ch_logs");

return true;
} catch (const std::exception& e) {
// PschLog(LogLevel::Warning, "pg_stat_ch: OTel init failed: %s", e.what());
LogExporterWarning("OTel init failed", e.what());
return false;
}
Comment thread
serprex marked this conversation as resolved.
}

bool OTelExporter::CommitBatch() {
  EndRow();

  // No ForceFlush — both metrics and logs export asynchronously on SDK
  // background threads (PeriodicExportingMetricReader every
  // metric_interval_ms; BatchLogRecordProcessor every log_delay_ms).
  // Blocking here until the current gRPC export finishes would stall
  // dequeuing for seconds and overflow the shmem queue.
  //
  // Trade-off: if the batch processor's internal queue fills up (gRPC slower
  // than the event rate) it silently drops log records. That is acceptable
  // for best-effort telemetry — the blocking alternative drops events from
  // shmem before they are even processed.
  ResetFailures();
  return true;
}
Expand Down
105 changes: 27 additions & 78 deletions src/export/stats_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ const char* CmdTypeToString(PschCmdType cmd) {
}
}

// Clamp a field length to its buffer maximum.
//
// Returns `len` unchanged when it fits; otherwise emits a WARNING and returns
// `max`. The warning reports both the observed length and the allowed
// maximum (review feedback: the original message omitted the limit, making
// corrupted events harder to diagnose).
template <typename LenT>
LenT ClampFieldLen(LenT len, LenT max, const char* field_name) {
  if (len <= max)
    return len;
  elog(WARNING, "pg_stat_ch: invalid %s %u (max %u), clamping", field_name,
       static_cast<unsigned>(len), static_cast<unsigned>(max));
  return max;
}

// Dequeue events from the shared memory queue
std::vector<PschEvent> DequeueEvents(int max_events) {
std::vector<PschEvent> events;
Expand All @@ -78,25 +87,16 @@ void ExportEventStats(const std::vector<PschEvent>& events, StatsExporter* expor

exporter->BeginBatch();

elog(DEBUG2, "pg_stat_ch: creating column objects");

// Basic columns
elog(DEBUG3, "pg_stat_ch: creating col_ts_start");
auto col_ts_start = exporter->RecordDateTime("ts_start");
elog(DEBUG3, "pg_stat_ch: col_ts_start created");
auto col_duration_us = exporter->DbDurationColumn();
// Use pre-resolved names from event (resolved at capture time in hooks)
auto col_db = exporter->DbNameColumn();
auto col_username = exporter->DbUserColumn();
elog(DEBUG3, "pg_stat_ch: basic columns created");
auto col_pid = exporter->RecordInt32("pid");
auto col_query_id = exporter->RecordInt64("query_id");
auto col_cmd_type = exporter->DbOperationColumn();
auto col_rows = exporter->MetricUInt64("rows");
auto col_query = exporter->DbQueryTextColumn();
elog(DEBUG3, "pg_stat_ch: all basic columns created");

// Buffer usage columns (hit/read are histograms, rest are records)
auto col_shared_blks_hit = exporter->MetricInt64("shared_blks_hit");
auto col_shared_blks_read = exporter->MetricInt64("shared_blks_read");
auto col_shared_blks_dirtied = exporter->RecordInt64("shared_blks_dirtied");
Expand All @@ -108,77 +108,52 @@ void ExportEventStats(const std::vector<PschEvent>& events, StatsExporter* expor
auto col_temp_blks_read = exporter->RecordInt64("temp_blks_read");
auto col_temp_blks_written = exporter->RecordInt64("temp_blks_written");

// I/O timing columns (records — rarely non-zero)
auto col_shared_blk_read_time_us = exporter->RecordInt64("shared_blk_read_time_us");
auto col_shared_blk_write_time_us = exporter->RecordInt64("shared_blk_write_time_us");
auto col_local_blk_read_time_us = exporter->RecordInt64("local_blk_read_time_us");
auto col_local_blk_write_time_us = exporter->RecordInt64("local_blk_write_time_us");
auto col_temp_blk_read_time_us = exporter->RecordInt64("temp_blk_read_time_us");
auto col_temp_blk_write_time_us = exporter->RecordInt64("temp_blk_write_time_us");

// WAL usage columns (records — rarely non-zero for reads)
auto col_wal_records = exporter->RecordInt64("wal_records");
auto col_wal_fpi = exporter->RecordInt64("wal_fpi");
auto col_wal_bytes = exporter->RecordUInt64("wal_bytes");

// CPU time columns (records)
auto col_cpu_user_time_us = exporter->RecordInt64("cpu_user_time_us");
auto col_cpu_sys_time_us = exporter->RecordInt64("cpu_sys_time_us");

// JIT columns (records — rarely non-zero)
auto col_jit_functions = exporter->RecordInt32("jit_functions");
auto col_jit_generation_time_us = exporter->RecordInt32("jit_generation_time_us");
auto col_jit_deform_time_us = exporter->RecordInt32("jit_deform_time_us");
auto col_jit_inlining_time_us = exporter->RecordInt32("jit_inlining_time_us");
auto col_jit_optimization_time_us = exporter->RecordInt32("jit_optimization_time_us");
auto col_jit_emission_time_us = exporter->RecordInt32("jit_emission_time_us");

// Parallel worker columns (records)
auto col_parallel_workers_planned = exporter->RecordInt16("parallel_workers_planned");
auto col_parallel_workers_launched = exporter->RecordInt16("parallel_workers_launched");

elog(DEBUG3, "pg_stat_ch: creating error columns");
// Error columns (records)
auto col_err_sqlstate = exporter->MetricFixedString(5, "err_sqlstate");
auto col_err_elevel = exporter->RecordUInt8("err_elevel");
auto col_err_message = exporter->RecordString("err_message");
elog(DEBUG3, "pg_stat_ch: error columns created");

// Client context columns; records rather than tags (no histogram in OTel)
auto col_app = exporter->RecordString("app");
auto col_client_addr = exporter->RecordString("client_addr");

elog(DEBUG2, "pg_stat_ch: all columns created, starting event loop");
size_t event_idx = 0;
for (const auto& ev : events) {
elog(DEBUG2, "pg_stat_ch: processing event %zu: pid=%d, query_len=%u", event_idx, ev.pid,
ev.query_len);
exporter->BeginRow();

int64_t unix_us = ev.ts_start + kPostgresEpochOffsetUs;
col_ts_start->Append(unix_us);
col_ts_start->Append(ev.ts_start + kPostgresEpochOffsetUs);
col_duration_us->Append(ev.duration_us);

// Use pre-resolved names from event (resolved at capture time in hooks)
col_db->Append(std::string(ev.datname, ev.datname_len));
col_username->Append(std::string(ev.username, ev.username_len));
Comment on lines 147 to 148
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datname_len and username_len are used directly to size std::string, but they are not validated against the fixed buffers (datname[64], username[64]). If an event ever contains an out-of-range length, this becomes an out-of-bounds read (and potentially data exfiltration/crash). Since you’re already clamping other variable-length fields, please clamp these too (e.g., to sizeof(ev.datname)-1 / sizeof(ev.username)-1) before constructing strings.

Suggested change
col_db->Append(std::string(ev.datname, ev.datname_len));
col_username->Append(std::string(ev.username, ev.username_len));
auto dlen =
ClampFieldLen(ev.datname_len, static_cast<uint8>(sizeof(ev.datname) - 1), "datname_len");
auto ulen = ClampFieldLen(ev.username_len, static_cast<uint8>(sizeof(ev.username) - 1),
"username_len");
col_db->Append(std::string(ev.datname, dlen));
col_username->Append(std::string(ev.username, ulen));

Copilot uses AI. Check for mistakes.

col_pid->Append(ev.pid);
col_query_id->Append(static_cast<int64_t>(ev.queryid));
col_cmd_type->Append(CmdTypeToString(ev.cmd_type));
col_rows->Append(ev.rows);

// Validate query_len before using it
uint16 safe_query_len = ev.query_len;
if (safe_query_len > PSCH_MAX_QUERY_LEN) {
elog(WARNING, "pg_stat_ch: event %zu has invalid query_len %u, clamping", event_idx,
safe_query_len);
safe_query_len = PSCH_MAX_QUERY_LEN;
}
col_query->Append(std::string(ev.query, safe_query_len));
auto qlen = ClampFieldLen(ev.query_len, static_cast<uint16>(PSCH_MAX_QUERY_LEN), "query_len");
col_query->Append(std::string(ev.query, qlen));

elog(DEBUG3, "pg_stat_ch: event %zu - buffer usage", event_idx);
// Buffer usage
col_shared_blks_hit->Append(ev.shared_blks_hit);
col_shared_blks_read->Append(ev.shared_blks_read);
col_shared_blks_dirtied->Append(ev.shared_blks_dirtied);
Expand All @@ -190,78 +165,52 @@ void ExportEventStats(const std::vector<PschEvent>& events, StatsExporter* expor
col_temp_blks_read->Append(ev.temp_blks_read);
col_temp_blks_written->Append(ev.temp_blks_written);

elog(DEBUG3, "pg_stat_ch: event %zu - I/O timing", event_idx);
// I/O timing
col_shared_blk_read_time_us->Append(ev.shared_blk_read_time_us);
col_shared_blk_write_time_us->Append(ev.shared_blk_write_time_us);
col_local_blk_read_time_us->Append(ev.local_blk_read_time_us);
col_local_blk_write_time_us->Append(ev.local_blk_write_time_us);
col_temp_blk_read_time_us->Append(ev.temp_blk_read_time_us);
col_temp_blk_write_time_us->Append(ev.temp_blk_write_time_us);

elog(DEBUG3, "pg_stat_ch: event %zu - WAL usage", event_idx);
// WAL usage
col_wal_records->Append(ev.wal_records);
col_wal_fpi->Append(ev.wal_fpi);
col_wal_bytes->Append(ev.wal_bytes);

elog(DEBUG3, "pg_stat_ch: event %zu - CPU time", event_idx);
// CPU time
col_cpu_user_time_us->Append(ev.cpu_user_time_us);
col_cpu_sys_time_us->Append(ev.cpu_sys_time_us);

elog(DEBUG3, "pg_stat_ch: event %zu - JIT", event_idx);
// JIT
col_jit_functions->Append(ev.jit_functions);
col_jit_generation_time_us->Append(ev.jit_generation_time_us);
col_jit_deform_time_us->Append(ev.jit_deform_time_us);
col_jit_inlining_time_us->Append(ev.jit_inlining_time_us);
col_jit_optimization_time_us->Append(ev.jit_optimization_time_us);
col_jit_emission_time_us->Append(ev.jit_emission_time_us);

elog(DEBUG3, "pg_stat_ch: event %zu - parallel workers", event_idx);
// Parallel workers
col_parallel_workers_planned->Append(ev.parallel_workers_planned);
col_parallel_workers_launched->Append(ev.parallel_workers_launched);

elog(DEBUG3, "pg_stat_ch: event %zu - error info", event_idx);
// Error info (5-char SQLSTATE, trimmed)
col_err_sqlstate->Append(std::string_view(ev.err_sqlstate, 5));
col_err_elevel->Append(ev.err_elevel);
// Error message (validate length)
uint16 safe_err_msg_len = ev.err_message_len;
if (safe_err_msg_len > PSCH_MAX_ERR_MSG_LEN) {
elog(WARNING, "pg_stat_ch: event %zu has invalid err_message_len %u, clamping", event_idx,
safe_err_msg_len);
safe_err_msg_len = PSCH_MAX_ERR_MSG_LEN;
}
col_err_message->Append(std::string(ev.err_message, safe_err_msg_len));

elog(DEBUG3, "pg_stat_ch: event %zu - client context (app_len=%u, addr_len=%u)", event_idx,
ev.application_name_len, ev.client_addr_len);
// Client context - validate lengths
uint8 safe_app_len = ev.application_name_len;
if (safe_app_len > 63) {
elog(WARNING, "pg_stat_ch: event %zu has invalid app_name_len %u, clamping", event_idx,
safe_app_len);
safe_app_len = 63;
}
uint8 safe_addr_len = ev.client_addr_len;
if (safe_addr_len > 45) {
elog(WARNING, "pg_stat_ch: event %zu has invalid client_addr_len %u, clamping", event_idx,
safe_addr_len);
safe_addr_len = 45;
}
col_app->Append(std::string(ev.application_name, safe_app_len));
col_client_addr->Append(std::string(ev.client_addr, safe_addr_len));

event_idx++;
auto elen = ClampFieldLen(ev.err_message_len, static_cast<uint16>(PSCH_MAX_ERR_MSG_LEN),
"err_message_len");
col_err_message->Append(std::string(ev.err_message, elen));

auto alen = ClampFieldLen(ev.application_name_len, static_cast<uint8>(PSCH_MAX_APP_NAME_LEN),
"app_name_len");
auto clen = ClampFieldLen(ev.client_addr_len, static_cast<uint8>(PSCH_MAX_CLIENT_ADDR_LEN),
"client_addr_len");
col_app->Append(std::string(ev.application_name, alen));
col_client_addr->Append(std::string(ev.client_addr, clen));
}
elog(DEBUG1, "pg_stat_ch: finished processing %zu events", event_idx);
elog(DEBUG1, "pg_stat_ch: finished processing %zu events", events.size());
}

} // namespace

// Bridge for PG logging from translation units that cannot include
// postgres.h directly (declared in exporter_interface.h). Emits a
// WARNING-level report as "pg_stat_ch: <context>: <message>".
void LogExporterWarning(const char* context, const char* message) {
  ereport(WARNING, errmsg("pg_stat_ch: %s: %s", context, message));
}

// Used to report negative values, which are not supported by OTel.
void LogNegativeValue(const std::string& column_name, int64_t value) {
static std::chrono::steady_clock::time_point last_log = {};
Expand Down
Loading
Loading