Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pkg/planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,57 @@ type PlanCacheStmt struct {
tbls []table.Table
}

// PrepareStmtCacheEntry is stored in the session-level prepare dedup cache.
// It holds the result of a full Prepare flow so that subsequent Prepares of
// the same SQL text can skip the expensive Preprocess + PlanBuilder.Build.
type PrepareStmtCacheEntry struct {
Stmt *PlanCacheStmt
Fields []*resolve.ResultField
ParamCount int
}

// ExtractAndSortParamMarkers extracts ParamMarkerExpr nodes from the AST,
// sorts them by position, assigns their order indices, and initialises each
// marker's Datum to NULL (matching the behaviour of GeneratePlanCacheStmtWithAST
// for prepared statements where the actual parameter values are not yet known).
func ExtractAndSortParamMarkers(stmtNode ast.StmtNode) []ast.ParamMarkerExpr {
var extractor paramMarkerExtractor
stmtNode.Accept(&extractor)
slices.SortFunc(extractor.markers, func(i, j ast.ParamMarkerExpr) int {
return cmp.Compare(i.(*driver.ParamMarkerExpr).Offset, j.(*driver.ParamMarkerExpr).Offset)
})
for i, m := range extractor.markers {
m.SetOrder(i)
p := m.(*driver.ParamMarkerExpr)
p.Datum.SetNull()
p.InExecute = false
}
return extractor.markers
}

// CollectPlanCacheStmtInfo walks the AST to populate the limits, hasSubquery,
// and tables fields of stmt. It must be called on the fresh AST after a
// re-parse so that limit nodes and table references point into the new tree.
func CollectPlanCacheStmtInfo(ctx context.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt, stmtNode ast.StmtNode) {
processor := &planCacheStmtProcessor{ctx: ctx, is: is, stmt: stmt}
stmtNode.Accept(processor)
}

// DBName returns the dbName field (used for metadata lock during Execute).
func (s *PlanCacheStmt) DBName() []model.CIStr { return s.dbName }

// Tbls returns the tbls field (used for metadata lock during Execute).
func (s *PlanCacheStmt) Tbls() []table.Table { return s.tbls }

// SetDBNameAndTbls sets the dbName and tbls fields, cloning the input slices
// so that this PlanCacheStmt owns independent backing arrays. This is required
// because planCachePreprocess replaces tbls[i] in-place during Execute, and
// sharing the backing array with a cached template would cause cross-stmt contamination.
func (s *PlanCacheStmt) SetDBNameAndTbls(dbName []model.CIStr, tbls []table.Table) {
s.dbName = slices.Clone(dbName)
s.tbls = slices.Clone(tbls)
}

