@@ -28,7 +28,7 @@ use slog::Logger;
2828use slog:: debug;
2929use slog:: trace;
3030use std:: collections:: BTreeMap ;
31- use std:: collections:: HashMap ;
31+ use std:: collections:: BTreeSet ;
3232use std:: time:: Duration ;
3333use std:: time:: Instant ;
3434use uuid:: Uuid ;
@@ -930,120 +930,96 @@ impl Client {
930930 preds : Option < & oxql:: ast:: table_ops:: filter:: Filter > ,
931931 ) -> Result < String , Error > {
932932 // Filter down the fields to those which apply to this timeseries
933- // itself, and rewrite as a DB-safe WHERE clause.
933+ // itself, and rewrite as a DB-safe clause.
934934 let preds_for_fields = preds
935935 . map ( |p| Self :: rewrite_predicate_for_fields ( schema, p) )
936936 . transpose ( ) ?
937937 . flatten ( ) ;
938- let ( already_has_where , mut query) = self . all_fields_query_raw ( schema) ;
938+ let mut query = self . all_fields_query_raw ( schema) ;
939939 if let Some ( preds) = preds_for_fields {
940- // If the raw field has only a single select query, then we've
941- // already added a "WHERE" clause. Simply tack these predicates onto
942- // that one.
943- if already_has_where {
944- query. push_str ( " AND " ) ;
945- } else {
946- query. push_str ( " WHERE " ) ;
947- }
940+ query. push_str ( " HAVING " ) ;
948941 query. push_str ( & preds) ;
949942 }
950943 Ok ( query)
951944 }
952945
953- // Build a reasonably efficient query to retrieve all fields for a given
954- // timeseries. Joins in ClickHouse are expensive, so aggregate all relevant
955- // fields from each relevant fields table in a single subquery, then join
956- // the results together. This results in n - 1 joins, where n is the number
957- // of relevant fields tables. Note that we may be able to improve
958- // performance in future ClickHouse versions, which have better support for
959- // Variant types, better support for the merge() table function, and faster
960- // joins.
961- fn all_fields_query_raw (
962- & self ,
963- schema : & TimeseriesSchema ,
964- ) -> ( bool , String ) {
946+ // Build a query to retrieve all fields for a given timeseries. In
947+ // ClickHouse, JOINs are slow, so we construct a query that doesn't use
948+ // them. We identify all relevant field tables, and for each table, we
949+ // select the relevant field values. We also select NULL for each other
950+ // field type so that we can UNION ALL the results of our per-table
951+ // subqueries. Then we pivot the unioned results to wide rows using anyIf.
952+ fn all_fields_query_raw ( & self , schema : & TimeseriesSchema ) -> String {
965953 match schema. field_schema . len ( ) {
966954 0 => unreachable ! ( ) ,
967955 _ => {
968- // Build a vector of top-level select expressions, as well as a
969- // map from fields to lists of subquery select expressions.
970- let mut top_selects: Vec < String > = Vec :: new ( ) ;
971- let mut select_map: HashMap < oximeter:: FieldType , Vec < String > > =
972- HashMap :: new ( ) ;
973- for field_schema in schema. field_schema . iter ( ) {
974- select_map
975- . entry ( field_schema. field_type )
976- . or_insert_with ( || vec ! [ String :: from( "timeseries_key" ) ] )
977- . push ( format ! (
978- "anyIf(field_value, field_name = '{}') AS {}" ,
979- field_schema. name, field_schema. name
980- ) ) ;
981- top_selects. push ( format ! (
982- "{}_pivot.{} AS {}" ,
983- field_table_name( field_schema. field_type) ,
984- field_schema. name,
985- field_schema. name
986- ) ) ;
987- }
956+ let field_types: BTreeSet < oximeter:: FieldType > =
957+ schema. field_schema . iter ( ) . map ( |f| f. field_type ) . collect ( ) ;
958+
959+ // Build top-level SELECT columns. For each field, we use
960+ // anyIf to extract the value from the appropriate type column.
961+ // We can use anyIf to take the first matching value because a
962+ // given timeseries key is always associated with the same set
963+ // of fields, so all rows with a given (timeseries_key,
964+ // field_name) will have the same field_value.
965+ //
966+ // We wrap the result in assumeNotNull() because the UNION
967+ // ALL with NULL placeholders causes anyIf to return Nullable
968+ // types, but we know fields will always have values for
969+ // matching keys.
970+ let mut top_selects: Vec < String > = schema
971+ . field_schema
972+ . iter ( )
973+ . map ( |f| {
974+ format ! (
975+ "assumeNotNull(anyIf({}, field_name = '{}')) AS \" {}\" " ,
976+ field_table_name( f. field_type) ,
977+ f. name,
978+ f. name
979+ )
980+ } )
981+ . collect ( ) ;
982+ top_selects. push ( String :: from ( "timeseries_key" ) ) ;
988983
989- // Sort field tables by number of columns, descending.
990- // ClickHouse recommends joining larger tables to smaller
991- // tables, and doesn't currently reorder joins automatically.
992- let mut field_types: Vec < oximeter:: FieldType > =
993- select_map. keys ( ) . cloned ( ) . collect ( ) ;
994- field_types. sort_by ( |a, b| {
995- select_map[ b]
996- . len ( )
997- . cmp ( & select_map[ a] . len ( ) )
998- . then ( field_table_name ( * a) . cmp ( & field_table_name ( * b) ) )
999- } ) ;
984+ // Build UNION ALL subqueries, one per field type. Each
985+ // subquery selects timeseries_key, field_name, and a value
986+ // column for each relevant type (field_value for its own
987+ // type, NULL for others). We emit NULLs so that we can UNION
988+ // ALL the resulting subqueries.
1000989
1001- // Build a map from field type to pivot subquery. We filter by
1002- // timeseries_name, group by timeseries_key, and use anyIf to
1003- // pivot fields to a wide table. We can use anyIf to take the
1004- // first matching value because a given timeseries key is
1005- // always associated with the same set of fields, so all rows
1006- // with a given (timeseries_key, field_name) will have the same
1007- // field_value.
1008- let mut query_map: HashMap < oximeter:: FieldType , String > =
1009- HashMap :: new ( ) ;
1010- for field_type in field_types. clone ( ) {
1011- let selects = & select_map[ & field_type] ;
1012- let query = format ! (
1013- "(
1014- SELECT
1015- {select}
1016- FROM {db_name}.{from}
1017- WHERE timeseries_name = '{timeseries_name}'
1018- GROUP BY timeseries_key
1019- ) AS {subquery_name}_pivot" ,
1020- select = selects. join( ", " ) ,
1021- db_name = crate :: DATABASE_NAME ,
1022- from = field_table_name( field_type) ,
1023- timeseries_name = schema. timeseries_name,
1024- subquery_name = field_table_name( field_type) ,
1025- ) ;
1026- query_map. insert ( field_type, query) ;
1027- }
990+ let null_cols: Vec < String > = field_types
991+ . iter ( )
992+ . map ( |& field_type| {
993+ format ! ( "NULL AS {}" , field_table_name( field_type) )
994+ } )
995+ . collect ( ) ;
996+ let union_parts: Vec < String > = field_types
997+ . iter ( )
998+ . enumerate ( )
999+ . map ( |( idx, & field_type) | {
1000+ let mut cols = null_cols. clone ( ) ;
1001+ cols[ idx] = format ! (
1002+ "field_value AS {}" ,
1003+ field_table_name( field_type)
1004+ ) ;
1005+ format ! (
1006+ "SELECT timeseries_key, field_name, {} \
1007+ FROM {}.{} \
1008+ WHERE timeseries_name = '{}'",
1009+ cols. join( ", " ) ,
1010+ crate :: DATABASE_NAME ,
1011+ field_table_name( field_type) ,
1012+ schema. timeseries_name,
1013+ )
1014+ } )
1015+ . collect ( ) ;
10281016
1029- // Assemble the final query.
1030- let mut from = query_map[ & field_types[ 0 ] ] . clone ( ) ;
1031- for field_type in field_types. iter ( ) . skip ( 1 ) {
1032- from = format ! (
1033- "{from} JOIN {query} ON {source}_pivot.timeseries_key = {dest}_pivot.timeseries_key" ,
1034- from = from,
1035- query = query_map[ field_type] ,
1036- source = field_table_name( field_types[ 0 ] ) ,
1037- dest = field_table_name( * field_type) ,
1038- ) ;
1039- }
1040- top_selects. push ( format ! (
1041- "{}_pivot.timeseries_key AS timeseries_key" ,
1042- field_table_name( field_types[ 0 ] )
1043- ) ) ;
1044- let query =
1045- format ! ( "SELECT {} FROM {}" , top_selects. join( ", " ) , from) ;
1046- ( false , query)
1017+ // Assemble final query.
1018+ format ! (
1019+ "SELECT {} FROM ({}) GROUP BY timeseries_key" ,
1020+ top_selects. join( ", " ) ,
1021+ union_parts. join( " UNION ALL " ) ,
1022+ )
10471023 }
10481024 }
10491025 }
@@ -1189,6 +1165,7 @@ mod tests {
11891165 QueryAuthzScope , chunk_consistent_key_groups_impl,
11901166 } ;
11911167 use crate :: oxql:: ast:: grammar:: query_parser;
1168+ use crate :: oxql:: ast:: table_ops:: filter:: Filter ;
11921169 use crate :: { Client , DATABASE_TIMESTAMP_FORMAT , DbWrite } ;
11931170 use crate :: { Metric , Target } ;
11941171 use chrono:: { DateTime , NaiveDate , Utc } ;
@@ -1352,42 +1329,20 @@ mod tests {
13521329 . await
13531330 . unwrap ( )
13541331 . unwrap ( ) ;
1355- let query = ctx. client . all_fields_query ( & schema, None ) . unwrap ( ) ;
1356- let want = "SELECT
1357- fields_i32_pivot.foo AS foo,
1358- fields_u32_pivot.index AS index,
1359- fields_string_pivot.name AS name,
1360- fields_i32_pivot.timeseries_key AS timeseries_key
1361- FROM
1362- (
1363- SELECT
1364- timeseries_key,
1365- anyIf(field_value, field_name = 'foo') AS foo
1366- FROM oximeter.fields_i32
1367- WHERE timeseries_name = 'some_target:some_metric'
1368- GROUP BY timeseries_key
1369- ) AS fields_i32_pivot
1370- JOIN
1371- (
1372- SELECT
1373- timeseries_key,
1374- anyIf(field_value, field_name = 'name') AS name
1375- FROM oximeter.fields_string
1376- WHERE timeseries_name = 'some_target:some_metric'
1377- GROUP BY timeseries_key
1378- ) AS fields_string_pivot ON fields_i32_pivot.timeseries_key = fields_string_pivot.timeseries_key
1379- JOIN
1380- (
1381- SELECT
1382- timeseries_key,
1383- anyIf(field_value, field_name = 'index') AS index
1384- FROM oximeter.fields_u32
1385- WHERE timeseries_name = 'some_target:some_metric'
1386- GROUP BY timeseries_key
1387- ) AS fields_u32_pivot ON fields_i32_pivot.timeseries_key = fields_u32_pivot.timeseries_key" ;
1388- assert_eq ! (
1389- want. split_whitespace( ) . collect:: <Vec <& str >>( ) . join( " " ) ,
1390- query. split_whitespace( ) . collect:: <Vec <& str >>( ) . join( " " )
1332+ let filter: Filter = "index == 0" . parse ( ) . unwrap ( ) ;
1333+ let query =
1334+ ctx. client . all_fields_query ( & schema, Some ( & filter) ) . unwrap ( ) ;
1335+ let formatted = sqlformat:: format (
1336+ & query,
1337+ & sqlformat:: QueryParams :: None ,
1338+ & sqlformat:: FormatOptions {
1339+ uppercase : Some ( true ) ,
1340+ ..Default :: default ( )
1341+ } ,
1342+ ) ;
1343+ expectorate:: assert_contents (
1344+ "test-output/all-fields-query.sql" ,
1345+ & formatted,
13911346 ) ;
13921347
13931348 ctx. cleanup_successful ( ) . await ;
0 commit comments