Skip to content

Latest commit

 

History

History
695 lines (541 loc) · 16.6 KB

File metadata and controls

695 lines (541 loc) · 16.6 KB

11.7 缓存策略

📍 导航返回目录 | 上一节:数据库优化 | 下一节:性能分析工具


缓存更新策略

1. Cache Aside(旁路缓存)

最常用的模式

import (
    "database/sql"
    "github.com/go-redis/redis/v8"
    "encoding/json"
)

// ✅ Cache Aside 模式
func getUserCacheAside(ctx context.Context, rdb *redis.Client, db *sql.DB, userID int64) (*User, error) {
    // 1. 先查缓存
    key := fmt.Sprintf("user:%d", userID)
    val, err := rdb.Get(ctx, key).Result()
    
    if err == nil {
        // 缓存命中
        var user User
        json.Unmarshal([]byte(val), &user)
        return &user, nil
    }
    
    // 2. 缓存未命中,查数据库
    var user User
    err = db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
        Scan(&user.ID, &user.Name, &user.Age)
    if err != nil {
        return nil, err
    }
    
    // 3. 写入缓存
    data, _ := json.Marshal(user)
    rdb.Set(ctx, key, data, 5*time.Minute)
    
    return &user, nil
}

// 更新数据
func updateUserCacheAside(ctx context.Context, rdb *redis.Client, db *sql.DB, user *User) error {
    // 1. 先更新数据库
    _, err := db.Exec("UPDATE users SET name = ?, age = ? WHERE id = ?",
        user.Name, user.Age, user.ID)
    if err != nil {
        return err
    }
    
    // 2. 删除缓存(而非更新)
    key := fmt.Sprintf("user:%d", user.ID)
    rdb.Del(ctx, key)
    
    return nil
}

为什么删除而非更新?

  • 避免并发更新导致的数据不一致
  • 下次读取时自动从数据库加载最新数据

2. Read Through / Write Through

// ✅ Read Through: 缓存层自动加载
type CacheLayer struct {
    rdb *redis.Client
    db  *sql.DB
}

func (c *CacheLayer) Get(ctx context.Context, key string, loader func() (interface{}, error)) (interface{}, error) {
    // 1. 查缓存
    val, err := c.rdb.Get(ctx, key).Result()
    if err == nil {
        return val, nil
    }
    
    // 2. 缓存未命中,调用 loader 加载数据
    data, err := loader()
    if err != nil {
        return nil, err
    }
    
    // 3. 写入缓存
    c.rdb.Set(ctx, key, data, 5*time.Minute)
    return data, nil
}

// 使用示例
func getUser(ctx context.Context, cache *CacheLayer, userID int64) (*User, error) {
    key := fmt.Sprintf("user:%d", userID)
    
    data, err := cache.Get(ctx, key, func() (interface{}, error) {
        var user User
        err := cache.db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
            Scan(&user.ID, &user.Name, &user.Age)
        return user, err
    })
    
    user := data.(User)
    return &user, err
}

3. Write Behind(异步写入)

// ✅ Write Behind: 先写缓存,异步写数据库
type WriteBehindCache struct {
    rdb      *redis.Client
    db       *sql.DB
    writeCh  chan WriteOp
}

type WriteOp struct {
    Key   string
    Value interface{}
}

func NewWriteBehindCache(rdb *redis.Client, db *sql.DB) *WriteBehindCache {
    cache := &WriteBehindCache{
        rdb:     rdb,
        db:      db,
        writeCh: make(chan WriteOp, 1000),
    }
    
    // 启动异步写入 Goroutine
    go cache.asyncWriter()
    
    return cache
}

func (c *WriteBehindCache) Set(ctx context.Context, key string, value interface{}) error {
    // 1. 立即写缓存
    data, _ := json.Marshal(value)
    if err := c.rdb.Set(ctx, key, data, 0).Err(); err != nil {
        return err
    }
    
    // 2. 异步写数据库
    c.writeCh <- WriteOp{Key: key, Value: value}
    return nil
}

func (c *WriteBehindCache) asyncWriter() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    
    batch := make([]WriteOp, 0, 100)
    
    for {
        select {
        case op := <-c.writeCh:
            batch = append(batch, op)
            if len(batch) >= 100 {
                c.flushBatch(batch)
                batch = batch[:0]
            }
        case <-ticker.C:
            if len(batch) > 0 {
                c.flushBatch(batch)
                batch = batch[:0]
            }
        }
    }
}

