Skip to content

Commit 0ad2926

Browse files
zhuofenglzf575
authored andcommitted
fix: aggregate unsummable metrics with max instead of avg
(cherry picked from commit d88c48f75eb18c4669b68415c08da7d916ce5c24)
1 parent 1a30dca commit 0ad2926

2 files changed

Lines changed: 90 additions & 1 deletion

File tree

server/ingester/pkg/ckwriter/ckwriter.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,18 @@ func initTable(conn clickhouse.Conn, timeZone string, t *ckdb.Table, orgID uint1
247247
if err := ExecSQL(conn, t.MakeAggrTableRenameSQL1H(orgID)); err != nil {
248248
log.Warningf("drop 1h agg table failed: %s", err)
249249
}
250+
} else if t.IsAggrMaxColumnWrong(aggrTableCreateSQL) {
251+
// the aggregation of unsummable(_max) columns changed from avg to max;
252+
// alter the columns in place to keep the 1h aggregated history. the mv is
253+
// dropped first so it stops writing avgState and is recreated with maxState below.
254+
if err := ExecSQL(conn, t.MakeAggrMvTableDropSQL1H(orgID)); err != nil {
255+
log.Warningf("drop 1h mv table failed: %s", err)
256+
}
257+
for _, sql := range t.MakeAggrMaxColumnAlterSQL1H(orgID) {
258+
if err := ExecSQL(conn, sql); err != nil {
259+
log.Warningf("alter 1h agg max column failed: %s", err)
260+
}
261+
}
250262
}
251263

