Skip to content

Commit 9084f26

Browse files
iskakaushikCopilotserprex
authored
Refactor exporter files: reduce noise and improve structure (#54)
* Refactor exporter files: remove debug noise, DRY up clamping, split OTel init stats_exporter.cc: - Remove 17 DEBUG2/DEBUG3 elog calls (bringup scaffolding) - Add ClampFieldLen<T> template to replace 3 inline clamping blocks - Remove redundant section-header comments and event_idx counter otel_exporter.cc: - Split EstablishNewConnection into CreateResource(), InitMetricsPipeline(), and InitLogPipeline() private helpers - Trim 13-line CommitBatch comment to 2-line summary event.h: - Add PSCH_MAX_APP_NAME_LEN and PSCH_MAX_CLIENT_ADDR_LEN constants Net -61 lines, no behavioral changes. * Log OTel init failures via ereport instead of silently swallowing Add LogExporterWarning() bridge in exporter_interface.h — otel_exporter.cc cannot include postgres.h (macro conflicts with libintl.h via gRPC headers), so this bridges to ereport(WARNING) defined in stats_exporter.cc. * Update src/queue/event.h Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update src/queue/event.h Co-authored-by: Philip Dubé <159546+serprex@users.noreply.github.com> * fix(ci): add INET6_ADDRSTRLEN fallback and LogExporterWarning stub - event.h: define INET6_ADDRSTRLEN (46) if not already provided, avoiding dependency on <arpa/inet.h> which is missing in some build environments - hostname_test.cc: stub LogExporterWarning to fix linker error --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Philip Dubé <159546+serprex@users.noreply.github.com>
1 parent 5d21c2b commit 9084f26

5 files changed

Lines changed: 106 additions & 152 deletions

File tree

src/export/exporter_interface.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@ class StatsExporter {
7979
virtual ~StatsExporter() = default;
8080
};
8181

82-
// Allows PG logging of exceptional cases without postgres.h
82+
// Bridge functions for PG logging from files that cannot include postgres.h
83+
// (e.g. otel_exporter.cc conflicts with libintl.h via gRPC headers).
8384
void LogNegativeValue(const std::string& column_name, int64_t value);
85+
void LogExporterWarning(const char* context, const char* message);
8486

8587
// Expected usage:
8688
// void ProcessBatch(StatsExporter *exporter) {

src/export/otel_exporter.cc

Lines changed: 61 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ class OTelExporter : public StatsExporter {
150150
++exported_count;
151151
}
152152

153+
opentelemetry::sdk::resource::Resource CreateResource();
154+
void InitMetricsPipeline(const std::string& endpoint,
155+
const opentelemetry::sdk::resource::Resource& resource);
156+
void InitLogPipeline(const std::string& endpoint,
157+
const opentelemetry::sdk::resource::Resource& resource);
158+
153159
// =====================================================================
154160
// Column implementation classes (translate OTel concepts to CH Columns)
155161
// =====================================================================
@@ -373,94 +379,77 @@ class OTelExporter : public StatsExporter {
373379
std::vector<shared_ptr<BasicColumn>> columns;
374380
};
375381

382+
opentelemetry::sdk::resource::Resource OTelExporter::CreateResource() {
383+
return opentelemetry::sdk::resource::Resource::Create({
384+
{"service.name", "pg_stat_ch"},
385+
{"service.version", std::string(PG_STAT_CH_VERSION)},
386+
{"host.name", GetAHostname("postgres-primary")},
387+
});
388+
}
389+
390+
void OTelExporter::InitMetricsPipeline(const std::string& endpoint,
391+
const opentelemetry::sdk::resource::Resource& resource) {
392+
otlp::OtlpGrpcMetricExporterOptions metric_opts;
393+
metric_opts.endpoint = endpoint;
394+
395+
metrics_sdk::PeriodicExportingMetricReaderOptions reader_opts;
396+
reader_opts.export_interval_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms);
397+
reader_opts.export_timeout_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms / 2);
398+
399+
metrics_reader = metrics_sdk::PeriodicExportingMetricReaderFactory::Create(
400+
otlp::OtlpGrpcMetricExporterFactory::Create(metric_opts), reader_opts);
401+
402+
metrics_provider = std::make_shared<metrics_sdk::MeterProvider>(
403+
std::make_unique<metrics_sdk::ViewRegistry>(), resource);
404+
metrics_provider->AddMetricReader(metrics_reader);
405+
}
406+
407+
void OTelExporter::InitLogPipeline(const std::string& endpoint,
408+
const opentelemetry::sdk::resource::Resource& resource) {
409+
otlp::OtlpGrpcLogRecordExporterOptions log_opts;
410+
log_opts.endpoint = endpoint;
411+
412+
// Cap batch size by byte budget to stay under gRPC 4 MiB default.
413+
static constexpr size_t kOtelMinBytesPerRecord = 1200;
414+
size_t batch_size_by_bytes =
415+
static_cast<size_t>(psch_otel_log_max_bytes) / kOtelMinBytesPerRecord;
416+
417+
logs_sdk::BatchLogRecordProcessorOptions batch_opts;
418+
batch_opts.max_queue_size = psch_otel_log_queue_size;
419+
batch_opts.max_export_batch_size =
420+
std::min(static_cast<size_t>(psch_otel_log_batch_size), batch_size_by_bytes);
421+
batch_opts.schedule_delay_millis = std::chrono::milliseconds(psch_otel_log_delay_ms);
422+
423+
log_provider = std::make_shared<logs_sdk::LoggerProvider>(
424+
logs_sdk::BatchLogRecordProcessorFactory::Create(
425+
otlp::OtlpGrpcLogRecordExporterFactory::Create(log_opts), batch_opts),
426+
resource);
427+
}
428+
376429
bool OTelExporter::EstablishNewConnection() {
377430
try {
378-
const std::string hostname = GetAHostname("postgres-primary");
379431
const std::string endpoint =
380432
(psch_otel_endpoint && *psch_otel_endpoint) ? psch_otel_endpoint : "localhost:4317";
381-
const std::string pgch_version = PG_STAT_CH_VERSION;
382433

383-
// Resource (The "ID Card" for our service)
384-
auto resource_attributes = opentelemetry::sdk::resource::ResourceAttributes{
385-
{"service.name", "pg_stat_ch"},
386-
{"service.version", pgch_version},
387-
{"host.name", hostname} // Ideally fetch real hostname
388-
};
389-
auto resource = opentelemetry::sdk::resource::Resource::Create(resource_attributes);
390-
391-
// Configure Metrics
392-
// -------------------------------------------------------------------------
393-
otlp::OtlpGrpcMetricExporterOptions metric_opts;
394-
metric_opts.endpoint = endpoint;
395-
396-
// Configure Reader (async periodic export — does not block bgworker)
397-
metrics_sdk::PeriodicExportingMetricReaderOptions reader_opts;
398-
reader_opts.export_interval_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms);
399-
reader_opts.export_timeout_millis = std::chrono::milliseconds(psch_otel_metric_interval_ms / 2);
400-
401-
metrics_reader = metrics_sdk::PeriodicExportingMetricReaderFactory::Create(
402-
otlp::OtlpGrpcMetricExporterFactory::Create(metric_opts), reader_opts);
403-
404-
// Create the Provider with our Resource and add our Reader
405-
// Note: We use the ViewRegistry (default)
406-
metrics_provider = std::make_shared<metrics_sdk::MeterProvider>(
407-
std::make_unique<metrics_sdk::ViewRegistry>(), resource);
408-
metrics_provider->AddMetricReader(metrics_reader);
409-
410-
// Configure Logs
411-
// -------------------------------------------------------------------------
412-
otlp::OtlpGrpcLogRecordExporterOptions log_opts;
413-
log_opts.endpoint = endpoint;
414-
415-
// Create Logger Provider with batch processor for throughput.
416-
// Cap max_export_batch_size by the byte budget: even at the minimum variable-field
417-
// size the fixed overhead alone can push a large batch over the gRPC 4 MiB default.
418-
// DequeueEvents already enforces the byte budget on the producer side; this caps
419-
// the SDK's internal batch size as a second line of defence.
420-
static constexpr size_t kOtelMinBytesPerRecord = 1200; // fixed overhead only
421-
size_t batch_size_by_bytes =
422-
static_cast<size_t>(psch_otel_log_max_bytes) / kOtelMinBytesPerRecord;
423-
logs_sdk::BatchLogRecordProcessorOptions batch_opts;
424-
batch_opts.max_queue_size = psch_otel_log_queue_size;
425-
batch_opts.max_export_batch_size =
426-
std::min(static_cast<size_t>(psch_otel_log_batch_size), batch_size_by_bytes);
427-
batch_opts.schedule_delay_millis = std::chrono::milliseconds(psch_otel_log_delay_ms);
428-
429-
log_provider = std::make_shared<logs_sdk::LoggerProvider>(
430-
logs_sdk::BatchLogRecordProcessorFactory::Create(
431-
otlp::OtlpGrpcLogRecordExporterFactory::Create(log_opts), batch_opts),
432-
resource);
433-
434-
// Get Instruments
435-
// -------------------------------------------------------------------------
434+
auto resource = CreateResource();
435+
InitMetricsPipeline(endpoint, resource);
436+
InitLogPipeline(endpoint, resource);
437+
438+
const std::string pgch_version = PG_STAT_CH_VERSION;
436439
meter = metrics_provider->GetMeter("pg_stat_ch", pgch_version);
437440
logger = log_provider->GetLogger("pg_stat_ch", "pg_stat_ch_logs");
438441

439442
return true;
440443
} catch (const std::exception& e) {
441-
// PschLog(LogLevel::Warning, "pg_stat_ch: OTel init failed: %s", e.what());
444+
LogExporterWarning("OTel init failed", e.what());
442445
return false;
443446
}
444447
}
445448

