Skip to content

Commit 4e68a10

Browse files
committed
support logging session connect attrs to slow query log
1 parent 21b5bb6 commit 4e68a10

14 files changed

Lines changed: 592 additions & 12 deletions

File tree

pkg/executor/adapter_slow_log.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,9 @@ func SetSlowLogItems(a *ExecStmt, txnTS uint64, hasMoreResults bool, items *vari
264264
items.StorageKV = stmtCtx.IsTiKV.Load()
265265
items.StorageMPP = stmtCtx.IsTiFlash.Load()
266266
items.MemArbitration = stmtCtx.MemTracker.MemArbitration().Seconds()
267+
if sessVars.ConnectionInfo != nil && len(sessVars.ConnectionInfo.Attributes) > 0 {
268+
items.SessionConnectAttrs = sessVars.ConnectionInfo.Attributes
269+
}
267270

268271
if a.retryCount > 0 {
269272
items.ExecRetryTime = items.TimeTotal - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)

pkg/executor/slow_query.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,9 @@ func (e *slowQueryRetriever) parseLog(ctx context.Context, sctx sessionctx.Conte
982982
} else if strings.HasPrefix(line, variable.SlowLogWarnings+variable.SlowLogSpaceMarkStr) {
983983
line = line[len(variable.SlowLogWarnings+variable.SlowLogSpaceMarkStr):]
984984
valid = e.setColumnValue(sctx, row, tz, variable.SlowLogWarnings, line, e.checker, fileLine)
985+
} else if strings.HasPrefix(line, variable.SlowLogSessionConnectAttrs+variable.SlowLogSpaceMarkStr) {
986+
line = line[len(variable.SlowLogSessionConnectAttrs+variable.SlowLogSpaceMarkStr):]
987+
valid = e.setColumnValue(sctx, row, tz, variable.SlowLogSessionConnectAttrs, line, e.checker, fileLine)
985988
} else if strings.HasPrefix(line, variable.SlowLogDBStr+variable.SlowLogSpaceMarkStr) {
986989
line = line[len(variable.SlowLogDBStr+variable.SlowLogSpaceMarkStr):]
987990
valid = e.setColumnValue(sctx, row, tz, variable.SlowLogDBStr, line, e.checker, fileLine)
@@ -1170,6 +1173,18 @@ func getColumnValueFactoryByName(colName string, columnIdx int) (slowQueryColumn
11701173
row[columnIdx] = types.NewDatum(v)
11711174
return true, nil
11721175
}, nil
1176+
case variable.SlowLogSessionConnectAttrs:
1177+
return func(row []types.Datum, value string, _ *time.Location, _ *slowLogChecker) (valid bool, err error) {
1178+
if len(value) == 0 {
1179+
return true, nil
1180+
}
1181+
bj, err := types.ParseBinaryJSONFromString(value)
1182+
if err != nil {
1183+
return false, err
1184+
}
1185+
row[columnIdx] = types.NewDatum(bj)
1186+
return true, nil
1187+
}, nil
11731188
}
11741189
return nil, nil
11751190
}

pkg/executor/slow_query_sql_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,3 +544,54 @@ func TestStorageEnginesInSlowQuery(t *testing.T) {
544544
"where query like 'select%tablesample%;'").
545545
Check(testkit.Rows("1 0"))
546546
}
547+
548+
func TestSessionConnectAttrsInSlowQuery(t *testing.T) {
549+
originCfg := config.GetGlobalConfig()
550+
newCfg := *originCfg
551+
f, err := os.CreateTemp("", "tidb-slow-*.log")
552+
require.NoError(t, err)
553+
_, err = f.WriteString(`# Time: 2024-01-15T10:00:00.000000+08:00
554+
# Txn_start_ts: 123456789
555+
# User@Host: root[root] @ localhost [127.0.0.1]
556+
# Query_time: 0.5
557+
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
558+
# Is_internal: false
559+
# Succ: true
560+
# Session_connect_attrs: {"_client_name":"Go-MySQL-Driver","_os":"linux","app_name":"test_app"}
561+
select * from t;
562+
`)
563+
require.NoError(t, err)
564+
require.NoError(t, f.Close())
565+
newCfg.Log.SlowQueryFile = f.Name()
566+
config.StoreGlobalConfig(&newCfg)
567+
defer func() {
568+
config.StoreGlobalConfig(originCfg)
569+
require.NoError(t, os.Remove(newCfg.Log.SlowQueryFile))
570+
}()
571+
require.NoError(t, logutil.InitLogger(newCfg.Log.ToLogConfig()))
572+
store := testkit.CreateMockStore(t)
573+
tk := testkit.NewTestKit(t, store)
574+
575+
tk.MustExec("set @@time_zone='+08:00'")
576+
tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", f.Name()))
577+
578+
// Verify Session_connect_attrs column is present and returns the correct JSON value.
579+
rows := tk.MustQuery("select Session_connect_attrs from information_schema.slow_query " +
580+
"where query = 'select * from t;'").Rows()
581+
require.Len(t, rows, 1)
582+
attrsStr := rows[0][0].(string)
583+
require.Contains(t, attrsStr, `"_client_name"`)
584+
require.Contains(t, attrsStr, `"Go-MySQL-Driver"`)
585+
require.Contains(t, attrsStr, `"_os"`)
586+
require.Contains(t, attrsStr, `"linux"`)
587+
require.Contains(t, attrsStr, `"app_name"`)
588+
require.Contains(t, attrsStr, `"test_app"`)
589+
590+
// Verify individual keys are accessible via JSON_EXTRACT.
591+
tk.MustQuery("select JSON_EXTRACT(Session_connect_attrs, '$._client_name') from information_schema.slow_query " +
592+
"where query = 'select * from t;'").
593+
Check(testkit.Rows(`"Go-MySQL-Driver"`))
594+
tk.MustQuery("select JSON_EXTRACT(Session_connect_attrs, '$.app_name') from information_schema.slow_query " +
595+
"where query = 'select * from t;'").
596+
Check(testkit.Rows(`"test_app"`))
597+
}