func (c *WriteBehindCache) flushBatch(batch []WriteOp) {
    tx, _ := c.db.Begin()
    defer tx.Rollback()
    
    stmt, _ := tx.Prepare("UPDATE users SET data = ? WHERE id = ?")
    defer stmt.Close()
    
    for _, op := range batch {
        stmt.Exec(op.Value, op.Key)
    }
    
    tx.Commit()
}

优缺点

  • ✅ 写入性能极高
  • ❌ 可能丢失数据(宕机时)
  • ❌ 数据一致性弱

缓存穿透、击穿、雪崩

1. 缓存穿透(查询不存在的数据)

问题

// ❌ 缓存穿透:恶意查询不存在的 ID
for i := 0; i < 10000; i++ {
    getUserCacheAside(ctx, rdb, db, 999999+i)  // 这些 ID 不存在
}
// 每次都查数据库,缓存失效

解决方案 1:布隆过滤器

import "github.com/bits-and-blooms/bloom/v3"

// ✅ 使用布隆过滤器
var userBloomFilter = bloom.NewWithEstimates(1000000, 0.01) // 100万元素,1%误判率

// 启动时加载所有用户 ID
func initBloomFilter(db *sql.DB) {
    rows, _ := db.Query("SELECT id FROM users")
    defer rows.Close()
    
    for rows.Next() {
        var id int64
        rows.Scan(&id)
        userBloomFilter.Add([]byte(fmt.Sprintf("%d", id)))
    }
}

// 查询前先检查布隆过滤器
func getUserWithBloom(ctx context.Context, rdb *redis.Client, db *sql.DB, userID int64) (*User, error) {
    // 1. 布隆过滤器检查
    if !userBloomFilter.Test([]byte(fmt.Sprintf("%d", userID))) {
        return nil, errors.New("user not exists")  // 肯定不存在
    }
    
    // 2. 正常的缓存查询流程
    return getUserCacheAside(ctx, rdb, db, userID)
}

解决方案 2:缓存空值

// ✅ 缓存空值(短过期时间)
func getUserWithNullCache(ctx context.Context, rdb *redis.Client, db *sql.DB, userID int64) (*User, error) {
    key := fmt.Sprintf("user:%d", userID)
    
    // 1. 查缓存
    val, err := rdb.Get(ctx, key).Result()
    if err == nil {
        if val == "null" {
            return nil, errors.New("user not exists")
        }
        var user User
        json.Unmarshal([]byte(val), &user)
        return &user, nil
    }
    
    // 2. 查数据库
    var user User
    err = db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
        Scan(&user.ID, &user.Name, &user.Age)
    
    if err == sql.ErrNoRows {
        // 缓存空值,过期时间较短
        rdb.Set(ctx, key, "null", 1*time.Minute)
        return nil, errors.New("user not exists")
    }
    
    // 3. 缓存数据
    data, _ := json.Marshal(user)
    rdb.Set(ctx, key, data, 5*time.Minute)
    
    return &user, nil
}

2. 缓存击穿(热点 Key 失效)

问题

// ❌ 热点 Key 失效,大量请求打到数据库
// 某个热门用户缓存过期,瞬间 10000 个请求
for i := 0; i < 10000; i++ {
    go getUserCacheAside(ctx, rdb, db, 12345)
}

解决方案:互斥锁 + 双重检查

import "sync"

// ✅ 使用互斥锁防止击穿
var mutexMap = sync.Map{}

func getUserWithMutex(ctx context.Context, rdb *redis.Client, db *sql.DB, userID int64) (*User, error) {
    key := fmt.Sprintf("user:%d", userID)
    
    // 1. 查缓存
    val, err := rdb.Get(ctx, key).Result()
    if err == nil {
        var user User
        json.Unmarshal([]byte(val), &user)
        return &user, nil
    }
    
    // 2. 获取互斥锁
    mutexKey := fmt.Sprintf("lock:%d", userID)
    mutex, _ := mutexMap.LoadOrStore(mutexKey, &sync.Mutex{})
    mu := mutex.(*sync.Mutex)
    
    mu.Lock()
    defer mu.Unlock()
    
    // 3. Double Check:再次查缓存
    val, err = rdb.Get(ctx, key).Result()
    if err == nil {
        var user User
        json.Unmarshal([]byte(val), &user)
        return &user, nil
    }
    
    // 4. 查数据库并更新缓存
    var user User
    err = db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
        Scan(&user.ID, &user.Name, &user.Age)
    if err != nil {
        return nil, err
    }
    
    data, _ := json.Marshal(user)
    rdb.Set(ctx, key, data, 5*time.Minute)
    
    return &user, nil
}

