Skip to content

Latest commit

 

History

History
710 lines (538 loc) · 15.5 KB

File metadata and controls

710 lines (538 loc) · 15.5 KB

11.6 数据库优化

📍 导航返回目录 | 上一节:网络优化 | 下一节:缓存策略


索引优化

1. 索引原理

B+树索引(MySQL InnoDB):

  • 叶子节点存储数据
  • 非叶子节点存储索引
  • 所有叶子节点形成链表
-- 创建索引
CREATE INDEX idx_user_name ON users(name);

-- 复合索引
CREATE INDEX idx_user_name_age ON users(name, age);

-- 唯一索引
CREATE UNIQUE INDEX idx_user_email ON users(email);

2. 索引选择原则

最左前缀匹配

-- 索引:(name, age, city)

-- ✅ 使用索引
SELECT * FROM users WHERE name = 'Alice';
SELECT * FROM users WHERE name = 'Alice' AND age = 25;
SELECT * FROM users WHERE name = 'Alice' AND age = 25 AND city = 'Beijing';

-- ❌ 不使用索引
SELECT * FROM users WHERE age = 25;
SELECT * FROM users WHERE city = 'Beijing';

覆盖索引

-- 索引:(name, age)

-- ✅ 覆盖索引(无需回表)
SELECT name, age FROM users WHERE name = 'Alice';

-- ❌ 非覆盖索引(需要回表)
SELECT name, age, city FROM users WHERE name = 'Alice';

3. 索引失效场景

-- ❌ 索引列使用函数
SELECT * FROM users WHERE UPPER(name) = 'ALICE';

-- ✅ 改为
SELECT * FROM users WHERE name = 'Alice';

-- ❌ 索引列参与计算
SELECT * FROM users WHERE age + 1 = 26;

-- ✅ 改为
SELECT * FROM users WHERE age = 25;

-- ❌ 使用 OR(可能失效)
SELECT * FROM users WHERE name = 'Alice' OR city = 'Beijing';

-- ✅ 改为 UNION
SELECT * FROM users WHERE name = 'Alice'
UNION
SELECT * FROM users WHERE city = 'Beijing';

-- ❌ 使用 LIKE '%xxx'
SELECT * FROM users WHERE name LIKE '%Alice';

-- ✅ 改为
SELECT * FROM users WHERE name LIKE 'Alice%';

查询优化

1. EXPLAIN 分析

EXPLAIN SELECT * FROM users WHERE name = 'Alice';

-- 输出关键字段:
-- type: ALL(全表扫描) < index < range < ref < eq_ref < const
-- key: 使用的索引
-- rows: 扫描行数
-- Extra: 额外信息(Using index, Using filesort等)

type 类型对比

  • ALL:全表扫描,最慢
  • index:索引全扫描
  • range:范围扫描(BETWEEN、>、<)
  • ref:非唯一索引扫描
  • eq_ref:唯一索引扫描
  • const:常量查询,最快

2. 慢查询优化

案例 1:N+1 查询问题

// ❌ N+1 查询
func getOrdersWithItems(db *sql.DB, userID int) ([]Order, error) {
    // 查询订单(1次)
    rows, _ := db.Query("SELECT * FROM orders WHERE user_id = ?", userID)
    defer rows.Close()
    
    var orders []Order
    for rows.Next() {
        var order Order
        rows.Scan(&order.ID, &order.UserID)
        
        // 查询订单项(N次)
        items, _ := db.Query("SELECT * FROM order_items WHERE order_id = ?", order.ID)
        // ...
        orders = append(orders, order)
    }
    return orders, nil
}

// ✅ JOIN 查询
func getOrdersWithItemsOptimized(db *sql.DB, userID int) ([]Order, error) {
    query := `
        SELECT o.id, o.user_id, oi.id, oi.product_id, oi.quantity
        FROM orders o
        LEFT JOIN order_items oi ON o.id = oi.order_id
        WHERE o.user_id = ?
    `
    
    rows, _ := db.Query(query, userID)
    defer rows.Close()
    
    ordersMap := make(map[int]*Order)
    for rows.Next() {
        var orderID, userID, itemID, productID, quantity int
        rows.Scan(&orderID, &userID, &itemID, &productID, &quantity)
        
        if order, ok := ordersMap[orderID]; ok {
            order.Items = append(order.Items, OrderItem{
                ID: itemID, ProductID: productID, Quantity: quantity,
            })
        } else {
            ordersMap[orderID] = &Order{
                ID: orderID, UserID: userID,
                Items: []OrderItem{{ID: itemID, ProductID: productID, Quantity: quantity}},
            }
        }
    }
    
    // 转换为切片
    var orders []Order
    for _, order := range ordersMap {
        orders = append(orders, *order)
    }
    return orders, nil
}

