B+树索引(MySQL InnoDB):
- 叶子节点存储数据
- 非叶子节点存储索引
- 所有叶子节点形成链表
-- 创建索引
CREATE INDEX idx_user_name ON users(name);
-- 复合索引
CREATE INDEX idx_user_name_age ON users(name, age);
-- 唯一索引
CREATE UNIQUE INDEX idx_user_email ON users(email);-- 索引:(name, age, city)
-- ✅ 使用索引
SELECT * FROM users WHERE name = 'Alice';
SELECT * FROM users WHERE name = 'Alice' AND age = 25;
SELECT * FROM users WHERE name = 'Alice' AND age = 25 AND city = 'Beijing';
-- ❌ 不使用索引
SELECT * FROM users WHERE age = 25;
SELECT * FROM users WHERE city = 'Beijing';-- 索引:(name, age)
-- ✅ 覆盖索引(无需回表)
SELECT name, age FROM users WHERE name = 'Alice';
-- ❌ 非覆盖索引(需要回表)
SELECT name, age, city FROM users WHERE name = 'Alice';-- ❌ 索引列使用函数
SELECT * FROM users WHERE UPPER(name) = 'ALICE';
-- ✅ 改为
SELECT * FROM users WHERE name = 'Alice';
-- ❌ 索引列参与计算
SELECT * FROM users WHERE age + 1 = 26;
-- ✅ 改为
SELECT * FROM users WHERE age = 25;
-- ❌ 使用 OR(可能失效)
SELECT * FROM users WHERE name = 'Alice' OR city = 'Beijing';
-- ✅ 改为 UNION
SELECT * FROM users WHERE name = 'Alice'
UNION
SELECT * FROM users WHERE city = 'Beijing';
-- ❌ 使用 LIKE '%xxx'
SELECT * FROM users WHERE name LIKE '%Alice';
-- ✅ 改为
SELECT * FROM users WHERE name LIKE 'Alice%';EXPLAIN SELECT * FROM users WHERE name = 'Alice';
-- 输出关键字段:
-- type: ALL(全表扫描) < index < range < ref < eq_ref < const
-- key: 使用的索引
-- rows: 扫描行数
-- Extra: 额外信息(Using index, Using filesort等)type 类型对比:
ALL:全表扫描,最慢index:索引全扫描range:范围扫描(BETWEEN、>、<)ref:非唯一索引扫描eq_ref:唯一索引扫描const:常量查询,最快
// ❌ N+1 查询
func getOrdersWithItems(db *sql.DB, userID int) ([]Order, error) {
// 查询订单(1次)
rows, _ := db.Query("SELECT * FROM orders WHERE user_id = ?", userID)
defer rows.Close()
var orders []Order
for rows.Next() {
var order Order
rows.Scan(&order.ID, &order.UserID)
// 查询订单项(N次)
items, _ := db.Query("SELECT * FROM order_items WHERE order_id = ?", order.ID)
// ...
orders = append(orders, order)
}
return orders, nil
}
// ✅ JOIN 查询
func getOrdersWithItemsOptimized(db *sql.DB, userID int) ([]Order, error) {
query := `
SELECT o.id, o.user_id, oi.id, oi.product_id, oi.quantity
FROM orders o
LEFT JOIN order_items oi ON o.id = oi.order_id
WHERE o.user_id = ?
`
rows, _ := db.Query(query, userID)
defer rows.Close()
ordersMap := make(map[int]*Order)
for rows.Next() {
var orderID, userID, itemID, productID, quantity int
rows.Scan(&orderID, &userID, &itemID, &productID, &quantity)
if order, ok := ordersMap[orderID]; ok {
order.Items = append(order.Items, OrderItem{
ID: itemID, ProductID: productID, Quantity: quantity,
})
} else {
ordersMap[orderID] = &Order{
ID: orderID, UserID: userID,
Items: []OrderItem{{ID: itemID, ProductID: productID, Quantity: quantity}},
}
}
}
// 转换为切片
var orders []Order
for _, order := range ordersMap {
orders = append(orders, *order)
}
return orders, nil
}性能对比:
- N+1 查询:100 次查询,500ms
- JOIN 查询:1 次查询,50ms
- 性能提升:10x
-- ❌ 深分页(扫描大量行)
SELECT * FROM users ORDER BY id LIMIT 1000000, 10;
-- ✅ 使用索引优化
SELECT * FROM users WHERE id > 1000000 ORDER BY id LIMIT 10;
-- ✅ 延迟关联(先查 ID,再查数据)
SELECT u.* FROM users u
INNER JOIN (
SELECT id FROM users ORDER BY id LIMIT 1000000, 10
) AS tmp ON u.id = tmp.id;-- ❌ 子查询(每行都执行一次)
SELECT * FROM orders
WHERE user_id IN (
SELECT id FROM users WHERE city = 'Beijing'
);
-- ✅ JOIN 替代子查询
SELECT o.* FROM orders o
INNER JOIN users u ON o.user_id = u.id
WHERE u.city = 'Beijing';import (
"database/sql"
"time"
_ "github.com/go-sql-driver/mysql"
)
// ✅ 优化的连接池配置
func createOptimizedDB(dsn string) (*sql.DB, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}
// 最大打开连接数
db.SetMaxOpenConns(100)
// 最大空闲连接数
db.SetMaxIdleConns(10)
// 连接最大生命周期
db.SetConnMaxLifetime(time.Hour)
// 连接最大空闲时间
db.SetConnMaxIdleTime(10 * time.Minute)
// 验证连接
if err := db.Ping(); err != nil {
return nil, err
}
return db, nil
}配置说明:
MaxOpenConns:防止数据库连接数过多MaxIdleConns:减少连接创建开销ConnMaxLifetime:避免长连接问题ConnMaxIdleTime:释放空闲连接
import "database/sql"
// ✅ 读写分离
type DBCluster struct {
master *sql.DB
slaves []*sql.DB
next int
}
func NewDBCluster(masterDSN string, slaveDSNs []string) (*DBCluster, error) {
master, err := sql.Open("mysql", masterDSN)
if err != nil {
return nil, err
}
var slaves []*sql.DB
for _, dsn := range slaveDSNs {
slave, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}
slaves = append(slaves, slave)
}
return &DBCluster{
master: master,
slaves: slaves,
}, nil
}
// 写操作:使用主库
func (c *DBCluster) Exec(query string, args ...interface{}) (sql.Result, error) {
return c.master.Exec(query, args...)
}
// 读操作:使用从库(轮询)
func (c *DBCluster) Query(query string, args ...interface{}) (*sql.Rows, error) {
slave := c.getSlave()
return slave.Query(query, args...)
}
func (c *DBCluster) getSlave() *sql.DB {
if len(c.slaves) == 0 {
return c.master
}
slave := c.slaves[c.next%len(c.slaves)]
c.next++
return slave
}// ✅ 强制主库查询(避免主从延迟)
func (c *DBCluster) QueryMaster(query string, args ...interface{}) (*sql.Rows, error) {
return c.master.Query(query, args...)
}
// 使用示例
func createAndGetUser(db *DBCluster, name string) (*User, error) {
// 写入主库
result, _ := db.Exec("INSERT INTO users (name) VALUES (?)", name)
userID, _ := result.LastInsertId()
// 立即查询,使用主库(避免主从延迟)
row := db.master.QueryRow("SELECT * FROM users WHERE id = ?", userID)
var user User
row.Scan(&user.ID, &user.Name)
return &user, nil
}// ✅ 按 user_id 分表(1024 张表)
func getTableName(userID int64) string {
tableIndex := userID % 1024
return fmt.Sprintf("users_%04d", tableIndex)
}
func insertUser(db *sql.DB, user User) error {
tableName := getTableName(user.ID)
query := fmt.Sprintf("INSERT INTO %s (id, name, age) VALUES (?, ?, ?)", tableName)
_, err := db.Exec(query, user.ID, user.Name, user.Age)
return err
}
func getUser(db *sql.DB, userID int64) (*User, error) {
tableName := getTableName(userID)
query := fmt.Sprintf("SELECT * FROM %s WHERE id = ?", tableName)
var user User
err := db.QueryRow(query, userID).Scan(&user.ID, &user.Name, &user.Age)
return &user, err
}// ✅ 按业务分库
type DBManager struct {
userDB *sql.DB // 用户库
orderDB *sql.DB // 订单库
productDB *sql.DB // 商品库
}
func (m *DBManager) GetUserDB() *sql.DB {
return m.userDB
}
func (m *DBManager) GetOrderDB() *sql.DB {
return m.orderDB
}
func (m *DBManager) GetProductDB() *sql.DB {
return m.productDB
}// ❌ 逐条插入
func insertUsersSlow(db *sql.DB, users []User) error {
for _, user := range users {
_, err := db.Exec("INSERT INTO users (name, age) VALUES (?, ?)",
user.Name, user.Age)
if err != nil {
return err
}
}
return nil
}
// ✅ 批量插入
func insertUsersFast(db *sql.DB, users []User) error {
if len(users) == 0 {
return nil
}
// 构建批量插入语句
query := "INSERT INTO users (name, age) VALUES "
values := make([]interface{}, 0, len(users)*2)
for i, user := range users {
if i > 0 {
query += ","
}
query += "(?, ?)"
values = append(values, user.Name, user.Age)
}
_, err := db.Exec(query, values...)
return err
}
// ✅ 分批插入(避免单次过大)
func insertUsersBatch(db *sql.DB, users []User, batchSize int) error {
for i := 0; i < len(users); i += batchSize {
end := i + batchSize
if end > len(users) {
end = len(users)
}
if err := insertUsersFast(db, users[i:end]); err != nil {
return err
}
}
return nil
}性能对比(插入 10000 条):
- 逐条插入:10000 次网络往返,10s
- 批量插入:1 次网络往返,100ms
- 性能提升:100x
// ✅ 使用事务批量更新
func batchUpdate(db *sql.DB, updates []Update) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.Prepare("UPDATE users SET name = ? WHERE id = ?")
if err != nil {
return err
}
defer stmt.Close()
for _, update := range updates {
_, err := stmt.Exec(update.Name, update.ID)
if err != nil {
return err
}
}
return tx.Commit()
}// ❌ 事务范围过大
func badTransaction(db *sql.DB) error {
tx, _ := db.Begin()
defer tx.Rollback()
// 耗时操作(不应在事务中)
data := fetchExternalAPI() // 1秒
// 数据库操作
tx.Exec("INSERT INTO logs (data) VALUES (?)", data)
return tx.Commit()
}
// ✅ 缩小事务范围
func goodTransaction(db *sql.DB) error {
// 耗时操作在事务外
data := fetchExternalAPI() // 1秒
// 快速事务
tx, _ := db.Begin()
defer tx.Rollback()
tx.Exec("INSERT INTO logs (data) VALUES (?)", data)
return tx.Commit()
}// ✅ 乐观锁(版本号)
func updateWithOptimisticLock(db *sql.DB, userID int64, newBalance int, version int) error {
result, err := db.Exec(`
UPDATE users
SET balance = ?, version = version + 1
WHERE id = ? AND version = ?
`, newBalance, userID, version)
if err != nil {
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return errors.New("version conflict")
}
return nil
}
// ✅ 悲观锁(FOR UPDATE)
func updateWithPessimisticLock(db *sql.DB, userID int64) error {
tx, _ := db.Begin()
defer tx.Rollback()
// 锁定行
var balance int
tx.QueryRow("SELECT balance FROM users WHERE id = ? FOR UPDATE", userID).
Scan(&balance)
// 更新
newBalance := balance - 100
tx.Exec("UPDATE users SET balance = ? WHERE id = ?", newBalance, userID)
return tx.Commit()
}# my.cnf 配置
[mysqld]
# 缓冲池大小(物理内存的 70-80%)
innodb_buffer_pool_size = 8G
# 日志文件大小
innodb_log_file_size = 1G
# 刷新策略(1=最安全,2=折中,0=最快)
innodb_flush_log_at_trx_commit = 2
# 每次刷新页数
innodb_flush_neighbors = 0 # SSD 设为 0
# 并发线程数
innodb_thread_concurrency = 0 # 0 表示无限制
# IO 线程数
innodb_read_io_threads = 4
innodb_write_io_threads = 4
# 缓冲池实例数(减少锁竞争)
innodb_buffer_pool_instances = 8# 查询缓存(慎用,高并发写入场景反而降低性能)
query_cache_type = 0 # 关闭
query_cache_size = 0-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1; -- 1秒
-- 查看慢查询统计
SHOW GLOBAL STATUS LIKE 'Slow_queries';
-- 分析慢查询日志
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log-- 查看当前连接数
SHOW PROCESSLIST;
-- 查看连接统计
SHOW GLOBAL STATUS LIKE 'Threads_%';
-- Threads_connected: 当前连接数
-- Threads_running: 正在运行的线程数
-- Max_used_connections: 历史最大连接数import (
"database/sql"
"time"
)
// ✅ 监控数据库连接池
func monitorDBStats(db *sql.DB) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
stats := db.Stats()
log.Printf("DB Stats: "+
"MaxOpenConns=%d, OpenConns=%d, InUse=%d, Idle=%d, "+
"WaitCount=%d, WaitDuration=%v",
stats.MaxOpenConnections,
stats.OpenConnections,
stats.InUse,
stats.Idle,
stats.WaitCount,
stats.WaitDuration,
)
}
}- 是否为查询条件添加了索引?
- 是否利用了覆盖索引?
- 是否避免了索引失效?(函数、计算、LIKE '%x')
- 是否定期清理无用索引?
- 是否使用 EXPLAIN 分析?
- 是否避免了 N+1 查询?
- 是否优化了分页查询?
- 是否用 JOIN 替代子查询?
- 是否配置了合适的连接池大小?
- 是否设置了连接超时?
- 是否监控了连接池状态?
- 是否实现了读写分离?
- 是否实现了分库分表?
- 是否使用了批量操作?
- 是否缩小了事务范围?
核心要点:
- ✅ 索引优化:合理创建索引,避免索引失效
- ✅ 查询优化:避免 N+1、优化分页、使用 JOIN
- ✅ 连接池:合理配置连接数,监控状态
- ✅ 批量操作:减少网络往返,提升吞吐量
- ✅ 读写分离:分散读压力,提升性能
优化优先级:
索引优化 > 查询优化 > 批量操作 > 连接池 > 读写分离