446449
bool OTelExporter::CommitBatch() {
447450
EndRow();
448-
449-
// Both metrics and logs are exported asynchronously by background threads:
450-
// - PeriodicExportingMetricReader: exports histograms every metric_interval_ms
451-
// - BatchLogRecordProcessor: exports log batches every log_delay_ms
452-
//
453-
// We do NOT call ForceFlush here. ForceFlush blocks until the background
454-
// thread finishes its current gRPC export, which stalls dequeuing for seconds
455-
// and causes shmem queue overflow. Instead, EmitLogRecord() just enqueues to
456-
// the batch processor's internal buffer (non-blocking), and the bgworker
457-
// loops immediately back to dequeue more events.
458-
//
459-
// Trade-off: if the batch processor's internal queue fills up (gRPC slower
460-
// than event rate), it silently drops log records. This is acceptable for
461-
// best-effort telemetry — the alternative (blocking) causes shmem drops which
462-
// lose events before they're even processed.
463-
451+
// No ForceFlush — metrics and logs export asynchronously via background
452+
// threads. Blocking here would stall dequeuing and cause shmem overflow.
464453
ResetFailures();
465454
return true;
466455
}

src/export/stats_exporter.cc

Lines changed: 27 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,15 @@ const char* CmdTypeToString(PschCmdType cmd) {
6060
}
6161
}
6262