性能对比

  • N+1 查询:100 次查询,500ms
  • JOIN 查询:1 次查询,50ms
  • 性能提升:10x

案例 2:分页优化

-- ❌ 深分页(扫描大量行)
SELECT * FROM users ORDER BY id LIMIT 1000000, 10;

-- ✅ 使用索引优化
SELECT * FROM users WHERE id > 1000000 ORDER BY id LIMIT 10;

-- ✅ 延迟关联(先查 ID,再查数据)
SELECT u.* FROM users u
INNER JOIN (
    SELECT id FROM users ORDER BY id LIMIT 1000000, 10
) AS tmp ON u.id = tmp.id;

3. 子查询优化

-- ❌ 子查询(每行都执行一次)
SELECT * FROM orders
WHERE user_id IN (
    SELECT id FROM users WHERE city = 'Beijing'
);

-- ✅ JOIN 替代子查询
SELECT o.* FROM orders o
INNER JOIN users u ON o.user_id = u.id
WHERE u.city = 'Beijing';

连接池优化

Go 数据库连接池配置

import (
    "database/sql"
    "time"
    _ "github.com/go-sql-driver/mysql"
)

// ✅ 优化的连接池配置
func createOptimizedDB(dsn string) (*sql.DB, error) {
    db, err := sql.Open("mysql", dsn)
    if err != nil {
        return nil, err
    }
    
    // 最大打开连接数
    db.SetMaxOpenConns(100)
    
    // 最大空闲连接数
    db.SetMaxIdleConns(10)
    
    // 连接最大生命周期
    db.SetConnMaxLifetime(time.Hour)
    
    // 连接最大空闲时间
    db.SetConnMaxIdleTime(10 * time.Minute)
    
    // 验证连接
    if err := db.Ping(); err != nil {
        return nil, err
    }
    
    return db, nil
}

配置说明

  • MaxOpenConns:防止数据库连接数过多
  • MaxIdleConns:减少连接创建开销
  • ConnMaxLifetime:避免长连接问题
  • ConnMaxIdleTime:释放空闲连接

读写分离

1. 主从架构

import "database/sql"

// ✅ 读写分离
type DBCluster struct {
    master *sql.DB
    slaves []*sql.DB
    next   int
}

func NewDBCluster(masterDSN string, slaveDSNs []string) (*DBCluster, error) {
    master, err := sql.Open("mysql", masterDSN)
    if err != nil {
        return nil, err
    }
    
    var slaves []*sql.DB
    for _, dsn := range slaveDSNs {
        slave, err := sql.Open("mysql", dsn)
        if err != nil {
            return nil, err
        }
        slaves = append(slaves, slave)
    }
    
    return &DBCluster{
        master: master,
        slaves: slaves,
    }, nil
}

// 写操作:使用主库
func (c *DBCluster) Exec(query string, args ...interface{}) (sql.Result, error) {
    return c.master.Exec(query, args...)
}

// 读操作:使用从库(轮询)
func (c *DBCluster) Query(query string, args ...interface{}) (*sql.Rows, error) {
    slave := c.getSlave()
    return slave.Query(query, args...)
}

func (c *DBCluster) getSlave() *sql.DB {
    if len(c.slaves) == 0 {
        return c.master
    }
    
    slave := c.slaves[c.next%len(c.slaves)]
    c.next++
    return slave
}

2. 主从延迟处理

// ✅ 强制主库查询(避免主从延迟)
func (c *DBCluster) QueryMaster(query string, args ...interface{}) (*sql.Rows, error) {
    return c.master.Query(query, args...)
}

