Skip to content

Commit 5d21c2b

Browse files
authored
Normalize executor and utility query text with $N placeholders (#51)
* Normalize query text: replace literal constants with $N placeholders Port pg_stat_statements' query normalization to pg_stat_ch so that captured queries never contain raw literal values (passwords, PII, tokens, etc.). Both ClickHouse and OTel exporters now receive normalized text like: SELECT * FROM users WHERE email = $1 AND age > $2 Instead of: SELECT * FROM users WHERE email = 'alice@example.com' AND age > 30 Implementation: - Add post_parse_analyze_hook to capture JumbleState at parse time - Port generate_normalized_query / fill_in_constant_lengths from pg_stat_statements (static functions, not exported by PG) - Store normalized text per-backend in TopMemoryContext, consumed by ExecutorEnd/ProcessUtility/emit_log_hook - Schema names, table names, column names, operators, and SQL structure are preserved; only literal constants are replaced Includes TAP test (027_query_normalization.pl) with 16 subtests covering strings, numbers, negatives, floats, IN lists, LIKE, BETWEEN, subqueries, multi-statement, INSERT/UPDATE/DELETE, and schema-qualified names. * Fix clang-format violations in query_normalize.cc * Fix normalization state handling and harden TAP waits * Fix nested query normalization reuse * Add PG16/17 normalization compatibility * Scope query normalization out of error capture * Hash normalized statement identity lookups * Use statement key in query text copy helpers
1 parent 4a36c53 commit 5d21c2b

11 files changed

Lines changed: 1168 additions & 61 deletions

src/hooks/hooks.cc

Lines changed: 139 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ extern "C" {
2121
#include "utils/lsyscache.h"
2222
#include "utils/timestamp.h"
2323

24+
#include "parser/analyze.h"
25+
2426
#if PG_VERSION_NUM >= 140000
2527
#include "nodes/queryjumble.h"
2628
#endif
@@ -32,10 +34,13 @@ extern "C" {
3234

3335
#include "config/guc.h"
3436
#include "hooks/hooks.h"
37+
#include "hooks/query_normalize.h"
38+
#include "hooks/query_normalize_state.h"
3539
#include "queue/event.h"
3640
#include "queue/shmem.h"
3741

3842
// Previous hook values for chaining
43+
static post_parse_analyze_hook_type prev_post_parse_analyze = nullptr;
3944
static ExecutorStart_hook_type prev_executor_start = nullptr;
4045
static ExecutorRun_hook_type prev_executor_run = nullptr;
4146
static ExecutorFinish_hook_type prev_executor_finish = nullptr;
@@ -72,8 +77,10 @@ static uint8 CopyName(char* dst, size_t dst_size, const char* src) {
7277
// Cache for session-stable values to avoid repeated catalog lookups on every query.
7378
// Following pg_stat_monitor's pattern of caching client IP (pg_stat_monitor.c:73-96).
7479
// Database name and client address never change within a session. Username is
75-
// re-resolved when userid changes (handles SET ROLE).
76-
struct BackendCache {
80+
// re-resolved when userid changes (handles SET ROLE). This also carries the
81+
// session-local registry of normalized statements waiting to be consumed by
82+
// ExecutorEnd, ProcessUtility, or emit_log_hook.
83+
struct PschBackendState {
7784
bool initialized;
7885
char datname[NAMEDATALEN];
7986
uint8 datname_len;
@@ -82,8 +89,9 @@ struct BackendCache {
8289
uint8 username_len;
8390
char client_addr[46]; // INET6_ADDRSTRLEN
8491
uint8 client_addr_len;
92+
PschNormalizedQueryState normalized_queries;
8593
};
86-
static BackendCache backend_cache = {};
94+
static PschBackendState backend_state = {};
8795

8896
// Resolve and cache the current username. On initial resolve, falls back to
8997
// "<unknown>" if resolution fails. On SET ROLE re-resolve, keeps the existing
@@ -92,13 +100,13 @@ static BackendCache backend_cache = {};
92100
static void CacheUsername(Oid userid, bool fallback_on_null) {
93101
const char* username = GetUserNameFromId(userid, true);
94102
if (username != nullptr) {
95-
backend_cache.username_len =
96-
CopyName(backend_cache.username, sizeof(backend_cache.username), username);
97-
backend_cache.cached_userid = userid;
103+
backend_state.username_len =
104+
CopyName(backend_state.username, sizeof(backend_state.username), username);
105+
backend_state.cached_userid = userid;
98106
} else if (fallback_on_null) {
99-
backend_cache.username_len =
100-
CopyName(backend_cache.username, sizeof(backend_cache.username), "<unknown>");
101-
backend_cache.cached_userid = userid;
107+
backend_state.username_len =
108+
CopyName(backend_state.username, sizeof(backend_state.username), "<unknown>");
109+
backend_state.cached_userid = userid;
102110
}
103111
}
104112

@@ -108,30 +116,30 @@ static void CacheUsername(Oid userid, bool fallback_on_null) {
108116
static void EnsureBackendCache(void) {
109117
Oid userid = GetUserId();
110118

111-
if (!backend_cache.initialized) {
119+
if (!backend_state.initialized) {
112120
// Can't resolve catalog names outside a transaction
113121
if (!IsTransactionState()) {
114122
return;
115123
}
116124

117125
// Database name (session-stable)
118126
const char* datname = get_database_name(MyDatabaseId);
119-
backend_cache.datname_len = CopyName(backend_cache.datname, sizeof(backend_cache.datname),
127+
backend_state.datname_len = CopyName(backend_state.datname, sizeof(backend_state.datname),
120128
datname != nullptr ? datname : "<unknown>");
121129

122130
// Client address (session-stable)
123-
backend_cache.client_addr_len = static_cast<uint8>(
124-
GetClientAddress(backend_cache.client_addr, sizeof(backend_cache.client_addr)));
131+
backend_state.client_addr_len = static_cast<uint8>(
132+
GetClientAddress(backend_state.client_addr, sizeof(backend_state.client_addr)));
125133

126134
// Username (may change via SET ROLE)
127135
CacheUsername(userid, true);
128136

129-
backend_cache.initialized = true;
137+
backend_state.initialized = true;
130138
return;
131139
}
132140

133141
// Re-resolve username if userid changed (SET ROLE)
134-
if (backend_cache.cached_userid != userid) {
142+
if (backend_state.cached_userid != userid) {
135143
if (IsTransactionState()) {
136144
CacheUsername(userid, false);
137145
}
@@ -335,20 +343,52 @@ static void CopyClientContext(PschEvent* event) {
335343
GetApplicationName(event->application_name, sizeof(event->application_name)));
336344

337345
EnsureBackendCache();
338-
if (backend_cache.initialized) {
339-
memcpy(event->client_addr, backend_cache.client_addr, backend_cache.client_addr_len + 1);
340-
event->client_addr_len = backend_cache.client_addr_len;
346+
if (backend_state.initialized) {
347+
memcpy(event->client_addr, backend_state.client_addr, backend_state.client_addr_len + 1);
348+
event->client_addr_len = backend_state.client_addr_len;
341349
} else {
342350
event->client_addr_len =
343351
static_cast<uint8>(GetClientAddress(event->client_addr, sizeof(event->client_addr)));
344352
}
345353
}
346354

347-
static void CopyQueryText(PschEvent* event, const char* query_text) {
348-
if (query_text != nullptr) {
349-
event->query_len =
350-
static_cast<uint16>(CopyTrimmed(event->query, PSCH_MAX_QUERY_LEN, query_text));
355+
// Copy the original SQL text for one statement into the event buffer.
356+
//
357+
// PostgreSQL can hand us a multi-statement source string plus stmt_location /
358+
// stmt_len. CleanQuerytext trims that down to just the current statement, but
359+
// it does not parameterize literals. This helper is the raw-text fallback when
360+
// we have no normalized entry for the statement.
361+
static void CopyRawStatementText(PschEvent* event, const PschStatementKey& statement_key) {
362+
if (statement_key.source_text == nullptr) {
363+
return;
364+
}
365+
366+
const char* query_text = statement_key.source_text;
367+
if (statement_key.stmt_location >= 0) {
368+
int query_loc = statement_key.stmt_location;
369+
int query_len = statement_key.stmt_len;
370+
query_text = CleanQuerytext(query_text, &query_loc, &query_len);
371+
}
372+
373+
event->query_len = static_cast<uint16>(CopyTrimmed(event->query, PSCH_MAX_QUERY_LEN, query_text));
374+
}
375+
376+
// Copy query text into the event buffer, preferring a previously normalized
377+
// form from post_parse_analyze_hook.
378+
//
379+
// The normalized registry is keyed by statement identity and reused across
380+
// repeated executions of cached plans, so this helper first looks up the
381+
// normalized entry stashed at parse time. If no match exists, it falls back to
382+
// CopyRawStatementText, which preserves the literal SQL text for the current
383+
// statement only.
384+
static void CopyQueryText(PschEvent* event, const PschStatementKey& statement_key) {
385+
if (PschCopyNormalizedQueryForStatement(&backend_state.normalized_queries, event->query,
386+
sizeof(event->query), &event->query_len, statement_key,
387+
false)) {
388+
return;
351389
}
390+
391+
CopyRawStatementText(event, statement_key);
352392
}
353393

354394
// Resolve database and user names, using the session cache when available.
@@ -357,11 +397,11 @@ static void CopyQueryText(PschEvent* event, const char* query_text) {
357397
static void ResolveNames(PschEvent* event) {
358398
EnsureBackendCache();
359399

360-
if (backend_cache.initialized) {
361-
memcpy(event->datname, backend_cache.datname, backend_cache.datname_len + 1);
362-
event->datname_len = backend_cache.datname_len;
363-
memcpy(event->username, backend_cache.username, backend_cache.username_len + 1);
364-
event->username_len = backend_cache.username_len;
400+
if (backend_state.initialized) {
401+
memcpy(event->datname, backend_state.datname, backend_state.datname_len + 1);
402+
event->datname_len = backend_state.datname_len;
403+
memcpy(event->username, backend_state.username, backend_state.username_len + 1);
404+
event->username_len = backend_state.username_len;
365405
return;
366406
}
367407

@@ -440,11 +480,58 @@ static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int
440480
CopyJitInstrumentation(event, query_desc);
441481
CopyParallelWorkerInfo(event, query_desc);
442482
CopyClientContext(event);
443-
CopyQueryText(event, query_desc->sourceText);
483+
const PschStatementKey statement_key =
484+
PschMakeStatementKey(query_desc->sourceText, query_desc->plannedstmt->stmt_location,
485+
query_desc->plannedstmt->stmt_len);
486+
CopyQueryText(event, statement_key);
444487
}
445488

446489
extern "C" {
447490

491+
// Remove a pending normalized entry for one statement when execution exits
492+
// without building a normal executor/utility event from it.
493+
static void ForgetNormalizedStatement(const char* source_text, int stmt_location, int stmt_len) {
494+
const PschStatementKey statement_key = PschMakeStatementKey(source_text, stmt_location, stmt_len);
495+
PschForgetNormalizedQueryForStatement(&backend_state.normalized_queries, statement_key);
496+
}
497+
498+
// post_parse_analyze_hook — normalize query text at parse time.
499+
// The JumbleState (with constant locations) is only available here, so we
500+
// must generate the normalized text now and stash it for ExecutorEnd.
501+
static void PschPostParseAnalyze(ParseState* pstate, Query* query, JumbleState* jstate) {
502+
if (prev_post_parse_analyze != nullptr) {
503+
prev_post_parse_analyze(pstate, query, jstate);
504+
}
505+
506+
// Only normalize if enabled and the query has constants to replace.
507+
if (!psch_enabled || IsParallelWorker() || jstate == nullptr || jstate->clocations_count <= 0) {
508+
return;
509+
}
510+
511+
const char* source_text = pstate->p_sourcetext;
512+
const int stmt_location = query->stmt_location;
513+
const int stmt_len = query->stmt_len;
514+
const char* query_text = source_text;
515+
int query_loc = stmt_location;
516+
int query_len = stmt_len;
517+
518+
// CleanQuerytext slices a multi-statement source string down to the current
519+
// statement before we replace literal constants with placeholders.
520+
query_text = CleanQuerytext(query_text, &query_loc, &query_len);
521+
522+
// Allocate in TopMemoryContext so the normalized text survives until
523+
// ExecutorEnd or ProcessUtility copies it into the exported event.
524+
MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
525+
char* normalized_query = PschNormalizeQuery(query_text, query_loc, &query_len, jstate);
526+
if (normalized_query != nullptr) {
527+
const PschStatementKey statement_key =
528+
PschMakeStatementKey(source_text, stmt_location, stmt_len);
529+
PschRememberNormalizedQuery(&backend_state.normalized_queries, statement_key, normalized_query,
530+
query_len);
531+
}
532+
MemoryContextSwitchTo(oldcxt);
533+
}
534+
448535
static void PschExecutorStart(QueryDesc* query_desc, int eflags) {
449536
if (IsParallelWorker()) {
450537
if (prev_executor_start != nullptr) {
@@ -557,6 +644,8 @@ static void PschExecutorFinish(QueryDesc* query_desc) {
557644

558645
static void PschExecutorEnd(QueryDesc* query_desc) {
559646
if (!psch_enabled || IsParallelWorker() || query_desc->plannedstmt->queryId == UINT64CONST(0)) {
647+
ForgetNormalizedStatement(query_desc->sourceText, query_desc->plannedstmt->stmt_location,
648+
query_desc->plannedstmt->stmt_len);
560649
if (prev_executor_end != nullptr) {
561650
prev_executor_end(query_desc);
562651
} else {
@@ -591,9 +680,9 @@ static void PschExecutorEnd(QueryDesc* query_desc) {
591680

592681
// Build a PschEvent for utility statements (no QueryDesc available)
593682
static void BuildEventForUtility(PschEvent* event, const char* queryString, TimestampTz start_ts,
594-
uint64 duration_us, bool is_top_level, uint64 rows,
595-
BufferUsage* bufusage, WalUsage* walusage, int64 cpu_user_us,
596-
int64 cpu_sys_us) {
683+
int stmt_location, int stmt_len, uint64 duration_us,
684+
bool is_top_level, uint64 rows, BufferUsage* bufusage,
685+
WalUsage* walusage, int64 cpu_user_us, int64 cpu_sys_us) {
597686
InitBaseEvent(event, start_ts, is_top_level, PSCH_CMD_UTILITY);
598687
event->duration_us = duration_us;
599688
event->rows = rows;
@@ -604,7 +693,8 @@ static void BuildEventForUtility(PschEvent* event, const char* queryString, Time
604693
CopyIoTiming(event, bufusage);
605694
CopyWalUsage(event, walusage);
606695
CopyClientContext(event);
607-
CopyQueryText(event, queryString);
696+
const PschStatementKey statement_key = PschMakeStatementKey(queryString, stmt_location, stmt_len);
697+
CopyQueryText(event, statement_key);
608698
}
609699

610700
// Helper macro to call ProcessUtility (previous hook or standard)
@@ -684,13 +774,18 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString,
684774
QueryCompletion* qc) {
685775
#endif
686776
if (!ShouldTrackUtility(pstmt->utilityStmt)) {
777+
int stmt_location = pstmt->stmt_location;
778+
int stmt_len = pstmt->stmt_len;
687779
CALL_PROCESS_UTILITY();
780+
ForgetNormalizedStatement(queryString, stmt_location, stmt_len);
688781
return;
689782
}
690783

691784
// Capture state before execution
692785
bool is_top_level = (nesting_level == 0);
693786
TimestampTz start_ts = GetCurrentTimestamp();
787+
int stmt_location = pstmt->stmt_location;
788+
int stmt_len = pstmt->stmt_len;
694789
BufferUsage bufusage_start = pgBufferUsage;
695790
WalUsage walusage_start = pgWalUsage;
696791
struct rusage rusage_util_start;
@@ -724,9 +819,9 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString,
724819
}
725820

726821
PschEvent event;
727-
BuildEventForUtility(&event, queryString, start_ts, INSTR_TIME_GET_MICROSEC(duration),
728-
is_top_level, GetUtilityRowCount(qc), &bufusage_delta, &walusage_delta,
729-
cpu_user_us, cpu_sys_us);
822+
BuildEventForUtility(&event, queryString, start_ts, stmt_location, stmt_len,
823+
INSTR_TIME_GET_MICROSEC(duration), is_top_level, GetUtilityRowCount(qc),
824+
&bufusage_delta, &walusage_delta, cpu_user_us, cpu_sys_us);
730825
PschEnqueueEvent(&event);
731826
}
732827

@@ -763,14 +858,14 @@ static bool ShouldCaptureLog(ErrorData* edata) {
763858
return true;
764859
}
765860

766-
// Build and enqueue an error event from ErrorData
861+
// Build and enqueue an error event from ErrorData.
767862
//
768-
// NOTE: Query text captured here may contain sensitive data (passwords in CREATE USER,
769-
// connection strings, etc.). Consider this PII concern when configuring ClickHouse
770-
// retention policies. Future enhancement: GUC to disable query capture in error events.
863+
// We intentionally leave event.query empty here. emit_log_hook only exposes
864+
// debug_query_string and cursor position, not the exact statement identity used
865+
// by ExecutorEnd/ProcessUtility, so reconstructing normalized SQL required
866+
// fuzzy matching and extra backend-local state. Error events still carry the
867+
// message, SQLSTATE, and client/session metadata.
771868
static void CaptureLogEvent(ErrorData* edata) {
772-
const char* query = (debug_query_string != nullptr) ? debug_query_string : "";
773-
774869
PschEvent event;
775870
InitBaseEvent(&event, GetCurrentTimestamp(), (nesting_level == 0), PSCH_CMD_UNKNOWN);
776871

@@ -782,10 +877,6 @@ static void CaptureLogEvent(ErrorData* edata) {
782877
static_cast<uint16>(CopyTrimmed(event.err_message, PSCH_MAX_ERR_MSG_LEN, edata->message));
783878
}
784879

785-
if (query[0] != '\0') {
786-
event.query_len = static_cast<uint16>(CopyTrimmed(event.query, PSCH_MAX_QUERY_LEN, query));
787-
}
788-
789880
CopyClientContext(&event);
790881

791882
// Enqueue with recursion guard
@@ -825,6 +916,9 @@ void PschInstallHooks(void) {
825916
EnableQueryId();
826917
#endif
827918

919+
prev_post_parse_analyze = post_parse_analyze_hook;
920+
post_parse_analyze_hook = PschPostParseAnalyze;
921+
828922
prev_executor_start = ExecutorStart_hook;
829923
ExecutorStart_hook = PschExecutorStart;
830924

0 commit comments

Comments
 (0)