252264
if err := ExecSQL(conn, t.MakeAggrTableCreateSQL1H(orgID)); err != nil {
@@ -270,6 +282,18 @@ func initTable(conn clickhouse.Conn, timeZone string, t *ckdb.Table, orgID uint1
270282
if err := ExecSQL(conn, t.MakeAggrTableRenameSQL1D(orgID)); err != nil {
271283
log.Warningf("drop 1d agg table failed: %s", err)
272284
}
285+
} else if t.IsAggrMaxColumnWrong(aggrTableCreateSQL) {
286+
// the aggregation of unsummable(_max) columns changed from avg to max;
287+
// alter the columns in place to keep the 1d aggregated history. the mv is
288+
// dropped first so it stops writing avgState and is recreated with maxState below.
289+
if err := ExecSQL(conn, t.MakeAggrMvTableDropSQL1D(orgID)); err != nil {
290+
log.Warningf("drop 1d mv table failed: %s", err)
291+
}
292+
for _, sql := range t.MakeAggrMaxColumnAlterSQL1D(orgID) {
293+
if err := ExecSQL(conn, sql); err != nil {
294+
log.Warningf("alter 1d agg max column failed: %s", err)
295+
}
296+
}
273297
}
274298
if err := ExecSQL(conn, t.MakeAggrTableCreateSQL1D(orgID)); err != nil {
275299
log.Warningf("create 1d agg table failed: %s", err)

server/libs/ckdb/table.go

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ func isUnsummable(column *Column) bool {
361361

362362
func getAggr(column *Column) string {
363363
if isUnsummable(column) {
364-
return "avg"
364+
return "max"
365365
}
366366
return "sum"
367367
}
@@ -389,6 +389,51 @@ func (t *Table) IsAggrTableWrong(createTableSQL string) bool {
389389
return len(orderKeys) != t.OrderKeysCount()
390390
}
391391

392+
// IsAggrMaxColumnWrong reports whether an existing aggregate table still
393+
// aggregates unsummable(`_max`) columns with `avg` instead of `max`. Such
394+
// tables only need their aggregate columns altered in place (see
395+
// MakeAggrMaxColumnAlterSQLs), not a full rebuild, so historical 1h/1d data
396+
// is preserved.
397+
func (t *Table) IsAggrMaxColumnWrong(createTableSQL string) bool {
398+
return strings.Contains(createTableSQL, "_max__agg` AggregateFunction(avg")
399+
}
400+
401+
// MakeAggrMaxColumnAlterSQLs returns the ALTER statements that migrate the
402+
// unsummable aggregate columns of an existing aggregate table from the old
403+
// `avg` aggregation to `max`. Each column is dropped and re-added as a
404+
// `max` AggregateFunction column; only the data of these specific columns is
405+
// lost, all other columns and rows are kept.
406+
func (t *Table) MakeAggrMaxColumnAlterSQLs(orgID uint16, aggrInterval AggregationInterval) []string {
407+
tableAgg := t.AggrTable(orgID, aggrInterval)
408+
sqls := []string{}
409+
for _, c := range t.Columns {
410+
// ignore fields starting with '_', such as _tid, _id
411+
if strings.HasPrefix(c.Name, "_") || c.IgnoredInAggrTable || c.GroupBy {
412+
continue
413+
}
414+
// only columns that fall into the default branch of
415+
// MakeAggrTableCreateSQL are created via getAggr(); those are the
416+
// ones whose aggregation switched from avg to max.
417+
if c.Aggr != AggrNone || !isUnsummable(c) {
418+
continue
419+
}
420+
aggrColumn := fmt.Sprintf("%s__agg", c.Name)
421+
sqls = append(sqls,
422+
fmt.Sprintf("ALTER TABLE %s DROP COLUMN IF EXISTS `%s`", tableAgg, aggrColumn),
423+
fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS `%s` AggregateFunction(max, %s)", tableAgg, aggrColumn, c.Type.String()),
424+
)
425+
}
426+
return sqls
427+
}
428+
429+
func (t *Table) MakeAggrMaxColumnAlterSQL1H(orgID uint16) []string {
430+
return t.MakeAggrMaxColumnAlterSQLs(orgID, AggregationHour)
431+
}
432+
433+
func (t *Table) MakeAggrMaxColumnAlterSQL1D(orgID uint16) []string {
434+
return t.MakeAggrMaxColumnAlterSQLs(orgID, AggregationDay)
435+
}
436+
392437
func (t *Table) IsLocalTableWrong(createTableSql string) bool {
393438
return strings.Contains(createTableSql, "mem-inuse")
394439
}
@@ -397,6 +442,10 @@ func (t *Table) AggrTable(orgID uint16, aggrInterval AggregationInterval) string
397442
return fmt.Sprintf("%s.`%s.%s_agg`", t.OrgDatabase(orgID), t.tableAggrPrefix(), aggrInterval.String())
398443
}
399444

445+
func (t *Table) MvTable(orgID uint16, aggrInterval AggregationInterval) string {
446+
return fmt.Sprintf("%s.`%s.%s_mv`", t.OrgDatabase(orgID), t.tableAggrPrefix(), aggrInterval.String())
447+
}
448+
400449
func (t *Table) AggrTable1S(orgID uint16) string {
401450
return t.AggrTable(orgID, AggregationSecond)
402451
}
@@ -434,6 +483,14 @@ func (t *Table) AggrTable1H(orgID uint16) string {
434483
return t.AggrTable(orgID, AggregationHour)
435484
}
436485

486+
func (t *Table) MvTable1H(orgID uint16) string {
487+
return t.MvTable(orgID, AggregationHour)
488+
}
489+
490+
func (t *Table) MakeAggrMvTableDropSQL1H(orgID uint16) string {
491+
return fmt.Sprintf("DROP TABLE IF EXISTS %s", t.MvTable1H(orgID))
492+
}
493+
437494
func (t *Table) MakeAggrTableRenameSQL1H(orgID uint16) string {
438495
table := t.AggrTable1H(orgID)
439496
return fmt.Sprintf("RENAME TABLE %s to %s_%d`", table, table[:len(table)-1], time.Now().Unix())
@@ -459,6 +516,14 @@ func (t *Table) AggrTable1D(orgID uint16) string {
459516
return t.AggrTable(orgID, AggregationDay)
460517
}
461518

519+
func (t *Table) MvTable1D(orgID uint16) string {
520+
return t.MvTable(orgID, AggregationDay)
521+
}
522+
523+
func (t *Table) MakeAggrMvTableDropSQL1D(orgID uint16) string {
524+
return fmt.Sprintf("DROP TABLE IF EXISTS %s", t.MvTable1D(orgID))
525+
}
526+
462527
func (t *Table) MakeAggrTableRenameSQL1D(orgID uint16) string {
463528
table := t.AggrTable1D(orgID)
464529
return fmt.Sprintf("RENAME TABLE %s to %s_%d`", table, table[:len(table)-1], time.Now().Unix())

0 commit comments

Comments
 (0)