3. 缓存雪崩(大量 Key 同时失效)

问题

// ❌ 所有缓存同时设置 5 分钟过期
for _, user := range users {
    data, _ := json.Marshal(user)
    rdb.Set(ctx, fmt.Sprintf("user:%d", user.ID), data, 5*time.Minute)
}
// 5 分钟后,所有缓存同时失效

解决方案:过期时间加随机值

// ✅ 过期时间加随机值(避免同时失效)
func setUserCacheWithRandomTTL(ctx context.Context, rdb *redis.Client, user *User) error {
    key := fmt.Sprintf("user:%d", user.ID)
    data, _ := json.Marshal(user)
    
    // 5 分钟 + 随机 0-60 秒
    ttl := 5*time.Minute + time.Duration(rand.Intn(60))*time.Second
    return rdb.Set(ctx, key, data, ttl).Err()
}

多级缓存架构

1. 本地缓存 + Redis

import (
    "github.com/patrickmn/go-cache"
    "github.com/go-redis/redis/v8"
)

// ✅ 两级缓存
type TwoLevelCache struct {
    localCache *cache.Cache
    redisCache *redis.Client
    db         *sql.DB
}

func NewTwoLevelCache(rdb *redis.Client, db *sql.DB) *TwoLevelCache {
    return &TwoLevelCache{
        localCache: cache.New(1*time.Minute, 10*time.Minute),
        redisCache: rdb,
        db:         db,
    }
}

func (c *TwoLevelCache) Get(ctx context.Context, userID int64) (*User, error) {
    key := fmt.Sprintf("user:%d", userID)
    
    // 1. 查本地缓存
    if val, found := c.localCache.Get(key); found {
        return val.(*User), nil
    }
    
    // 2. 查 Redis
    val, err := c.redisCache.Get(ctx, key).Result()
    if err == nil {
        var user User
        json.Unmarshal([]byte(val), &user)
        
        // 写入本地缓存
        c.localCache.Set(key, &user, 1*time.Minute)
        return &user, nil
    }
    
    // 3. 查数据库
    var user User
    err = c.db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
        Scan(&user.ID, &user.Name, &user.Age)
    if err != nil {
        return nil, err
    }
    
    // 4. 写入 Redis 和本地缓存
    data, _ := json.Marshal(user)
    c.redisCache.Set(ctx, key, data, 5*time.Minute)
    c.localCache.Set(key, &user, 1*time.Minute)
    
    return &user, nil
}

func (c *TwoLevelCache) Delete(ctx context.Context, userID int64) {
    key := fmt.Sprintf("user:%d", userID)
    c.localCache.Delete(key)
    c.redisCache.Del(ctx, key)
}

2. CDN + Nginx + Redis + DB

请求流程:
1. CDN(静态资源)
2. Nginx 本地缓存(OpenResty + Lua)
3. Redis(热点数据)
4. 数据库(冷数据)

Nginx + Lua 缓存

-- nginx.conf
location /api/user {
    content_by_lua_block {
        local redis = require "resty.redis"
        local red = redis:new()
        
        -- 连接 Redis
        local ok, err = red:connect("127.0.0.1", 6379)
        
        -- 查询缓存
        local res, err = red:get("user:" .. ngx.var.arg_id)
        
        if res ~= ngx.null then
            -- 缓存命中
            ngx.say(res)
            return
        end
        
        -- 缓存未命中,回源到后端服务
        ngx.exec("@backend")
    }
}

location @backend {
    proxy_pass http://backend_servers;
}

缓存预热

1. 启动时预热

// ✅ 启动时加载热点数据
func warmupCache(ctx context.Context, rdb *redis.Client, db *sql.DB) error {
    // 查询热点用户(例如:最近活跃的 1000 个用户)
    rows, err := db.Query(`
        SELECT id, name, age 
        FROM users 
        ORDER BY last_active_time DESC 
        LIMIT 1000
    `)
    if err != nil {
        return err
    }
    defer rows.Close()
    
    for rows.Next() {
        var user User
        rows.Scan(&user.ID, &user.Name, &user.Age)
        
        // 写入缓存
        key := fmt.Sprintf("user:%d", user.ID)
        data, _ := json.Marshal(user)
        rdb.Set(ctx, key, data, 10*time.Minute)
    }
    
    log.Println("Cache warmup completed")
    return nil
}

2. 定时预热

// ✅ 定时刷新热点数据
func startCacheRefresher(ctx context.Context, rdb *redis.Client, db *sql.DB) {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    
    for range ticker.C {
        warmupCache(ctx, rdb, db)
    }
}