// GetPreparedStmt extract the prepared statement from the execute statement.
func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*PlanCacheStmt, error) {
if stmt.PrepStmt != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2581,6 +2581,10 @@ func (cli *TestServerClient) RunTestInitConnect(t *testing.T) {
// and not internal SQL statements. Thus, this test is in the server-test suite.
func (cli *TestServerClient) RunTestInfoschemaClientErrors(t *testing.T) {
cli.RunTestsOnNewDB(t, nil, "clientErrors", func(dbt *testkit.DBTestKit) {
dbt.MustExec("set @@tidb_enable_cache_prepare_stmt = off")
defer func() {
dbt.MustExec("set @@tidb_enable_cache_prepare_stmt = default")
}()
clientErrors := []struct {
stmt string
incrementWarnings bool
Expand Down
130 changes: 130 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
stderrs "errors"
"fmt"
"iter"
"maps"
"math"
"math/rand"
"runtime/pprof"
Expand Down Expand Up @@ -2529,6 +2530,37 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
if err = sessiontxn.GetTxnManager(s).AdviseWarmup(); err != nil {
return
}

var dedupKey string
if s.sessionVars.EnableCachePrepareStmt {
// Session-level prepare dedup cache: if the same SQL text has been prepared
// before in this session (with the same charset/collation/currentDB), reuse
// the already-built PlanCacheStmt and skip the expensive Preprocess+Build.
charset, collation := s.sessionVars.GetCharsetInfo()
dedupKey = variable.PrepareDedupCacheKey(sql, charset, collation, s.sessionVars.CurrentDB, s.sessionVars.SQLMode)
if v := s.sessionVars.GetPrepareStmtDedupCache(dedupKey); v != nil {
cached := v.(*plannercore.PrepareStmtCacheEntry)
is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema()
if cached.Stmt.SchemaVersion == is.SchemaMetaVersion() {
newStmt, rebuildErr := s.rebuildFromPrepareCache(ctx, cached, sql, charset, collation)
if rebuildErr == nil {
stmtID = s.sessionVars.GetNextPreparedStmtID()
if err = s.sessionVars.AddPreparedStmt(stmtID, newStmt); err != nil {
s.rollbackOnError(ctx)
return
}
paramCount = cached.ParamCount
fields = cached.Fields
s.rollbackOnError(ctx)
return
}
// Re-parse or rebuild failed; fall through to the full prepare path.
logutil.Logger(ctx).Warn("prepare stmt dedup cache rebuild failed, fallback to full prepare", zap.Error(rebuildErr))
}
// Schema version changed; fall through and re-cache below.
}
}

prepareExec := executor.NewPrepareExec(s, sql)
err = prepareExec.Next(ctx, nil)
// Rollback even if err is nil.
Expand All @@ -2537,9 +2569,107 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
if err != nil {
return
}

// Store the result in the dedup cache for future Prepares of the same SQL.
if s.sessionVars.EnableCachePrepareStmt {
if prepareExec.Stmt != nil {
s.sessionVars.SetPrepareStmtDedupCache(dedupKey, &plannercore.PrepareStmtCacheEntry{
Stmt: prepareExec.Stmt.(*plannercore.PlanCacheStmt),
Fields: prepareExec.Fields,
ParamCount: prepareExec.ParamCount,
})
}
}
return prepareExec.ID, prepareExec.ParamCount, prepareExec.Fields, nil
}

// rebuildFromPrepareCache constructs a new PlanCacheStmt from a cached entry,
// re-parsing the SQL to obtain an independent AST (with fresh ParamMarkerExpr
// nodes) while skipping only the expensive PlanBuilder.Build step.
// Preprocess is still executed to build a fresh ResolveCtx whose tableNames map
// is keyed by the new AST's TableName pointers; reusing the cached ResolveCtx
// would cause nil-deref panics on plan-cache miss because the old pointer keys
// would not match the newly-parsed AST nodes.
func (s *session) rebuildFromPrepareCache(
ctx context.Context,
cached *plannercore.PrepareStmtCacheEntry,
sql, charset, collation string,
) (*plannercore.PlanCacheStmt, error) {
stmts, _, err := s.ParseSQL(ctx, sql,
parser.CharsetConnection(charset),
parser.CollationConnection(collation),
)
if err != nil {
return nil, err
}
if len(stmts) != 1 {
return nil, errors.New("unexpected statement count after re-parse")
}
stmtNode := stmts[0]

// Extract fresh param markers from the new AST and initialise them to NULL.
markers := plannercore.ExtractAndSortParamMarkers(stmtNode)

is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema()

// Run Preprocess to build a fresh ResolveCtx aligned with the new AST.
// This is the only way to populate ResolveCtx.tableNames with the new
// AST's *ast.TableName pointer keys without re-running the full Build.
ret := &plannercore.PreprocessorReturn{InfoSchema: is}
nodeW := resolve.NewNodeW(stmtNode)
if err = plannercore.Preprocess(ctx, s, nodeW, plannercore.InPrepare,
plannercore.WithPreprocessorReturn(ret)); err != nil {
return nil, err
}
// Defensive: if schema changed between our earlier check and Preprocess,
// fall through to the full prepare path.
if ret.InfoSchema.SchemaMetaVersion() != cached.Stmt.SchemaVersion {
return nil, errors.New("schema version changed during rebuild")
}

newStmt := &plannercore.PlanCacheStmt{
// Fields derived from the new AST:
PreparedAst: &ast.Prepared{
Stmt: stmtNode,
StmtType: cached.Stmt.PreparedAst.StmtType,
},
Params: markers,

// Fresh ResolveCtx whose tableNames keys match the new AST pointers.
ResolveCtx: nodeW.GetResolveContext(),

// Immutable fields – safe to share with the cached template:
StmtDB: cached.Stmt.StmtDB,
StmtText: cached.Stmt.StmtText,
VisitInfos: cached.Stmt.VisitInfos,
NormalizedSQL: cached.Stmt.NormalizedSQL,
SQLDigest: cached.Stmt.SQLDigest,
ForUpdateRead: cached.Stmt.ForUpdateRead,
SnapshotTSEvaluator: cached.Stmt.SnapshotTSEvaluator,
StmtCacheable: cached.Stmt.StmtCacheable,
UncacheableReason: cached.Stmt.UncacheableReason,
SchemaVersion: cached.Stmt.SchemaVersion,

// Mutable containers – clone so each stmt has independent state:
RelateVersion: maps.Clone(cached.Stmt.RelateVersion),
// PointGet is zeroed (per-execution executor state must not leak).
// NormalizedPlan / PlanDigest are left as zero values; they will be
// populated on the first plan-cache miss during Execute.
}

// Walk the new AST to populate limits, hasSubquery, and tables.
// These fields hold pointers into the AST, so they must refer to the
// newly-parsed tree rather than the cached one.
plannercore.CollectPlanCacheStmtInfo(ctx, is, newStmt, stmtNode)

// dbName and tbls are only read during Execute (not written to by
// CollectPlanCacheStmtInfo since that populates tables, not tbls).
// Clone them so that planCachePreprocess can safely replace tbls[i].
newStmt.SetDBNameAndTbls(cached.Stmt.DBName(), cached.Stmt.Tbls())

return newStmt, nil
}

// ExecutePreparedStmt executes a prepared statement.
func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, params []expression.Expression) (sqlexec.RecordSet, error) {
prepStmt, err := s.sessionVars.GetPreparedStmtByID(stmtID)
Expand Down
3 changes: 2 additions & 1 deletion pkg/session/test/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ go_test(
srcs = [
"common_test.go",
"main_test.go",
"prepare_dedup_cache_test.go",
],
flaky = True,
shard_count = 10,
shard_count = 15,
deps = [
"//pkg/config",
"//pkg/expression",
Expand Down
Loading