// 使用示例
func createAndGetUser(db *DBCluster, name string) (*User, error) {
    // 写入主库
    result, _ := db.Exec("INSERT INTO users (name) VALUES (?)", name)
    userID, _ := result.LastInsertId()
    
    // 立即查询,使用主库(避免主从延迟)
    row := db.master.QueryRow("SELECT * FROM users WHERE id = ?", userID)
    
    var user User
    row.Scan(&user.ID, &user.Name)
    return &user, nil
}

分库分表

1. 水平分表

// ✅ 按 user_id 分表(1024 张表)
func getTableName(userID int64) string {
    tableIndex := userID % 1024
    return fmt.Sprintf("users_%04d", tableIndex)
}

func insertUser(db *sql.DB, user User) error {
    tableName := getTableName(user.ID)
    query := fmt.Sprintf("INSERT INTO %s (id, name, age) VALUES (?, ?, ?)", tableName)
    _, err := db.Exec(query, user.ID, user.Name, user.Age)
    return err
}

func getUser(db *sql.DB, userID int64) (*User, error) {
    tableName := getTableName(userID)
    query := fmt.Sprintf("SELECT * FROM %s WHERE id = ?", tableName)
    
    var user User
    err := db.QueryRow(query, userID).Scan(&user.ID, &user.Name, &user.Age)
    return &user, err
}

2. 垂直分库

// ✅ 按业务分库
type DBManager struct {
    userDB    *sql.DB  // 用户库
    orderDB   *sql.DB  // 订单库
    productDB *sql.DB  // 商品库
}

func (m *DBManager) GetUserDB() *sql.DB {
    return m.userDB
}

func (m *DBManager) GetOrderDB() *sql.DB {
    return m.orderDB
}

func (m *DBManager) GetProductDB() *sql.DB {
    return m.productDB
}

批量操作

1. 批量插入

// ❌ 逐条插入
func insertUsersSlow(db *sql.DB, users []User) error {
    for _, user := range users {
        _, err := db.Exec("INSERT INTO users (name, age) VALUES (?, ?)",
            user.Name, user.Age)
        if err != nil {
            return err
        }
    }
    return nil
}

// ✅ 批量插入
func insertUsersFast(db *sql.DB, users []User) error {
    if len(users) == 0 {
        return nil
    }
    
    // 构建批量插入语句
    query := "INSERT INTO users (name, age) VALUES "
    values := make([]interface{}, 0, len(users)*2)
    
    for i, user := range users {
        if i > 0 {
            query += ","
        }
        query += "(?, ?)"
        values = append(values, user.Name, user.Age)
    }
    
    _, err := db.Exec(query, values...)
    return err
}

// ✅ 分批插入(避免单次过大)
func insertUsersBatch(db *sql.DB, users []User, batchSize int) error {
    for i := 0; i < len(users); i += batchSize {
        end := i + batchSize
        if end > len(users) {
            end = len(users)
        }
        
        if err := insertUsersFast(db, users[i:end]); err != nil {
            return err
        }
    }
    return nil
}

性能对比(插入 10000 条):

  • 逐条插入:10000 次网络往返,10s
  • 批量插入:1 次网络往返,100ms
  • 性能提升:100x

2. 批量更新

// ✅ 使用事务批量更新
func batchUpdate(db *sql.DB, updates []Update) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    stmt, err := tx.Prepare("UPDATE users SET name = ? WHERE id = ?")
    if err != nil {
        return err
    }
    defer stmt.Close()
    
    for _, update := range updates {
        _, err := stmt.Exec(update.Name, update.ID)
        if err != nil {
            return err
        }
    }
    
    return tx.Commit()
}

事务优化

1. 减少事务范围

// ❌ 事务范围过大
func badTransaction(db *sql.DB) error {
    tx, _ := db.Begin()
    defer tx.Rollback()
    
    // 耗时操作(不应在事务中)
    data := fetchExternalAPI()  // 1秒
    
    // 数据库操作
    tx.Exec("INSERT INTO logs (data) VALUES (?)", data)
    
    return tx.Commit()
}

// ✅ 缩小事务范围
func goodTransaction(db *sql.DB) error {
    // 耗时操作在事务外
    data := fetchExternalAPI()  // 1秒
    
    // 快速事务
    tx, _ := db.Begin()
    defer tx.Rollback()
    
    tx.Exec("INSERT INTO logs (data) VALUES (?)", data)
    
    return tx.Commit()
}