pkg/executor/slow_query_test.go

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ select * from t;`
191191
`0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,23333,65536,0,0,0,30000,3000,10000,1000,500000,500005,300000,300005,0,0,,` +
192192
`Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2,` +
193193
`0,0,1,0,1,1,0,default,2.158,2.123,0.05,0.01,0.021,1,1,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` +
194-
`,update t set i = 1;,select * from t;`
194+
`,update t set i = 1;,null,select * from t;`
195195
require.Equal(t, expectRecordString, recordString)
196196

197197
// Issue 20928
@@ -214,7 +214,7 @@ select * from t;`
214214
`0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,23333,65536,0,0,0,30000,3000,10000,1000,500000,500005,300000,300005,0,0,,` +
215215
`Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2,` +
216216
`0,0,1,0,1,1,0,default,2.158,2.123,0.05,0.01,0.021,1,1,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` +
217-
`,update t set i = 1;,select * from t;`
217+
`,update t set i = 1;,null,select * from t;`
218218
require.Equal(t, expectRecordString, recordString)
219219

220220
// fix sql contain '# ' bug
@@ -284,6 +284,62 @@ select * from t;
284284
require.Equal(t, value, "[t:i: a]")
285285
}
286286

287+
func TestParseSlowLogSessionConnectAttrs(t *testing.T) {
288+
// Slow log entry that includes Session_connect_attrs JSON.
289+
slowLogStr := `# Time: 2019-04-28T15:24:04.309074+08:00
290+
# Txn_start_ts: 405888132465033227
291+
# User@Host: root[root] @ localhost [127.0.0.1]
292+
# Query_time: 0.216905
293+
# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772
294+
# Is_internal: false
295+
# Succ: true
296+
# Session_connect_attrs: {"_client_name":"Go-MySQL-Driver","_os":"linux","app_name":"test_app"}
297+
# Prev_stmt: begin;
298+
select * from t;
299+
`
300+
loc, err := time.LoadLocation("Asia/Shanghai")
301+
require.NoError(t, err)
302+
ctx := mock.NewContext()
303+
ctx.ResetSessionAndStmtTimeZone(loc)
304+
305+
// Use the retriever directly (without initialize) to avoid reading
306+
// from actual slow log files on disk, which can produce extra rows.
307+
retriever, err := newSlowQueryRetriever()
308+
require.NoError(t, err)
309+
retriever.columnValueFactoryMap = make(map[string]slowQueryColumnValueFactory, len(retriever.outputCols))
310+
for idx, col := range retriever.outputCols {
311+
factory, err := getColumnValueFactoryByName(col.Name.O, idx)
312+
require.NoError(t, err)
313+
require.NotNil(t, factory, "column %s should have a factory", col.Name.O)
314+
retriever.columnValueFactoryMap[col.Name.O] = factory
315+
}
316+
317+
reader := bufio.NewReader(bytes.NewBufferString(slowLogStr))
318+
rows, err := parseLog(retriever, ctx, reader)
319+
require.NoError(t, err)
320+
require.Len(t, rows, 1)
321+
322+
// Find the Session_connect_attrs column.
323+
colIdx := -1
324+
for i, col := range retriever.outputCols {
325+
if col.Name.L == strings.ToLower(variable.SlowLogSessionConnectAttrs) {
326+
colIdx = i
327+
break
328+
}
329+
}
330+
require.NotEqual(t, -1, colIdx, "Session_connect_attrs column should exist")
331+
332+
// Verify the parsed JSON contains the expected keys.
333+
bj := rows[0][colIdx].GetMysqlJSON()
334+
bjStr := bj.String()
335+
require.Contains(t, bjStr, `"_client_name"`)
336+
require.Contains(t, bjStr, `"Go-MySQL-Driver"`)
337+
require.Contains(t, bjStr, `"_os"`)
338+
require.Contains(t, bjStr, `"linux"`)
339+
require.Contains(t, bjStr, `"app_name"`)
340+
require.Contains(t, bjStr, `"test_app"`)
341+
}
342+
287343
// It changes variable.MaxOfMaxAllowedPacket, so must be stayed in SerialSuite.
288344
func TestParseSlowLogFileSerial(t *testing.T) {
289345
loc, err := time.LoadLocation("Asia/Shanghai")

pkg/infoschema/tables.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ var slowQueryCols = []columnInfo{
983983
{name: variable.SlowLogPlanDigest, tp: mysql.TypeVarchar, size: 128},
984984
{name: variable.SlowLogBinaryPlan, tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
985985
{name: variable.SlowLogPrevStmt, tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
986+
{name: variable.SlowLogSessionConnectAttrs, tp: mysql.TypeJSON, size: types.UnspecifiedLength},
986987
{name: variable.SlowLogQuerySQLStr, tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
987988
}
988989

0 commit comments

Comments
 (0)