热点数据识别

1. 访问计数

// ✅ 使用 Redis 统计访问频率
func trackAccess(ctx context.Context, rdb *redis.Client, userID int64) {
    key := fmt.Sprintf("access:user:%d", userID)
    rdb.Incr(ctx, key)
    rdb.Expire(ctx, key, 1*time.Hour)
}

// 获取热点数据
func getHotUsers(ctx context.Context, rdb *redis.Client) ([]int64, error) {
    keys, _ := rdb.Keys(ctx, "access:user:*").Result()
    
    type userAccess struct {
        UserID int64
        Count  int64
    }
    
    var accesses []userAccess
    for _, key := range keys {
        count, _ := rdb.Get(ctx, key).Int64()
        var userID int64
        fmt.Sscanf(key, "access:user:%d", &userID)
        accesses = append(accesses, userAccess{UserID: userID, Count: count})
    }
    
    // 按访问次数排序
    sort.Slice(accesses, func(i, j int) bool {
        return accesses[i].Count > accesses[j].Count
    })
    
    // 返回 Top 100
    var hotUsers []int64
    for i := 0; i < 100 && i < len(accesses); i++ {
        hotUsers = append(hotUsers, accesses[i].UserID)
    }
    return hotUsers, nil
}

2. LFU 淘汰策略

# Redis 配置(LFU:最少使用淘汰)
maxmemory 2gb
maxmemory-policy allkeys-lfu

缓存一致性

1. 延迟双删

// ✅ 延迟双删策略
func updateUserWithDelayedDelete(ctx context.Context, rdb *redis.Client, db *sql.DB, user *User) error {
    key := fmt.Sprintf("user:%d", user.ID)
    
    // 1. 删除缓存
    rdb.Del(ctx, key)
    
    // 2. 更新数据库
    _, err := db.Exec("UPDATE users SET name = ?, age = ? WHERE id = ?",
        user.Name, user.Age, user.ID)
    if err != nil {
        return err
    }
    
    // 3. 延迟再次删除缓存(防止读取到旧数据)
    go func() {
        time.Sleep(500 * time.Millisecond)  // 延迟 500ms
        rdb.Del(ctx, key)
    }()
    
    return nil
}

2. Canal 监听 Binlog

// ✅ 监听 MySQL Binlog 自动更新缓存
import "github.com/withlin/canal-go/client"

func startCanalSync(ctx context.Context, rdb *redis.Client) {
    connector := client.NewSimpleCanalConnector(
        "127.0.0.1", 11111, "", "", "test", 60000, 60*60*1000,
    )
    
    connector.Connect()
    connector.Subscribe("test\\.users")  // 订阅 users 表
    
    for {
        message, _ := connector.Get(100, nil, nil)
        
        for _, entry := range message.Entries {
            if entry.EntryType == client.EntryType_ROWDATA {
                // 解析 Binlog
                rowChange := &client.RowChange{}
                rowChange.UnmarshalBinary(entry.StoreValue)
                
                for _, rowData := range rowChange.RowDatas {
                    if rowChange.EventType == client.EventType_UPDATE {
                        // 删除对应缓存
                        userID := rowData.AfterColumns[0].Value
                        rdb.Del(ctx, fmt.Sprintf("user:%s", userID))
                    }
                }
            }
        }
        
        connector.Ack(message.Id)
    }
}

缓存优化检查清单

缓存策略

  • 是否选择了合适的缓存更新策略?
  • 是否防范了缓存穿透?(布隆过滤器、空值缓存)
  • 是否防范了缓存击穿?(互斥锁)
  • 是否防范了缓存雪崩?(随机过期时间)

多级缓存

  • 是否使用了本地缓存?
  • 是否使用了 CDN?
  • 是否使用了 Nginx 缓存?

缓存管理

  • 是否实现了缓存预热?
  • 是否识别了热点数据?
  • 是否保证了缓存一致性?
  • 是否监控了缓存命中率?

本章小结

核心要点

  1. 更新策略:Cache Aside 最常用,Write Behind 高性能
  2. 三大问题:穿透(布隆过滤器)、击穿(互斥锁)、雪崩(随机过期)
  3. 多级缓存:本地缓存 + Redis + CDN 提升命中率
  4. 缓存预热:启动时加载热点数据
  5. 一致性:延迟双删、Binlog 同步

优化优先级

防范三大问题 > 多级缓存 > 缓存预热 > 一致性保证

⏮️ 上一节:数据库优化 | ⏭️ 下一节:性能分析工具