2. 乐观锁 vs 悲观锁

// ✅ 乐观锁(版本号)
func updateWithOptimisticLock(db *sql.DB, userID int64, newBalance int, version int) error {
    result, err := db.Exec(`
        UPDATE users
        SET balance = ?, version = version + 1
        WHERE id = ? AND version = ?
    `, newBalance, userID, version)
    
    if err != nil {
        return err
    }
    
    affected, _ := result.RowsAffected()
    if affected == 0 {
        return errors.New("version conflict")
    }
    return nil
}

// ✅ 悲观锁(FOR UPDATE)
func updateWithPessimisticLock(db *sql.DB, userID int64) error {
    tx, _ := db.Begin()
    defer tx.Rollback()
    
    // 锁定行
    var balance int
    tx.QueryRow("SELECT balance FROM users WHERE id = ? FOR UPDATE", userID).
        Scan(&balance)
    
    // 更新
    newBalance := balance - 100
    tx.Exec("UPDATE users SET balance = ? WHERE id = ?", newBalance, userID)
    
    return tx.Commit()
}

MySQL 参数调优

1. InnoDB 参数

# my.cnf 配置

[mysqld]
# 缓冲池大小(物理内存的 70-80%)
innodb_buffer_pool_size = 8G

# 日志文件大小
innodb_log_file_size = 1G

# 刷新策略(1=最安全,2=折中,0=最快)
innodb_flush_log_at_trx_commit = 2

# 每次刷新页数
innodb_flush_neighbors = 0  # SSD 设为 0

# 并发线程数
innodb_thread_concurrency = 0  # 0 表示无限制

# IO 线程数
innodb_read_io_threads = 4
innodb_write_io_threads = 4

# 缓冲池实例数(减少锁竞争)
innodb_buffer_pool_instances = 8

2. 查询缓存(MySQL 5.7,8.0 已移除)

# 查询缓存(慎用,高并发写入场景反而降低性能)
query_cache_type = 0  # 关闭
query_cache_size = 0

数据库监控

1. 慢查询监控

-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1;  -- 1秒

-- 查看慢查询统计
SHOW GLOBAL STATUS LIKE 'Slow_queries';

-- 分析慢查询日志
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

2. 连接数监控

-- 查看当前连接数
SHOW PROCESSLIST;

-- 查看连接统计
SHOW GLOBAL STATUS LIKE 'Threads_%';

-- Threads_connected: 当前连接数
-- Threads_running: 正在运行的线程数
-- Max_used_connections: 历史最大连接数

3. Go 监控指标

import (
    "database/sql"
    "time"
)

// ✅ 监控数据库连接池
func monitorDBStats(db *sql.DB) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        stats := db.Stats()
        
        log.Printf("DB Stats: "+
            "MaxOpenConns=%d, OpenConns=%d, InUse=%d, Idle=%d, "+
            "WaitCount=%d, WaitDuration=%v",
            stats.MaxOpenConnections,
            stats.OpenConnections,
            stats.InUse,
            stats.Idle,
            stats.WaitCount,
            stats.WaitDuration,
        )
    }
}

数据库优化检查清单

索引优化

  • 是否为查询条件添加了索引?
  • 是否利用了覆盖索引?
  • 是否避免了索引失效?(函数、计算、LIKE '%x')
  • 是否定期清理无用索引?

查询优化

  • 是否使用 EXPLAIN 分析?
  • 是否避免了 N+1 查询?
  • 是否优化了分页查询?
  • 是否用 JOIN 替代子查询?

连接池

  • 是否配置了合适的连接池大小?
  • 是否设置了连接超时?
  • 是否监控了连接池状态?

架构优化

  • 是否实现了读写分离?
  • 是否实现了分库分表?
  • 是否使用了批量操作?
  • 是否缩小了事务范围?

本章小结

核心要点

  1. 索引优化:合理创建索引,避免索引失效
  2. 查询优化:避免 N+1、优化分页、使用 JOIN
  3. 连接池:合理配置连接数,监控状态
  4. 批量操作:减少网络往返,提升吞吐量
  5. 读写分离:分散读压力,提升性能

优化优先级

索引优化 > 查询优化 > 批量操作 > 连接池 > 读写分离

⏮️ 上一节:网络优化 | ⏭️ 下一节:缓存策略