📍 导航:返回目录 | 上一节:数据库优化 | 下一节:性能分析工具
最常用的模式
import (
"database/sql"
"github.com/go-redis/redis/v8"
"encoding/json"
)
// ✅ Cache Aside 模式
func getUserCacheAside(ctx context.Context, rdb *redis.Client, db *sql.DB, userID int64) (*User, error) {
// 1. 先查缓存
key := fmt.Sprintf("user:%d", userID)
val, err := rdb.Get(ctx, key).Result()
if err == nil {
// 缓存命中
var user User
json.Unmarshal([]byte(val), &user)
return &user, nil
}
// 2. 缓存未命中,查数据库
var user User
err = db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
Scan(&user.ID, &user.Name, &user.Age)
if err != nil {
return nil, err
}
// 3. 写入缓存
data, _ := json.Marshal(user)
rdb.Set(ctx, key, data, 5*time.Minute)
return &user, nil
}
// 更新数据
func updateUserCacheAside(ctx context.Context, rdb *redis.Client, db *sql.DB, user *User) error {
// 1. 先更新数据库
_, err := db.Exec("UPDATE users SET name = ?, age = ? WHERE id = ?",
user.Name, user.Age, user.ID)
if err != nil {
return err
}
// 2. 删除缓存(而非更新)
key := fmt.Sprintf("user:%d", user.ID)
rdb.Del(ctx, key)
return nil
}为什么删除而非更新?
- 避免并发更新导致的数据不一致
- 下次读取时自动从数据库加载最新数据
// ✅ Read Through: 缓存层自动加载
type CacheLayer struct {
rdb *redis.Client
db *sql.DB
}
func (c *CacheLayer) Get(ctx context.Context, key string, loader func() (interface{}, error)) (interface{}, error) {
// 1. 查缓存
val, err := c.rdb.Get(ctx, key).Result()
if err == nil {
return val, nil
}
// 2. 缓存未命中,调用 loader 加载数据
data, err := loader()
if err != nil {
return nil, err
}
// 3. 写入缓存
c.rdb.Set(ctx, key, data, 5*time.Minute)
return data, nil
}
// 使用示例
func getUser(ctx context.Context, cache *CacheLayer, userID int64) (*User, error) {
key := fmt.Sprintf("user:%d", userID)
data, err := cache.Get(ctx, key, func() (interface{}, error) {
var user User
err := cache.db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
Scan(&user.ID, &user.Name, &user.Age)
return user, err
})
user := data.(User)
return &user, err
}// ✅ Write Behind: 先写缓存,异步写数据库
type WriteBehindCache struct {
rdb *redis.Client
db *sql.DB
writeCh chan WriteOp
}
type WriteOp struct {
Key string
Value interface{}
}
func NewWriteBehindCache(rdb *redis.Client, db *sql.DB) *WriteBehindCache {
cache := &WriteBehindCache{
rdb: rdb,
db: db,
writeCh: make(chan WriteOp, 1000),
}
// 启动异步写入 Goroutine
go cache.asyncWriter()
return cache
}
func (c *WriteBehindCache) Set(ctx context.Context, key string, value interface{}) error {
// 1. 立即写缓存
data, _ := json.Marshal(value)
if err := c.rdb.Set(ctx, key, data, 0).Err(); err != nil {
return err
}
// 2. 异步写数据库
c.writeCh <- WriteOp{Key: key, Value: value}
return nil
}
func (c *WriteBehindCache) asyncWriter() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
batch := make([]WriteOp, 0, 100)
for {
select {
case op := <-c.writeCh:
batch = append(batch, op)
if len(batch) >= 100 {
c.flushBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
c.flushBatch(batch)
batch = batch[:0]
}
}
}
}
func (c *WriteBehindCache) flushBatch(batch []WriteOp) {
tx, _ := c.db.Begin()
defer tx.Rollback()
stmt, _ := tx.Prepare("UPDATE users SET data = ? WHERE id = ?")
defer stmt.Close()
for _, op := range batch {
stmt.Exec(op.Value, op.Key)
}
tx.Commit()
}优缺点:
- ✅ 写入性能极高
- ❌ 可能丢失数据(宕机时)
- ❌ 数据一致性弱
// ❌ 缓存穿透:恶意查询不存在的 ID
for i := 0; i < 10000; i++ {
getUserCacheAside(ctx, rdb, db, 999999+i) // 这些 ID 不存在
}
// 每次都查数据库,缓存失效import "github.com/bits-and-blooms/bloom/v3"
// ✅ 使用布隆过滤器
var userBloomFilter = bloom.NewWithEstimates(1000000, 0.01) // 100万元素,1%误判率
// 启动时加载所有用户 ID
func initBloomFilter(db *sql.DB) {
rows, _ := db.Query("SELECT id FROM users")
defer rows.Close()
for rows.Next() {
var id int64
rows.Scan(&id)
userBloomFilter.Add([]byte(fmt.Sprintf("%d", id)))
}
}
// 查询前先检查布隆过滤器
func getUserWithBloom(ctx context.Context, rdb *redis.Client, db *sql.DB, userID int64) (*User, error) {
// 1. 布隆过滤器检查
if !userBloomFilter.Test([]byte(fmt.Sprintf("%d", userID))) {
return nil, errors.New("user not exists") // 肯定不存在
}
// 2. 正常的缓存查询流程
return getUserCacheAside(ctx, rdb, db, userID)
}// ✅ 缓存空值(短过期时间)
func getUserWithNullCache(ctx context.Context, rdb *redis.Client, db *sql.DB, userID int64) (*User, error) {
key := fmt.Sprintf("user:%d", userID)
// 1. 查缓存
val, err := rdb.Get(ctx, key).Result()
if err == nil {
if val == "null" {
return nil, errors.New("user not exists")
}
var user User
json.Unmarshal([]byte(val), &user)
return &user, nil
}
// 2. 查数据库
var user User
err = db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
Scan(&user.ID, &user.Name, &user.Age)
if err == sql.ErrNoRows {
// 缓存空值,过期时间较短
rdb.Set(ctx, key, "null", 1*time.Minute)
return nil, errors.New("user not exists")
}
// 3. 缓存数据
data, _ := json.Marshal(user)
rdb.Set(ctx, key, data, 5*time.Minute)
return &user, nil
}// ❌ 热点 Key 失效,大量请求打到数据库
// 某个热门用户缓存过期,瞬间 10000 个请求
for i := 0; i < 10000; i++ {
go getUserCacheAside(ctx, rdb, db, 12345)
}import "sync"
// ✅ 使用互斥锁防止击穿
var mutexMap = sync.Map{}
func getUserWithMutex(ctx context.Context, rdb *redis.Client, db *sql.DB, userID int64) (*User, error) {
key := fmt.Sprintf("user:%d", userID)
// 1. 查缓存
val, err := rdb.Get(ctx, key).Result()
if err == nil {
var user User
json.Unmarshal([]byte(val), &user)
return &user, nil
}
// 2. 获取互斥锁
mutexKey := fmt.Sprintf("lock:%d", userID)
mutex, _ := mutexMap.LoadOrStore(mutexKey, &sync.Mutex{})
mu := mutex.(*sync.Mutex)
mu.Lock()
defer mu.Unlock()
// 3. Double Check:再次查缓存
val, err = rdb.Get(ctx, key).Result()
if err == nil {
var user User
json.Unmarshal([]byte(val), &user)
return &user, nil
}
// 4. 查数据库并更新缓存
var user User
err = db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
Scan(&user.ID, &user.Name, &user.Age)
if err != nil {
return nil, err
}
data, _ := json.Marshal(user)
rdb.Set(ctx, key, data, 5*time.Minute)
return &user, nil
}// ❌ 所有缓存同时设置 5 分钟过期
for _, user := range users {
data, _ := json.Marshal(user)
rdb.Set(ctx, fmt.Sprintf("user:%d", user.ID), data, 5*time.Minute)
}
// 5 分钟后,所有缓存同时失效// ✅ 过期时间加随机值(避免同时失效)
func setUserCacheWithRandomTTL(ctx context.Context, rdb *redis.Client, user *User) error {
key := fmt.Sprintf("user:%d", user.ID)
data, _ := json.Marshal(user)
// 5 分钟 + 随机 0-60 秒
ttl := 5*time.Minute + time.Duration(rand.Intn(60))*time.Second
return rdb.Set(ctx, key, data, ttl).Err()
}import (
"github.com/patrickmn/go-cache"
"github.com/go-redis/redis/v8"
)
// ✅ 两级缓存
type TwoLevelCache struct {
localCache *cache.Cache
redisCache *redis.Client
db *sql.DB
}
func NewTwoLevelCache(rdb *redis.Client, db *sql.DB) *TwoLevelCache {
return &TwoLevelCache{
localCache: cache.New(1*time.Minute, 10*time.Minute),
redisCache: rdb,
db: db,
}
}
func (c *TwoLevelCache) Get(ctx context.Context, userID int64) (*User, error) {
key := fmt.Sprintf("user:%d", userID)
// 1. 查本地缓存
if val, found := c.localCache.Get(key); found {
return val.(*User), nil
}
// 2. 查 Redis
val, err := c.redisCache.Get(ctx, key).Result()
if err == nil {
var user User
json.Unmarshal([]byte(val), &user)
// 写入本地缓存
c.localCache.Set(key, &user, 1*time.Minute)
return &user, nil
}
// 3. 查数据库
var user User
err = c.db.QueryRow("SELECT * FROM users WHERE id = ?", userID).
Scan(&user.ID, &user.Name, &user.Age)
if err != nil {
return nil, err
}
// 4. 写入 Redis 和本地缓存
data, _ := json.Marshal(user)
c.redisCache.Set(ctx, key, data, 5*time.Minute)
c.localCache.Set(key, &user, 1*time.Minute)
return &user, nil
}
func (c *TwoLevelCache) Delete(ctx context.Context, userID int64) {
key := fmt.Sprintf("user:%d", userID)
c.localCache.Delete(key)
c.redisCache.Del(ctx, key)
}请求流程:
1. CDN(静态资源)
2. Nginx 本地缓存(OpenResty + Lua)
3. Redis(热点数据)
4. 数据库(冷数据)
Nginx + Lua 缓存:
-- nginx.conf
location /api/user {
content_by_lua_block {
local redis = require "resty.redis"
local red = redis:new()
-- 连接 Redis
local ok, err = red:connect("127.0.0.1", 6379)
-- 查询缓存
local res, err = red:get("user:" .. ngx.var.arg_id)
if res ~= ngx.null then
-- 缓存命中
ngx.say(res)
return
end
-- 缓存未命中,回源到后端服务
ngx.exec("@backend")
}
}
location @backend {
proxy_pass http://backend_servers;
}// ✅ 启动时加载热点数据
func warmupCache(ctx context.Context, rdb *redis.Client, db *sql.DB) error {
// 查询热点用户(例如:最近活跃的 1000 个用户)
rows, err := db.Query(`
SELECT id, name, age
FROM users
ORDER BY last_active_time DESC
LIMIT 1000
`)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var user User
rows.Scan(&user.ID, &user.Name, &user.Age)
// 写入缓存
key := fmt.Sprintf("user:%d", user.ID)
data, _ := json.Marshal(user)
rdb.Set(ctx, key, data, 10*time.Minute)
}
log.Println("Cache warmup completed")
return nil
}// ✅ 定时刷新热点数据
func startCacheRefresher(ctx context.Context, rdb *redis.Client, db *sql.DB) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
warmupCache(ctx, rdb, db)
}
}// ✅ 使用 Redis 统计访问频率
func trackAccess(ctx context.Context, rdb *redis.Client, userID int64) {
key := fmt.Sprintf("access:user:%d", userID)
rdb.Incr(ctx, key)
rdb.Expire(ctx, key, 1*time.Hour)
}
// 获取热点数据
func getHotUsers(ctx context.Context, rdb *redis.Client) ([]int64, error) {
keys, _ := rdb.Keys(ctx, "access:user:*").Result()
type userAccess struct {
UserID int64
Count int64
}
var accesses []userAccess
for _, key := range keys {
count, _ := rdb.Get(ctx, key).Int64()
var userID int64
fmt.Sscanf(key, "access:user:%d", &userID)
accesses = append(accesses, userAccess{UserID: userID, Count: count})
}
// 按访问次数排序
sort.Slice(accesses, func(i, j int) bool {
return accesses[i].Count > accesses[j].Count
})
// 返回 Top 100
var hotUsers []int64
for i := 0; i < 100 && i < len(accesses); i++ {
hotUsers = append(hotUsers, accesses[i].UserID)
}
return hotUsers, nil
}# Redis 配置(LFU:最少使用淘汰)
maxmemory 2gb
maxmemory-policy allkeys-lfu// ✅ 延迟双删策略
func updateUserWithDelayedDelete(ctx context.Context, rdb *redis.Client, db *sql.DB, user *User) error {
key := fmt.Sprintf("user:%d", user.ID)
// 1. 删除缓存
rdb.Del(ctx, key)
// 2. 更新数据库
_, err := db.Exec("UPDATE users SET name = ?, age = ? WHERE id = ?",
user.Name, user.Age, user.ID)
if err != nil {
return err
}
// 3. 延迟再次删除缓存(防止读取到旧数据)
go func() {
time.Sleep(500 * time.Millisecond) // 延迟 500ms
rdb.Del(ctx, key)
}()
return nil
}// ✅ 监听 MySQL Binlog 自动更新缓存
import "github.com/withlin/canal-go/client"
func startCanalSync(ctx context.Context, rdb *redis.Client) {
connector := client.NewSimpleCanalConnector(
"127.0.0.1", 11111, "", "", "test", 60000, 60*60*1000,
)
connector.Connect()
connector.Subscribe("test\\.users") // 订阅 users 表
for {
message, _ := connector.Get(100, nil, nil)
for _, entry := range message.Entries {
if entry.EntryType == client.EntryType_ROWDATA {
// 解析 Binlog
rowChange := &client.RowChange{}
rowChange.UnmarshalBinary(entry.StoreValue)
for _, rowData := range rowChange.RowDatas {
if rowChange.EventType == client.EventType_UPDATE {
// 删除对应缓存
userID := rowData.AfterColumns[0].Value
rdb.Del(ctx, fmt.Sprintf("user:%s", userID))
}
}
}
}
connector.Ack(message.Id)
}
}- 是否选择了合适的缓存更新策略?
- 是否防范了缓存穿透?(布隆过滤器、空值缓存)
- 是否防范了缓存击穿?(互斥锁)
- 是否防范了缓存雪崩?(随机过期时间)
- 是否使用了本地缓存?
- 是否使用了 CDN?
- 是否使用了 Nginx 缓存?
- 是否实现了缓存预热?
- 是否识别了热点数据?
- 是否保证了缓存一致性?
- 是否监控了缓存命中率?
核心要点:
- ✅ 更新策略:Cache Aside 最常用,Write Behind 高性能
- ✅ 三大问题:穿透(布隆过滤器)、击穿(互斥锁)、雪崩(随机过期)
- ✅ 多级缓存:本地缓存 + Redis + CDN 提升命中率
- ✅ 缓存预热:启动时加载热点数据
- ✅ 一致性:延迟双删、Binlog 同步
优化优先级:
防范三大问题 > 多级缓存 > 缓存预热 > 一致性保证
⏮️ 上一节:数据库优化 | ⏭️ 下一节:性能分析工具