diff --git a/src/export/exporter_interface.h b/src/export/exporter_interface.h index c21db5a..183ecb7 100644 --- a/src/export/exporter_interface.h +++ b/src/export/exporter_interface.h @@ -79,8 +79,10 @@ class StatsExporter { virtual ~StatsExporter() = default; }; -// Allows PG logging of exceptional cases without postgres.h +// Bridge functions for PG logging from files that cannot include postgres.h +// (e.g. otel_exporter.cc conflicts with libintl.h via gRPC headers). void LogNegativeValue(const std::string& column_name, int64_t value); +void LogExporterWarning(const char* context, const char* message); // Expected usage: // void ProcessBatch(StatsExporter *exporter) { diff --git a/src/export/otel_exporter.cc b/src/export/otel_exporter.cc index 67d06c2..cef7290 100644 --- a/src/export/otel_exporter.cc +++ b/src/export/otel_exporter.cc @@ -150,6 +150,12 @@ class OTelExporter : public StatsExporter { ++exported_count; } + opentelemetry::sdk::resource::Resource CreateResource(); + void InitMetricsPipeline(const std::string& endpoint, + const opentelemetry::sdk::resource::Resource& resource); + void InitLogPipeline(const std::string& endpoint, + const opentelemetry::sdk::resource::Resource& resource); + // ===================================================================== // Column implementation classes (translate OTel concepts to CH Columns) // ===================================================================== @@ -373,94 +379,77 @@ class OTelExporter : public StatsExporter { std::vector> columns; }; +opentelemetry::sdk::resource::Resource OTelExporter::CreateResource() { + return opentelemetry::sdk::resource::Resource::Create({ + {"service.name", "pg_stat_ch"}, + {"service.version", std::string(PG_STAT_CH_VERSION)}, + {"host.name", GetAHostname("postgres-primary")}, + }); +} + +void OTelExporter::InitMetricsPipeline(const std::string& endpoint, + const opentelemetry::sdk::resource::Resource& resource) { + otlp::OtlpGrpcMetricExporterOptions metric_opts; + metric_opts.endpoint = endpoint; + + metrics_sdk::PeriodicExportingMetricReaderOptions reader_opts; + reader_opts.export_interval_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms); + reader_opts.export_timeout_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms / 2); + + metrics_reader = metrics_sdk::PeriodicExportingMetricReaderFactory::Create( + otlp::OtlpGrpcMetricExporterFactory::Create(metric_opts), reader_opts); + + metrics_provider = std::make_shared( + std::make_unique(), resource); + metrics_provider->AddMetricReader(metrics_reader); +} + +void OTelExporter::InitLogPipeline(const std::string& endpoint, + const opentelemetry::sdk::resource::Resource& resource) { + otlp::OtlpGrpcLogRecordExporterOptions log_opts; + log_opts.endpoint = endpoint; + + // Cap batch size by byte budget to stay under gRPC 4 MiB default. + static constexpr size_t kOtelMinBytesPerRecord = 1200; + size_t batch_size_by_bytes = + static_cast(psch_otel_log_max_bytes) / kOtelMinBytesPerRecord; + + logs_sdk::BatchLogRecordProcessorOptions batch_opts; + batch_opts.max_queue_size = psch_otel_log_queue_size; + batch_opts.max_export_batch_size = + std::min(static_cast(psch_otel_log_batch_size), batch_size_by_bytes); + batch_opts.schedule_delay_millis = std::chrono::milliseconds(psch_otel_log_delay_ms); + + log_provider = std::make_shared( + logs_sdk::BatchLogRecordProcessorFactory::Create( + otlp::OtlpGrpcLogRecordExporterFactory::Create(log_opts), batch_opts), + resource); +} + bool OTelExporter::EstablishNewConnection() { try { - const std::string hostname = GetAHostname("postgres-primary"); const std::string endpoint = (psch_otel_endpoint && *psch_otel_endpoint) ? psch_otel_endpoint : "localhost:4317"; - const std::string pgch_version = PG_STAT_CH_VERSION; - // Resource (The "ID Card" for our service) - auto resource_attributes = opentelemetry::sdk::resource::ResourceAttributes{ - {"service.name", "pg_stat_ch"}, - {"service.version", pgch_version}, - {"host.name", hostname} // Ideally fetch real hostname - }; - auto resource = opentelemetry::sdk::resource::Resource::Create(resource_attributes); - - // Configure Metrics - // ------------------------------------------------------------------------- - otlp::OtlpGrpcMetricExporterOptions metric_opts; - metric_opts.endpoint = endpoint; - - // Configure Reader (async periodic export — does not block bgworker) - metrics_sdk::PeriodicExportingMetricReaderOptions reader_opts; - reader_opts.export_interval_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms); - reader_opts.export_timeout_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms / 2); - - metrics_reader = metrics_sdk::PeriodicExportingMetricReaderFactory::Create( - otlp::OtlpGrpcMetricExporterFactory::Create(metric_opts), reader_opts); - - // Create the Provider with our Resource and add our Reader - // Note: We use the ViewRegistry (default) - metrics_provider = std::make_shared( - std::make_unique(), resource); - metrics_provider->AddMetricReader(metrics_reader); - - // Configure Logs - // ------------------------------------------------------------------------- - otlp::OtlpGrpcLogRecordExporterOptions log_opts; - log_opts.endpoint = endpoint; - - // Create Logger Provider with batch processor for throughput. - // Cap max_export_batch_size by the byte budget: even at the minimum variable-field - // size the fixed overhead alone can push a large batch over the gRPC 4 MiB default. - // DequeueEvents already enforces the byte budget on the producer side; this caps - // the SDK's internal batch size as a second line of defence. - static constexpr size_t kOtelMinBytesPerRecord = 1200; // fixed overhead only - size_t batch_size_by_bytes = - static_cast(psch_otel_log_max_bytes) / kOtelMinBytesPerRecord; - logs_sdk::BatchLogRecordProcessorOptions batch_opts; - batch_opts.max_queue_size = psch_otel_log_queue_size; - batch_opts.max_export_batch_size = - std::min(static_cast(psch_otel_log_batch_size), batch_size_by_bytes); - batch_opts.schedule_delay_millis = std::chrono::milliseconds(psch_otel_log_delay_ms); - - log_provider = std::make_shared( - logs_sdk::BatchLogRecordProcessorFactory::Create( - otlp::OtlpGrpcLogRecordExporterFactory::Create(log_opts), batch_opts), - resource); - - // Get Instruments - // ------------------------------------------------------------------------- + auto resource = CreateResource(); + InitMetricsPipeline(endpoint, resource); + InitLogPipeline(endpoint, resource); + + const std::string pgch_version = PG_STAT_CH_VERSION; meter = metrics_provider->GetMeter("pg_stat_ch", pgch_version); logger = log_provider->GetLogger("pg_stat_ch", "pg_stat_ch_logs"); return true; } catch (const std::exception& e) { - // PschLog(LogLevel::Warning, "pg_stat_ch: OTel init failed: %s", e.what()); + LogExporterWarning("OTel init failed", e.what()); return false; } } bool OTelExporter::CommitBatch() { EndRow(); - - // Both metrics and logs are exported asynchronously by background threads: - // - PeriodicExportingMetricReader: exports histograms every metric_interval_ms - // - BatchLogRecordProcessor: exports log batches every log_delay_ms - // - // We do NOT call ForceFlush here. ForceFlush blocks until the background - // thread finishes its current gRPC export, which stalls dequeuing for seconds - // and causes shmem queue overflow. Instead, EmitLogRecord() just enqueues to - // the batch processor's internal buffer (non-blocking), and the bgworker - // loops immediately back to dequeue more events. - // - // Trade-off: if the batch processor's internal queue fills up (gRPC slower - // than event rate), it silently drops log records. This is acceptable for - // best-effort telemetry — the alternative (blocking) causes shmem drops which - // lose events before they're even processed. - + // No ForceFlush — metrics and logs export asynchronously via background + // threads. Blocking here would stall dequeuing and cause shmem overflow. ResetFailures(); return true; } diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index 3195bbf..e11af99 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -60,6 +60,15 @@ const char* CmdTypeToString(PschCmdType cmd) { } } +// Clamp a field length to its buffer maximum, warning on overflow. +template +LenT ClampFieldLen(LenT len, LenT max, const char* field_name) { + if (len <= max) + return len; + elog(WARNING, "pg_stat_ch: invalid %s %u, clamping", field_name, static_cast(len)); + return max; +} + // Dequeue events from the shared memory queue std::vector DequeueEvents(int max_events) { std::vector events; @@ -78,25 +87,16 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor exporter->BeginBatch(); - elog(DEBUG2, "pg_stat_ch: creating column objects"); - - // Basic columns - elog(DEBUG3, "pg_stat_ch: creating col_ts_start"); auto col_ts_start = exporter->RecordDateTime("ts_start"); - elog(DEBUG3, "pg_stat_ch: col_ts_start created"); auto col_duration_us = exporter->DbDurationColumn(); - // Use pre-resolved names from event (resolved at capture time in hooks) auto col_db = exporter->DbNameColumn(); auto col_username = exporter->DbUserColumn(); - elog(DEBUG3, "pg_stat_ch: basic columns created"); auto col_pid = exporter->RecordInt32("pid"); auto col_query_id = exporter->RecordInt64("query_id"); auto col_cmd_type = exporter->DbOperationColumn(); auto col_rows = exporter->MetricUInt64("rows"); auto col_query = exporter->DbQueryTextColumn(); - elog(DEBUG3, "pg_stat_ch: all basic columns created"); - // Buffer usage columns (hit/read are histograms, rest are records) auto col_shared_blks_hit = exporter->MetricInt64("shared_blks_hit"); auto col_shared_blks_read = exporter->MetricInt64("shared_blks_read"); auto col_shared_blks_dirtied = exporter->RecordInt64("shared_blks_dirtied"); @@ -108,7 +108,6 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor auto col_temp_blks_read = exporter->RecordInt64("temp_blks_read"); auto col_temp_blks_written = exporter->RecordInt64("temp_blks_written"); - // I/O timing columns (records — rarely non-zero) auto col_shared_blk_read_time_us = exporter->RecordInt64("shared_blk_read_time_us"); auto col_shared_blk_write_time_us = exporter->RecordInt64("shared_blk_write_time_us"); auto col_local_blk_read_time_us = exporter->RecordInt64("local_blk_read_time_us"); @@ -116,16 +115,13 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor auto col_temp_blk_read_time_us = exporter->RecordInt64("temp_blk_read_time_us"); auto col_temp_blk_write_time_us = exporter->RecordInt64("temp_blk_write_time_us"); - // WAL usage columns (records — rarely non-zero for reads) auto col_wal_records = exporter->RecordInt64("wal_records"); auto col_wal_fpi = exporter->RecordInt64("wal_fpi"); auto col_wal_bytes = exporter->RecordUInt64("wal_bytes"); - // CPU time columns (records) auto col_cpu_user_time_us = exporter->RecordInt64("cpu_user_time_us"); auto col_cpu_sys_time_us = exporter->RecordInt64("cpu_sys_time_us"); - // JIT columns (records — rarely non-zero) auto col_jit_functions = exporter->RecordInt32("jit_functions"); auto col_jit_generation_time_us = exporter->RecordInt32("jit_generation_time_us"); auto col_jit_deform_time_us = exporter->RecordInt32("jit_deform_time_us"); @@ -133,52 +129,31 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor auto col_jit_optimization_time_us = exporter->RecordInt32("jit_optimization_time_us"); auto col_jit_emission_time_us = exporter->RecordInt32("jit_emission_time_us"); - // Parallel worker columns (records) auto col_parallel_workers_planned = exporter->RecordInt16("parallel_workers_planned"); auto col_parallel_workers_launched = exporter->RecordInt16("parallel_workers_launched"); - elog(DEBUG3, "pg_stat_ch: creating error columns"); - // Error columns (records) auto col_err_sqlstate = exporter->MetricFixedString(5, "err_sqlstate"); auto col_err_elevel = exporter->RecordUInt8("err_elevel"); auto col_err_message = exporter->RecordString("err_message"); - elog(DEBUG3, "pg_stat_ch: error columns created"); - // Client context columns; records rather than tags (no histogram in OTel) auto col_app = exporter->RecordString("app"); auto col_client_addr = exporter->RecordString("client_addr"); - elog(DEBUG2, "pg_stat_ch: all columns created, starting event loop"); - size_t event_idx = 0; for (const auto& ev : events) { - elog(DEBUG2, "pg_stat_ch: processing event %zu: pid=%d, query_len=%u", event_idx, ev.pid, - ev.query_len); exporter->BeginRow(); - int64_t unix_us = ev.ts_start + kPostgresEpochOffsetUs; - col_ts_start->Append(unix_us); + col_ts_start->Append(ev.ts_start + kPostgresEpochOffsetUs); col_duration_us->Append(ev.duration_us); - - // Use pre-resolved names from event (resolved at capture time in hooks) col_db->Append(std::string(ev.datname, ev.datname_len)); col_username->Append(std::string(ev.username, ev.username_len)); - col_pid->Append(ev.pid); col_query_id->Append(static_cast(ev.queryid)); col_cmd_type->Append(CmdTypeToString(ev.cmd_type)); col_rows->Append(ev.rows); - // Validate query_len before using it - uint16 safe_query_len = ev.query_len; - if (safe_query_len > PSCH_MAX_QUERY_LEN) { - elog(WARNING, "pg_stat_ch: event %zu has invalid query_len %u, clamping", event_idx, - safe_query_len); - safe_query_len = PSCH_MAX_QUERY_LEN; - } - col_query->Append(std::string(ev.query, safe_query_len)); + auto qlen = ClampFieldLen(ev.query_len, static_cast(PSCH_MAX_QUERY_LEN), "query_len"); + col_query->Append(std::string(ev.query, qlen)); - elog(DEBUG3, "pg_stat_ch: event %zu - buffer usage", event_idx); - // Buffer usage col_shared_blks_hit->Append(ev.shared_blks_hit); col_shared_blks_read->Append(ev.shared_blks_read); col_shared_blks_dirtied->Append(ev.shared_blks_dirtied); @@ -190,8 +165,6 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor col_temp_blks_read->Append(ev.temp_blks_read); col_temp_blks_written->Append(ev.temp_blks_written); - elog(DEBUG3, "pg_stat_ch: event %zu - I/O timing", event_idx); - // I/O timing col_shared_blk_read_time_us->Append(ev.shared_blk_read_time_us); col_shared_blk_write_time_us->Append(ev.shared_blk_write_time_us); col_local_blk_read_time_us->Append(ev.local_blk_read_time_us); @@ -199,19 +172,13 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor col_temp_blk_read_time_us->Append(ev.temp_blk_read_time_us); col_temp_blk_write_time_us->Append(ev.temp_blk_write_time_us); - elog(DEBUG3, "pg_stat_ch: event %zu - WAL usage", event_idx); - // WAL usage col_wal_records->Append(ev.wal_records); col_wal_fpi->Append(ev.wal_fpi); col_wal_bytes->Append(ev.wal_bytes); - elog(DEBUG3, "pg_stat_ch: event %zu - CPU time", event_idx); - // CPU time col_cpu_user_time_us->Append(ev.cpu_user_time_us); col_cpu_sys_time_us->Append(ev.cpu_sys_time_us); - elog(DEBUG3, "pg_stat_ch: event %zu - JIT", event_idx); - // JIT col_jit_functions->Append(ev.jit_functions); col_jit_generation_time_us->Append(ev.jit_generation_time_us); col_jit_deform_time_us->Append(ev.jit_deform_time_us); @@ -219,49 +186,31 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor col_jit_optimization_time_us->Append(ev.jit_optimization_time_us); col_jit_emission_time_us->Append(ev.jit_emission_time_us); - elog(DEBUG3, "pg_stat_ch: event %zu - parallel workers", event_idx); - // Parallel workers col_parallel_workers_planned->Append(ev.parallel_workers_planned); col_parallel_workers_launched->Append(ev.parallel_workers_launched); - elog(DEBUG3, "pg_stat_ch: event %zu - error info", event_idx); - // Error info (5-char SQLSTATE, trimmed) col_err_sqlstate->Append(std::string_view(ev.err_sqlstate, 5)); col_err_elevel->Append(ev.err_elevel); - // Error message (validate length) - uint16 safe_err_msg_len = ev.err_message_len; - if (safe_err_msg_len > PSCH_MAX_ERR_MSG_LEN) { - elog(WARNING, "pg_stat_ch: event %zu has invalid err_message_len %u, clamping", event_idx, - safe_err_msg_len); - safe_err_msg_len = PSCH_MAX_ERR_MSG_LEN; - } - col_err_message->Append(std::string(ev.err_message, safe_err_msg_len)); - - elog(DEBUG3, "pg_stat_ch: event %zu - client context (app_len=%u, addr_len=%u)", event_idx, - ev.application_name_len, ev.client_addr_len); - // Client context - validate lengths - uint8 safe_app_len = ev.application_name_len; - if (safe_app_len > 63) { - elog(WARNING, "pg_stat_ch: event %zu has invalid app_name_len %u, clamping", event_idx, - safe_app_len); - safe_app_len = 63; - } - uint8 safe_addr_len = ev.client_addr_len; - if (safe_addr_len > 45) { - elog(WARNING, "pg_stat_ch: event %zu has invalid client_addr_len %u, clamping", event_idx, - safe_addr_len); - safe_addr_len = 45; - } - col_app->Append(std::string(ev.application_name, safe_app_len)); - col_client_addr->Append(std::string(ev.client_addr, safe_addr_len)); - - event_idx++; + auto elen = ClampFieldLen(ev.err_message_len, static_cast(PSCH_MAX_ERR_MSG_LEN), + "err_message_len"); + col_err_message->Append(std::string(ev.err_message, elen)); + + auto alen = ClampFieldLen(ev.application_name_len, static_cast(PSCH_MAX_APP_NAME_LEN), + "app_name_len"); + auto clen = ClampFieldLen(ev.client_addr_len, static_cast(PSCH_MAX_CLIENT_ADDR_LEN), + "client_addr_len"); + col_app->Append(std::string(ev.application_name, alen)); + col_client_addr->Append(std::string(ev.client_addr, clen)); } - elog(DEBUG1, "pg_stat_ch: finished processing %zu events", event_idx); + elog(DEBUG1, "pg_stat_ch: finished processing %zu events", events.size()); } } // namespace +void LogExporterWarning(const char* context, const char* message) { + ereport(WARNING, errmsg("pg_stat_ch: %s: %s", context, message)); +} + // Used to report negative values, which are not supported by OTel. void LogNegativeValue(const std::string& column_name, int64_t value) { static std::chrono::steady_clock::time_point last_log = {}; diff --git a/src/queue/event.h b/src/queue/event.h index c5e9122..7289836 100644 --- a/src/queue/event.h +++ b/src/queue/event.h @@ -31,6 +31,13 @@ extern "C" { #include "datatype/timestamp.h" +// POSIX defines INET6_ADDRSTRLEN = 46 in , but that header may not +// be available in all build environments (e.g. Docker benchmark images). Define +// a local constant instead so event.h stays self-contained. +#ifndef INET6_ADDRSTRLEN +#define INET6_ADDRSTRLEN 46 +#endif + // Maximum query text length stored in events (truncated if longer) // 2KB is enough for most queries; full query text is available via pg_stat_statements #define PSCH_MAX_QUERY_LEN 2048 @@ -38,6 +45,12 @@ extern "C" { // Maximum error message length (truncated if longer) #define PSCH_MAX_ERR_MSG_LEN 2048 +// Maximum application name length +#define PSCH_MAX_APP_NAME_LEN (NAMEDATALEN - 1) + +// Maximum client address length +#define PSCH_MAX_CLIENT_ADDR_LEN (INET6_ADDRSTRLEN - 1) + // Command type values (matching PostgreSQL's CmdType enum) enum PschCmdType { PSCH_CMD_UNKNOWN = 0, diff --git a/test/unit/hostname_test.cc b/test/unit/hostname_test.cc index d7b8651..86a6358 100644 --- a/test/unit/hostname_test.cc +++ b/test/unit/hostname_test.cc @@ -14,8 +14,9 @@ int psch_otel_log_max_bytes = 4194304; int psch_otel_log_delay_ms = 200; int psch_otel_metric_interval_ms = 5000; -// Stub out the PG-dependent log helper declared in exporter_interface.h +// Stub out the PG-dependent log helpers declared in exporter_interface.h void LogNegativeValue(const std::string&, int64_t) {} +void LogExporterWarning(const char*, const char*) {} // Function under test — defined in otel_exporter.cc with external linkage std::string GetAHostname(const char* fallback);