63+
// Clamp a field length to its buffer maximum, warning on overflow.
64+
template <typename LenT>
65+
LenT ClampFieldLen(LenT len, LenT max, const char* field_name) {
66+
if (len <= max)
67+
return len;
68+
elog(WARNING, "pg_stat_ch: invalid %s %u, clamping", field_name, static_cast<unsigned>(len));
69+
return max;
70+
}
71+
6372
// Dequeue events from the shared memory queue
6473
std::vector<PschEvent> DequeueEvents(int max_events) {
6574
std::vector<PschEvent> events;
@@ -78,25 +87,16 @@ void ExportEventStats(const std::vector<PschEvent>& events, StatsExporter* expor
7887

7988
exporter->BeginBatch();
8089

81-
elog(DEBUG2, "pg_stat_ch: creating column objects");
82-
83-
// Basic columns
84-
elog(DEBUG3, "pg_stat_ch: creating col_ts_start");
8590
auto col_ts_start = exporter->RecordDateTime("ts_start");
86-
elog(DEBUG3, "pg_stat_ch: col_ts_start created");
8791
auto col_duration_us = exporter->DbDurationColumn();
88-
// Use pre-resolved names from event (resolved at capture time in hooks)
8992
auto col_db = exporter->DbNameColumn();
9093
auto col_username = exporter->DbUserColumn();
91-
elog(DEBUG3, "pg_stat_ch: basic columns created");
9294
auto col_pid = exporter->RecordInt32("pid");
9395
auto col_query_id = exporter->RecordInt64("query_id");
9496
auto col_cmd_type = exporter->DbOperationColumn();
9597
auto col_rows = exporter->MetricUInt64("rows");
9698
auto col_query = exporter->DbQueryTextColumn();
97-
elog(DEBUG3, "pg_stat_ch: all basic columns created");
9899

99-
// Buffer usage columns (hit/read are histograms, rest are records)
100100
auto col_shared_blks_hit = exporter->MetricInt64("shared_blks_hit");
101101
auto col_shared_blks_read = exporter->MetricInt64("shared_blks_read");
102102
auto col_shared_blks_dirtied = exporter->RecordInt64("shared_blks_dirtied");
@@ -108,77 +108,52 @@ void ExportEventStats(const std::vector<PschEvent>& events, StatsExporter* expor
108108
auto col_temp_blks_read = exporter->RecordInt64("temp_blks_read");
109109
auto col_temp_blks_written = exporter->RecordInt64("temp_blks_written");
110110

111-
// I/O timing columns (records — rarely non-zero)
112111
auto col_shared_blk_read_time_us = exporter->RecordInt64("shared_blk_read_time_us");
113112
auto col_shared_blk_write_time_us = exporter->RecordInt64("shared_blk_write_time_us");
114113
auto col_local_blk_read_time_us = exporter->RecordInt64("local_blk_read_time_us");
115114
auto col_local_blk_write_time_us = exporter->RecordInt64("local_blk_write_time_us");
116115
auto col_temp_blk_read_time_us = exporter->RecordInt64("temp_blk_read_time_us");
117116
auto col_temp_blk_write_time_us = exporter->RecordInt64("temp_blk_write_time_us");
118117

119-
// WAL usage columns (records — rarely non-zero for reads)
120118
auto col_wal_records = exporter->RecordInt64("wal_records");
121119
auto col_wal_fpi = exporter->RecordInt64("wal_fpi");
122120
auto col_wal_bytes = exporter->RecordUInt64("wal_bytes");
123121

124-
// CPU time columns (records)
125122
auto col_cpu_user_time_us = exporter->RecordInt64("cpu_user_time_us");
126123
auto col_cpu_sys_time_us = exporter->RecordInt64("cpu_sys_time_us");
127124

128-
// JIT columns (records — rarely non-zero)
129125
auto col_jit_functions = exporter->RecordInt32("jit_functions");
130126
auto col_jit_generation_time_us = exporter->RecordInt32("jit_generation_time_us");
131127
auto col_jit_deform_time_us = exporter->RecordInt32("jit_deform_time_us");
132128
auto col_jit_inlining_time_us = exporter->RecordInt32("jit_inlining_time_us");
133129
auto col_jit_optimization_time_us = exporter->RecordInt32("jit_optimization_time_us");
134130
auto col_jit_emission_time_us = exporter->RecordInt32("jit_emission_time_us");
135131

136-
// Parallel worker columns (records)
137132
auto col_parallel_workers_planned = exporter->RecordInt16("parallel_workers_planned");
138133
auto col_parallel_workers_launched = exporter->RecordInt16("parallel_workers_launched");
139134

140-
elog(DEBUG3, "pg_stat_ch: creating error columns");
141-
// Error columns (records)
142135
auto col_err_sqlstate = exporter->MetricFixedString(5, "err_sqlstate");
143136
auto col_err_elevel = exporter->RecordUInt8("err_elevel");
144137
auto col_err_message = exporter->RecordString("err_message");
145-
elog(DEBUG3, "pg_stat_ch: error columns created");
146138

147-
// Client context columns; records rather than tags (no histogram in OTel)
148139
auto col_app = exporter->RecordString("app");
149140
auto col_client_addr = exporter->RecordString("client_addr");
150141

151-
elog(DEBUG2, "pg_stat_ch: all columns created, starting event loop");
152-
size_t event_idx = 0;
153142
for (const auto& ev : events) {
154-
elog(DEBUG2, "pg_stat_ch: processing event %zu: pid=%d, query_len=%u", event_idx, ev.pid,
155-
ev.query_len);
156143
exporter->BeginRow();
157144

158-
int64_t unix_us = ev.ts_start + kPostgresEpochOffsetUs;
159-
col_ts_start->Append(unix_us);
145+
col_ts_start->Append(ev.ts_start + kPostgresEpochOffsetUs);
160146
col_duration_us->Append(ev.duration_us);
161-
162-
// Use pre-resolved names from event (resolved at capture time in hooks)
163147
col_db->Append(std::string(ev.datname, ev.datname_len));
164148
col_username->Append(std::string(ev.username, ev.username_len));
165-
166149
col_pid->Append(ev.pid);
167150
col_query_id->Append(static_cast<int64_t>(ev.queryid));
168151
col_cmd_type->Append(CmdTypeToString(ev.cmd_type));
169152
col_rows->Append(ev.rows);
170153

171-
// Validate query_len before using it
172-
uint16 safe_query_len = ev.query_len;
173-
if (safe_query_len > PSCH_MAX_QUERY_LEN) {
174-
elog(WARNING, "pg_stat_ch: event %zu has invalid query_len %u, clamping", event_idx,
175-
safe_query_len);
176-
safe_query_len = PSCH_MAX_QUERY_LEN;
177-
}
178-
col_query->Append(std::string(ev.query, safe_query_len));
154+
auto qlen = ClampFieldLen(ev.query_len, static_cast<uint16>(PSCH_MAX_QUERY_LEN), "query_len");
155+
col_query->Append(std::string(ev.query, qlen));
179156

180-
elog(DEBUG3, "pg_stat_ch: event %zu - buffer usage", event_idx);
181-
// Buffer usage
182157
col_shared_blks_hit->Append(ev.shared_blks_hit);
183158
col_shared_blks_read->Append(ev.shared_blks_read);
184159
col_shared_blks_dirtied->Append(ev.shared_blks_dirtied);
@@ -190,78 +165,52 @@ void ExportEventStats(const std::vector<PschEvent>& events, StatsExporter* expor
190165
col_temp_blks_read->Append(ev.temp_blks_read);
191166
col_temp_blks_written->Append(ev.temp_blks_written);
192167

193-
elog(DEBUG3, "pg_stat_ch: event %zu - I/O timing", event_idx);
194-
// I/O timing
195168
col_shared_blk_read_time_us->Append(ev.shared_blk_read_time_us);
196169
col_shared_blk_write_time_us->Append(ev.shared_blk_write_time_us);
197170
col_local_blk_read_time_us->Append(ev.local_blk_read_time_us);
198171
col_local_blk_write_time_us->Append(ev.local_blk_write_time_us);
199172
col_temp_blk_read_time_us->Append(ev.temp_blk_read_time_us);
200173
col_temp_blk_write_time_us->Append(ev.temp_blk_write_time_us);
201174

202-
elog(DEBUG3, "pg_stat_ch: event %zu - WAL usage", event_idx);
203-
// WAL usage
204175
col_wal_records->Append(ev.wal_records);
205176
col_wal_fpi->Append(ev.wal_fpi);
206177
col_wal_bytes->Append(ev.wal_bytes);
207178

208-
elog(DEBUG3, "pg_stat_ch: event %zu - CPU time", event_idx);
209-
// CPU time
210179
col_cpu_user_time_us->Append(ev.cpu_user_time_us);
211180
col_cpu_sys_time_us->Append(ev.cpu_sys_time_us);
212181

213-
elog(DEBUG3, "pg_stat_ch: event %zu - JIT", event_idx);
214-
// JIT
215182
col_jit_functions->Append(ev.jit_functions);
216183
col_jit_generation_time_us->Append(ev.jit_generation_time_us);
217184
col_jit_deform_time_us->Append(ev.jit_deform_time_us);
218185
col_jit_inlining_time_us->Append(ev.jit_inlining_time_us);
219186
col_jit_optimization_time_us->Append(ev.jit_optimization_time_us);
220187
col_jit_emission_time_us->Append(ev.jit_emission_time_us);
221188

222-
elog(DEBUG3, "pg_stat_ch: event %zu - parallel workers", event_idx);
223-
// Parallel workers
224189
col_parallel_workers_planned->Append(ev.parallel_workers_planned);
225190
col_parallel_workers_launched->Append(ev.parallel_workers_launched);
226191

227-
elog(DEBUG3, "pg_stat_ch: event %zu - error info", event_idx);
228-
// Error info (5-char SQLSTATE, trimmed)
229192
col_err_sqlstate->Append(std::string_view(ev.err_sqlstate, 5));
230193
col_err_elevel->Append(ev.err_elevel);
231-
// Error message (validate length)
232-
uint16 safe_err_msg_len = ev.err_message_len;
233-
if (safe_err_msg_len > PSCH_MAX_ERR_MSG_LEN) {
234-
elog(WARNING, "pg_stat_ch: event %zu has invalid err_message_len %u, clamping", event_idx,
235-
safe_err_msg_len);
236-
safe_err_msg_len = PSCH_MAX_ERR_MSG_LEN;
237-
}
238-
col_err_message->Append(std::string(ev.err_message, safe_err_msg_len));
239-
240-
elog(DEBUG3, "pg_stat_ch: event %zu - client context (app_len=%u, addr_len=%u)", event_idx,
241-
ev.application_name_len, ev.client_addr_len);
242-
// Client context - validate lengths
243-
uint8 safe_app_len = ev.application_name_len;
244-
if (safe_app_len > 63) {
245-
elog(WARNING, "pg_stat_ch: event %zu has invalid app_name_len %u, clamping", event_idx,
246-
safe_app_len);
247-
safe_app_len = 63;
248-
}
249-
uint8 safe_addr_len = ev.client_addr_len;
250-
if (safe_addr_len > 45) {
251-
elog(WARNING, "pg_stat_ch: event %zu has invalid client_addr_len %u, clamping", event_idx,
252-
safe_addr_len);
253-
safe_addr_len = 45;
254-
}
255-
col_app->Append(std::string(ev.application_name, safe_app_len));
256-
col_client_addr->Append(std::string(ev.client_addr, safe_addr_len));
257-
258-
event_idx++;
194+
auto elen = ClampFieldLen(ev.err_message_len, static_cast<uint16>(PSCH_MAX_ERR_MSG_LEN),
195+
"err_message_len");
196+
col_err_message->Append(std::string(ev.err_message, elen));
197+
198+
auto alen = ClampFieldLen(ev.application_name_len, static_cast<uint8>(PSCH_MAX_APP_NAME_LEN),
199+
"app_name_len");
200+
auto clen = ClampFieldLen(ev.client_addr_len, static_cast<uint8>(PSCH_MAX_CLIENT_ADDR_LEN),
201+
"client_addr_len");
202+
col_app->Append(std::string(ev.application_name, alen));
203+
col_client_addr->Append(std::string(ev.client_addr, clen));
259204
}
260-
elog(DEBUG1, "pg_stat_ch: finished processing %zu events", event_idx);
205+
elog(DEBUG1, "pg_stat_ch: finished processing %zu events", events.size());
261206
}
262207

263208
} // namespace
264209

210+
void LogExporterWarning(const char* context, const char* message) {
211+
ereport(WARNING, errmsg("pg_stat_ch: %s: %s", context, message));
212+
}
213+
265214
// Used to report negative values, which are not supported by OTel.
266215
void LogNegativeValue(const std::string& column_name, int64_t value) {
267216
static std::chrono::steady_clock::time_point last_log = {};

0 commit comments

Comments
 (0)