@@ -556,13 +556,153 @@ def get_prometheus_client():
556556 raise
557557
558558
559- def get_query_texts_from_sink (db_name : str = None , truncation_mode : str = 'smart' ,
560- max_age_hours : int = None ) -> dict :
559+ # Used to gate the cross-source-duplicate warning so we do not spam logs once
560+ # per dashboard refresh. Keyed by datname; emits at most one warning per
561+ # (datname, source-set) pair per process.
562+ _logged_cross_source_warnings : set = set ()
563+
564+
565+ def _should_apply_db_filter (db_name : str | None ) -> bool :
566+ """
567+ Decide whether ``db_name`` is a real database name worth filtering on.
568+
569+ Skips empty strings, the literal Grafana ``All`` sentinel, and unresolved
570+ template variables (``$db_name``) that occasionally leak through when a
571+ dashboard panel is rendered before the variable is populated.
572+ """
573+ return bool (db_name ) and db_name .lower () not in ('all' , '' ) and not db_name .startswith ('$' )
574+
575+
576+ def _build_query_text_lookup_sql (* , with_db_filter : bool , time_clause : str | None ) -> str :
577+ """
578+ Build the SQL for queryid → query text lookups against the sink table.
579+
580+ The sink table ``public.pgss_queryid_queries`` is LIST-partitioned by
581+ the ``dbname`` column, which stores the *pgwatch source name* (e.g.
582+ ``my-host-cone-demo``) — NOT the actual Postgres database name. The
583+ actual database name (``datname``) is stored inside the JSONB payload
584+ as ``data->>'real_dbname'``. Dashboards pass the Prometheus
585+ ``datname`` label, so the filter must be on the JSONB key. Filtering
586+ on the partition ``dbname`` column with a ``datname`` value yields
587+ zero rows — that was the root cause of MR !271.
588+
589+ Args:
590+ with_db_filter: when True, append the
591+ ``data->>'real_dbname' = %s`` predicate (caller binds a
592+ ``datname``). The companion ``data ? 'real_dbname'`` is also
593+ emitted so the absent-key case is explicit (older pgwatch
594+ payloads or non-pgss rows missing the key are excluded
595+ deliberately, not by accident).
596+ time_clause: optional ``time > now() - ...`` fragment to bound
597+ the scan, e.g. ``"time > now() - make_interval(hours => %s)"``.
598+
599+ When ``with_db_filter`` is True, the SQL also surfaces a
600+ ``source_dbnames`` column with the array of partition names that
601+ hold a row for the same queryid. Callers use this to detect (and
602+ log) the multi-source-ambiguity scenario flagged in MR !271's
603+ review: two pgwatch sources observing the same physical database
604+ would each contribute a row, and ``DISTINCT ON`` would pick one
605+ arbitrarily by ``time DESC``.
606+
607+ The aggregation cannot use ``array_agg(distinct ...) OVER (...)``
608+ because PostgreSQL does not implement ``DISTINCT`` in window
609+ aggregates (`FeatureNotSupported`). Instead, the filtered scan
610+ is materialized once via a CTE and joined to a separate
611+ ``GROUP BY queryid`` aggregate — same single underlying scan
612+ plan, but only standard SQL constructs.
613+
614+ Performance note: efficient evaluation of the
615+ ``data->>'real_dbname'`` predicate requires the expression index
616+ ``pgss_queryid_queries_real_dbname_time_idx`` declared in
617+ ``config/sink-postgres/init.sql`` (also covers the ``time``
618+ column). Without that index this becomes a per-partition seq scan.
619+ """
620+ where_clauses = [
621+ "data->>'queryid' is not null" ,
622+ "data->>'query' is not null" ,
623+ ]
624+ if with_db_filter :
625+ where_clauses .append ("data ? 'real_dbname'" )
626+ where_clauses .append ("data->>'real_dbname' = %s" )
627+ if time_clause :
628+ where_clauses .append (time_clause )
629+ where_sql = "\n and " .join (where_clauses )
630+
631+ if with_db_filter :
632+ return (
633+ "with filtered as (\n "
634+ " select\n "
635+ " data->>'queryid' as queryid,\n "
636+ " data->>'query' as query,\n "
637+ " dbname,\n "
638+ " time\n "
639+ " from public.pgss_queryid_queries\n "
640+ " where\n "
641+ f" { where_sql } \n "
642+ "),\n "
643+ "sources as (\n "
644+ " select queryid, array_agg(distinct dbname) as source_dbnames\n "
645+ " from filtered\n "
646+ " group by queryid\n "
647+ "),\n "
648+ "latest as (\n "
649+ " select distinct on (queryid) queryid, query\n "
650+ " from filtered\n "
651+ " order by queryid, time desc\n "
652+ ")\n "
653+ "select l.queryid, l.query, s.source_dbnames\n "
654+ "from latest l\n "
655+ "join sources s using (queryid)"
656+ )
657+ return (
658+ "select distinct on (data->>'queryid')\n "
659+ " data->>'queryid' as queryid,\n "
660+ " data->>'query' as query\n "
661+ "from public.pgss_queryid_queries\n "
662+ "where\n "
663+ f" { where_sql } \n "
664+ "order by data->>'queryid', time desc"
665+ )
666+
667+
668+ def _warn_on_cross_source_duplicates (db_name : str , duplicates : list ) -> None :
669+ """
670+ Log a one-shot warning when a single ``datname`` is observed under
671+ more than one pgwatch source partition. ``DISTINCT ON`` silently
672+ picks one row per queryid by ``time DESC``, so without this signal a
673+ cross-tenant collision (two sources monitoring the same physical
674+ database name) would surface query texts from a non-deterministic
675+ source. Each duplicate is a tuple ``(queryid, source_dbnames)``.
676+ """
677+ if not duplicates :
678+ return
679+ # Dedupe by the full (datname, sorted source-set) tuple — the same
680+ # collision will be observed on every refresh, no need to log twice.
681+ fingerprint = (db_name , tuple (sorted ({tuple (sorted (s [1 ])) for s in duplicates })))
682+ if fingerprint in _logged_cross_source_warnings :
683+ return
684+ _logged_cross_source_warnings .add (fingerprint )
685+ queryid_sample , sources_sample = duplicates [0 ]
686+ logger .warning (
687+ "Cross-source ambiguity for datname=%s: %d queryid(s) appear in "
688+ "multiple pgwatch source partitions; DISTINCT ON picks one by "
689+ "time DESC. Example: queryid=%s, sources=%s. Consider scoping "
690+ "by source name as well, or changing the dedup trigger to "
691+ "(real_dbname, queryid)." ,
692+ db_name , len (duplicates ), queryid_sample , sorted (sources_sample ),
693+ )
694+
695+
696+ def get_query_texts_from_sink (db_name : str | None = None , truncation_mode : str = 'smart' ,
697+ max_age_hours : int | None = None ) -> dict :
561698 """
562699 Fetch queryid-to-query text mappings from the PostgreSQL sink database.
563700
564701 Args:
565- db_name: Optional database name to filter results
702+ db_name: Optional Postgres database name (``datname``) to filter
703+ results. Matched against ``data->>'real_dbname'`` in the
704+ JSONB payload, NOT the partition ``dbname`` column (which
705+ holds the pgwatch source name).
566706 truncation_mode: 'smart' for smart truncation, 'raw' for simple truncation
567707 max_age_hours: Only return queryids seen within this many hours (None = use retention window)
568708
@@ -578,51 +718,37 @@ def get_query_texts_from_sink(db_name: str = None, truncation_mode: str = 'smart
578718 try :
579719 conn = psycopg2 .connect (POSTGRES_SINK_URL , connect_timeout = 10 )
580720 with conn .cursor (cursor_factory = psycopg2 .extras .DictCursor ) as cursor :
581- # Skip db_name filter if it's empty, "All", or contains special chars
582- use_db_filter = db_name and db_name .lower () not in ('all' , '' ) and not db_name .startswith ('$' )
721+ use_db_filter = _should_apply_db_filter (db_name )
722+ sql = _build_query_text_lookup_sql (
723+ with_db_filter = use_db_filter ,
724+ time_clause = 'time > now() - make_interval(hours => %s)' ,
725+ )
583726 if use_db_filter :
584- query = """
585- SELECT DISTINCT ON (data->>'queryid')
586- data->>'queryid' as queryid,
587- data->>'query' as query
588- FROM public.pgss_queryid_queries
589- WHERE
590- dbname = %s
591- AND time > now() - make_interval(hours => %s)
592- AND data->>'queryid' IS NOT NULL
593- AND data->>'query' IS NOT NULL
594- ORDER BY data->>'queryid', time DESC
595- """
596- cursor .execute (query , (db_name , max_age_hours ))
727+ cursor .execute (sql , (db_name , max_age_hours ))
597728 else :
598- query = """
599- SELECT DISTINCT ON (data->>'queryid')
600- data->>'queryid' as queryid,
601- data->>'query' as query
602- FROM public.pgss_queryid_queries
603- WHERE
604- time > now() - make_interval(hours => %s)
605- AND data->>'queryid' IS NOT NULL
606- AND data->>'query' IS NOT NULL
607- ORDER BY data->>'queryid', time DESC
608- """
609- cursor .execute (query , (max_age_hours ,))
729+ cursor .execute (sql , (max_age_hours ,))
610730
731+ cross_source_dupes = []
611732 for row in cursor :
612733 queryid = row ['queryid' ]
613734 query_text = row ['query' ]
735+ if use_db_filter :
736+ sources = row ['source_dbnames' ] or []
737+ if len (sources ) > 1 :
738+ cross_source_dupes .append ((queryid , list (sources )))
614739 if queryid :
615740 if query_text :
616741 if truncation_mode == 'raw' :
617- # Raw truncation: normalize whitespace and truncate
618742 normalized = ' ' .join (query_text .split ())
619743 query_text = (normalized [:147 ] + '...' ) if len (normalized ) > 150 else normalized
620744 else :
621- # Smart truncation (extracts table names)
622745 query_text = smart_truncate_query (query_text , 150 )
623746 else :
624747 query_text = ''
625748 query_texts [queryid ] = query_text
749+
750+ if use_db_filter :
751+ _warn_on_cross_source_duplicates (db_name , cross_source_dupes )
626752 except Exception as e :
627753 logger .warning (f"Failed to fetch query texts from sink database: { e } " )
628754 finally :
@@ -1510,42 +1636,34 @@ def get_query_texts():
15101636 try :
15111637 conn = psycopg2 .connect (POSTGRES_SINK_URL , connect_timeout = 10 )
15121638 with conn .cursor (cursor_factory = psycopg2 .extras .DictCursor ) as cursor :
1513- # Skip db_name filter if it's empty, "All", or contains special chars
1514- use_db_filter = db_name and db_name .lower () not in ('all' , '' ) and not db_name .startswith ('$' )
1639+ use_db_filter = _should_apply_db_filter (db_name )
1640+ # Bound the scan by ``QUERYID_RETENTION_HOURS`` — without
1641+ # a time predicate every dashboard load scans every
1642+ # partition end-to-end, which degrades as data accumulates.
1643+ sql = _build_query_text_lookup_sql (
1644+ with_db_filter = use_db_filter ,
1645+ time_clause = 'time > now() - make_interval(hours => %s)' ,
1646+ )
15151647 if use_db_filter :
1516- query = """
1517- SELECT DISTINCT ON (data->>'queryid')
1518- data->>'queryid' as queryid,
1519- data->>'query' as query
1520- FROM public.pgss_queryid_queries
1521- WHERE
1522- dbname = %s
1523- AND data->>'queryid' IS NOT NULL
1524- AND data->>'query' IS NOT NULL
1525- ORDER BY data->>'queryid', time DESC
1526- """
1527- cursor .execute (query , (db_name ,))
1648+ cursor .execute (sql , (db_name , QUERYID_RETENTION_HOURS ))
15281649 else :
1529- query = """
1530- SELECT DISTINCT ON (data->>'queryid')
1531- data->>'queryid' as queryid,
1532- data->>'query' as query
1533- FROM public.pgss_queryid_queries
1534- WHERE
1535- data->>'queryid' IS NOT NULL
1536- AND data->>'query' IS NOT NULL
1537- ORDER BY data->>'queryid', time DESC
1538- """
1539- cursor .execute (query )
1650+ cursor .execute (sql , (QUERYID_RETENTION_HOURS ,))
15401651
1652+ cross_source_dupes = []
15411653 for row in cursor :
15421654 queryid = row ['queryid' ]
15431655 query_text = row ['query' ]
1656+ if use_db_filter :
1657+ sources = row ['source_dbnames' ] or []
1658+ if len (sources ) > 1 :
1659+ cross_source_dupes .append ((queryid , list (sources )))
15441660 if queryid :
1545- # Smart truncation for chart legend display
15461661 # Use defensive check for None (despite SQL filter, drivers may return None)
15471662 query_text = smart_truncate_query (query_text or '' , truncate_len )
15481663 query_texts [queryid ] = query_text or ''
1664+
1665+ if use_db_filter :
1666+ _warn_on_cross_source_duplicates (db_name , cross_source_dupes )
15491667 except Exception as e :
15501668 logger .warning (f"Failed to fetch query texts from sink database: { e } " )
15511669 finally :
@@ -1631,38 +1749,24 @@ def get_query_info_metrics():
16311749 # Only export queryids recently seen in pg_stat_statements.
16321750 # The dedup trigger refreshes the timestamp on each collection,
16331751 # so active queryids have time within the last few minutes.
1634- use_db_filter = db_name and db_name .lower () not in ('all' , '' ) and not db_name .startswith ('$' )
1752+ use_db_filter = _should_apply_db_filter (db_name )
1753+ sql = _build_query_text_lookup_sql (
1754+ with_db_filter = use_db_filter ,
1755+ time_clause = 'time > now() - make_interval(mins => %s)' ,
1756+ )
16351757 if use_db_filter :
1636- query = """
1637- SELECT DISTINCT ON (data->>'queryid')
1638- data->>'queryid' as queryid,
1639- data->>'query' as query
1640- FROM public.pgss_queryid_queries
1641- WHERE
1642- dbname = %s
1643- AND time > now() - make_interval(mins => %s)
1644- AND data->>'queryid' IS NOT NULL
1645- AND data->>'query' IS NOT NULL
1646- ORDER BY data->>'queryid', time DESC
1647- """
1648- cursor .execute (query , (db_name , QUERYID_ACTIVE_MINUTES ))
1758+ cursor .execute (sql , (db_name , QUERYID_ACTIVE_MINUTES ))
16491759 else :
1650- query = """
1651- SELECT DISTINCT ON (data->>'queryid')
1652- data->>'queryid' as queryid,
1653- data->>'query' as query
1654- FROM public.pgss_queryid_queries
1655- WHERE
1656- time > now() - make_interval(mins => %s)
1657- AND data->>'queryid' IS NOT NULL
1658- AND data->>'query' IS NOT NULL
1659- ORDER BY data->>'queryid', time DESC
1660- """
1661- cursor .execute (query , (QUERYID_ACTIVE_MINUTES ,))
1760+ cursor .execute (sql , (QUERYID_ACTIVE_MINUTES ,))
16621761
1762+ cross_source_dupes = []
16631763 for row in cursor :
16641764 queryid = row ['queryid' ]
16651765 query_text = row ['query' ] or '' # Defensive check for None
1766+ if use_db_filter :
1767+ sources = row ['source_dbnames' ] or []
1768+ if len (sources ) > 1 :
1769+ cross_source_dupes .append ((queryid , list (sources )))
16661770 if queryid :
16671771 # Normalize whitespace for raw truncation
16681772 normalized_text = ' ' .join (query_text .split ()) if query_text else ''
@@ -1704,6 +1808,8 @@ def get_query_info_metrics():
17041808 'queryid_only' : queryid_only ,
17051809 }
17061810
1811+ if use_db_filter :
1812+ _warn_on_cross_source_duplicates (db_name , cross_source_dupes )
17071813 logger .info (f"Exported { len (query_data )} active queryids for metrics" )
17081814 except Exception as e :
17091815 logger .warning (f"Failed to fetch query texts from sink database: { e } " )
0 commit comments