Skip to content

Latest commit

 

History

History
838 lines (634 loc) · 18.9 KB

File metadata and controls

838 lines (634 loc) · 18.9 KB

11.9 全链路优化案例

📍 导航返回目录 | 上一节:性能分析工具


案例 1:电商秒杀系统优化

背景

某电商平台秒杀系统,初始QPS为 1000,需要优化到 100000 QPS。

初始架构

客户端 → Nginx → API服务(Go) → MySQL

性能瓶颈

  • QPS:1000
  • P99 延迟:500ms
  • MySQL CPU:90%

优化步骤

第 1 步:性能分析

# 1. 使用 wrk 压测
wrk -t4 -c100 -d30s http://localhost:8080/api/seckill

# 结果:1000 QPS,P99=500ms

# 2. 使用 pprof 分析
go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30

# 发现:80% CPU 花在数据库查询

第 2 步:引入 Redis 缓存

// ✅ 优化前:每次查数据库
func getProduct(db *sql.DB, productID int64) (*Product, error) {
    var product Product
    err := db.QueryRow("SELECT * FROM products WHERE id = ?", productID).
        Scan(&product.ID, &product.Name, &product.Stock)
    return &product, err
}

// ✅ 优化后:先查缓存
func getProductOptimized(ctx context.Context, rdb *redis.Client, db *sql.DB, productID int64) (*Product, error) {
    key := fmt.Sprintf("product:%d", productID)
    
    // 1. 查缓存
    val, err := rdb.Get(ctx, key).Result()
    if err == nil {
        var product Product
        json.Unmarshal([]byte(val), &product)
        return &product, nil
    }
    
    // 2. 查数据库
    var product Product
    err = db.QueryRow("SELECT * FROM products WHERE id = ?", productID).
        Scan(&product.ID, &product.Name, &product.Stock)
    if err != nil {
        return nil, err
    }
    
    // 3. 写缓存
    data, _ := json.Marshal(product)
    rdb.Set(ctx, key, data, 5*time.Minute)
    
    return &product, nil
}

效果

  • QPS:5000 → 5x 提升
  • P99 延迟:100ms → 5x 提升
  • MySQL CPU:20%

第 3 步:Redis 预扣库存

// ✅ Lua 脚本原子扣库存
const deductStockScript = `
local stock = redis.call('GET', KEYS[1])
if tonumber(stock) > 0 then
    redis.call('DECR', KEYS[1])
    return 1
else
    return 0
end
`

func deductStock(ctx context.Context, rdb *redis.Client, productID int64) (bool, error) {
    result, err := rdb.Eval(ctx, deductStockScript,
        []string{fmt.Sprintf("stock:%d", productID)}).Int()
    return result == 1, err
}

效果

  • QPS:10000 → 2x 提升
  • P99 延迟:50ms
  • 库存准确性:100%(Lua 原子操作)

第 4 步:消息队列削峰

import "github.com/Shopify/sarama"

// ✅ 异步下单
func seckillHandler(w http.ResponseWriter, r *http.Request) {
    userID := getUserID(r)
    productID := getProductID(r)
    
    // 1. Redis 预扣库存
    ok, _ := deductStock(ctx, rdb, productID)
    if !ok {
        http.Error(w, "sold out", http.StatusGone)
        return
    }
    
    // 2. 发送到消息队列
    order := Order{
        UserID:    userID,
        ProductID: productID,
        Time:      time.Now(),
    }
    data, _ := json.Marshal(order)
    
    producer.SendMessage(&sarama.ProducerMessage{
        Topic: "seckill.orders",
        Value: sarama.ByteEncoder(data),
    })
    
    // 3. 立即返回
    w.Write([]byte("success"))
}

// 消费者:异步创建订单
func consumeOrders() {
    consumer, _ := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    partitionConsumer, _ := consumer.ConsumePartition("seckill.orders", 0, sarama.OffsetNewest)
    
    for msg := range partitionConsumer.Messages() {
        var order Order
        json.Unmarshal(msg.Value, &order)
        
        // 创建订单(写数据库)
        db.Exec("INSERT INTO orders (user_id, product_id, time) VALUES (?, ?, ?)",
            order.UserID, order.ProductID, order.Time)
    }
}

效果

  • QPS:50000 → 5x 提升
  • P99 延迟:10ms
  • 数据库压力:极低(异步写入)

第 5 步:限流保护

import "golang.org/x/time/rate"

// ✅ 令牌桶限流
var limiter = rate.NewLimiter(50000, 10000) // 50000 QPS,桶容量 10000

func rateLimitMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if !limiter.Allow() {
            http.Error(w, "too many requests", http.StatusTooManyRequests)
            return
        }
        next.ServeHTTP(w, r)
    })
}

第 6 步:CDN 静态资源

# Nginx 配置
location /static {
    expires 1h;
    add_header Cache-Control "public";
}

location /api {
    proxy_pass http://backend;
    proxy_cache my_cache;
    proxy_cache_valid 200 1m;
}

最终架构

CDN → Nginx(缓存) → 限流 → API服务 → Redis(预扣库存)
                                          ↓
                                       Kafka(削峰)
                                          ↓
                                      订单服务 → MySQL

最终效果

指标 优化前 优化后 提升
QPS 1000 100000 100x
P99 延迟 500ms 10ms 50x
MySQL CPU 90% 5% 18x 降低
可用性 99% 99.99% 100x 提升

案例 2:推荐系统延迟优化

背景

某推荐系统 P99 延迟为 500ms,需要优化到 50ms。

初始架构

// ❌ 串行调用多个服务
func getRecommendations(userID int64) ([]Item, error) {
    // 1. 获取用户画像(200ms)
    profile := getUserProfile(userID)
    
    // 2. 召回候选(150ms)
    candidates := recallCandidates(profile)
    
    // 3. 排序(100ms)
    ranked := rankItems(candidates, profile)
    
    // 4. 过滤(50ms)
    filtered := filterItems(ranked)
    
    return filtered, nil
}

总延迟:200 + 150 + 100 + 50 = 500ms

优化步骤

第 1 步:并行调用

// ✅ 并行调用(减少延迟)
func getRecommendationsOptimized(userID int64) ([]Item, error) {
    var wg sync.WaitGroup
    var profile UserProfile
    var candidates []Item
    
    // 1. 并行获取用户画像和候选
    wg.Add(2)
    
    go func() {
        defer wg.Done()
        profile = getUserProfile(userID)
    }()
    
    go func() {
        defer wg.Done()
        candidates = recallCandidatesPrecomputed(userID)
    }()
    
    wg.Wait()
    
    // 2. 排序
    ranked := rankItems(candidates, profile)
    
    // 3. 过滤
    filtered := filterItems(ranked)
    
    return filtered, nil
}

延迟:max(200, 150) + 100 + 50 = 350ms(30% 提升)

第 2 步:预计算 + 缓存

// ✅ 离线预计算候选集
func precomputeCandidates() {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    
    for range ticker.C {
        users := getAllActiveUsers()
        
        for _, userID := range users {
            candidates := recallCandidates(userID)
            
            // 缓存到 Redis
            data, _ := json.Marshal(candidates)
            rdb.Set(ctx, fmt.Sprintf("candidates:%d", userID), data, 10*time.Minute)
        }
    }
}

// ✅ 在线直接读缓存
func recallCandidatesPrecomputed(userID int64) []Item {
    val, _ := rdb.Get(ctx, fmt.Sprintf("candidates:%d", userID)).Result()
    
    var candidates []Item
    json.Unmarshal([]byte(val), &candidates)
    return candidates
}

延迟:max(200, 10) + 100 + 50 = 360ms(28% 提升,但候选召回从 150ms → 10ms)

第 3 步:GPU 加速排序

import "github.com/tensorflow/tensorflow/tensorflow/go"

// ✅ 使用 TensorFlow GPU 加速排序
func rankItemsGPU(candidates []Item, profile UserProfile) []Item {
    // 1. 准备输入
    features := extractFeatures(candidates, profile)
    
    // 2. GPU 推理
    tensor, _ := tf.NewTensor(features)
    results, _ := model.Run(
        map[tf.Output]*tf.Tensor{inputOp: tensor},
        []tf.Output{outputOp},
        nil,
    )
    
    // 3. 解析结果
    scores := results[0].Value().([]float32)
    
    // 4. 排序
    sort.Slice(candidates, func(i, j int) bool {
        return scores[i] > scores[j]
    })
    
    return candidates
}

延迟:200 + 10 + 20(GPU) + 50 = 280ms(44% 提升)

第 4 步:本地缓存 + 热加载

import "github.com/patrickmn/go-cache"

// ✅ 本地缓存用户画像
var profileCache = cache.New(1*time.Minute, 10*time.Minute)

func getUserProfileCached(userID int64) UserProfile {
    // 1. 查本地缓存
    if val, found := profileCache.Get(fmt.Sprintf("%d", userID)); found {
        return val.(UserProfile)
    }
    
    // 2. 查 Redis
    val, err := rdb.Get(ctx, fmt.Sprintf("profile:%d", userID)).Result()
    if err == nil {
        var profile UserProfile
        json.Unmarshal([]byte(val), &profile)
        
        // 写入本地缓存
        profileCache.Set(fmt.Sprintf("%d", userID), profile, 1*time.Minute)
        return profile
    }
    
    // 3. 查数据库
    profile := getUserProfileFromDB(userID)
    
    // 写入 Redis 和本地缓存
    data, _ := json.Marshal(profile)
    rdb.Set(ctx, fmt.Sprintf("profile:%d", userID), data, 5*time.Minute)
    profileCache.Set(fmt.Sprintf("%d", userID), profile, 1*time.Minute)
    
    return profile
}

延迟:5(本地缓存) + 10 + 20 + 50 = 85ms(83% 提升)

第 5 步:精简过滤逻辑

// ❌ 过滤逻辑复杂(50ms)
func filterItems(items []Item) []Item {
    var result []Item
    for _, item := range items {
        if complexCheck1(item) &&
           complexCheck2(item) &&
           complexCheck3(item) {
            result = append(result, item)
        }
    }
    return result
}

// ✅ 简化过滤(10ms)
func filterItemsSimple(items []Item) []Item {
    var result []Item
    for _, item := range items {
        if simpleCheck(item) {  // 只保留核心检查
            result = append(result, item)
        }
    }
    return result[:100]  // Top 100
}

延迟:5 + 10 + 20 + 10 = 45ms(91% 提升,达到目标!)

最终效果

优化步骤 延迟 提升
初始 500ms -
1. 并行调用 350ms 30%
2. 预计算缓存 280ms 44%
3. GPU 加速 280ms 44%
4. 本地缓存 85ms 83%
5. 简化过滤 45ms 91%

案例 3:日志系统吞吐量优化

背景

某日志系统吞吐量为 10 MB/s,需要优化到 1 GB/s。

优化步骤

第 1 步:批量写入

// ❌ 逐条写入(10 MB/s)
func writeLogSlow(file *os.File, log string) error {
    _, err := file.WriteString(log + "\n")
    return err
}

// ✅ 批量写入(100 MB/s)
type BatchLogger struct {
    file   *os.File
    buffer []string
    mu     sync.Mutex
    ticker *time.Ticker
}

func (l *BatchLogger) Write(log string) {
    l.mu.Lock()
    l.buffer = append(l.buffer, log)
    
    if len(l.buffer) >= 1000 {
        l.flushLocked()
    }
    l.mu.Unlock()
}

func (l *BatchLogger) flushLocked() {
    if len(l.buffer) == 0 {
        return
    }
    
    data := strings.Join(l.buffer, "\n") + "\n"
    l.file.WriteString(data)
    l.buffer = l.buffer[:0]
}

吞吐量:10 MB/s → 100 MB/s(10x)

第 2 步:异步写入

// ✅ 异步写入(500 MB/s)
type AsyncLogger struct {
    ch     chan string
    file   *os.File
    buffer []string
}

func NewAsyncLogger(filename string) *AsyncLogger {
    file, _ := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    
    logger := &AsyncLogger{
        ch:     make(chan string, 10000),
        file:   file,
        buffer: make([]string, 0, 1000),
    }
    
    go logger.worker()
    return logger
}

func (l *AsyncLogger) Write(log string) {
    l.ch <- log  // 非阻塞
}

func (l *AsyncLogger) worker() {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case log := <-l.ch:
            l.buffer = append(l.buffer, log)
            
            if len(l.buffer) >= 1000 {
                l.flush()
            }
            
        case <-ticker.C:
            l.flush()
        }
    }
}

func (l *AsyncLogger) flush() {
    if len(l.buffer) == 0 {
        return
    }
    
    data := strings.Join(l.buffer, "\n") + "\n"
    l.file.WriteString(data)
    l.buffer = l.buffer[:0]
}

吞吐量:100 MB/s → 500 MB/s(5x)

第 3 步:零拷贝 + 大缓冲

// ✅ 零拷贝 + 大缓冲(1 GB/s)
type ZeroCopyLogger struct {
    file   *os.File
    writer *bufio.Writer
    ch     chan []byte
}

func NewZeroCopyLogger(filename string) *ZeroCopyLogger {
    file, _ := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    
    logger := &ZeroCopyLogger{
        file:   file,
        writer: bufio.NewWriterSize(file, 64*1024*1024), // 64MB 缓冲
        ch:     make(chan []byte, 10000),
    }
    
    go logger.worker()
    return logger
}

func (l *ZeroCopyLogger) Write(log []byte) {
    l.ch <- log
}

func (l *ZeroCopyLogger) worker() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case log := <-l.ch:
            l.writer.Write(log)
            l.writer.WriteByte('\n')
            
        case <-ticker.C:
            l.writer.Flush()
        }
    }
}

吞吐量:500 MB/s → 1 GB/s(2x)✅

最终效果

优化步骤 吞吐量 提升
初始(逐条写入) 10 MB/s -
1. 批量写入 100 MB/s 10x
2. 异步写入 500 MB/s 50x
3. 零拷贝 + 大缓冲 1 GB/s 100x

案例 4:微服务链路优化

背景

某微服务架构,调用链路:API → Service A → Service B → Service C → DB

P99 延迟:800ms,需要优化到 100ms。

优化步骤

第 1 步:链路追踪定位瓶颈

# 使用 Jaeger 查看调用链
# 发现:
# - Service A → Service B: 200ms
# - Service B → Service C: 300ms
# - Service C → DB: 200ms
# 总计:700ms(网络往返)

第 2 步:服务合并

// ❌ 3 次微服务调用
func handleRequest() Response {
    dataA := callServiceA()  // 200ms
    dataB := callServiceB(dataA)  // 300ms
    dataC := callServiceC(dataB)  // 200ms
    return buildResponse(dataA, dataB, dataC)
}

// ✅ 合并为单服务
func handleRequestMerged() Response {
    // 直接在一个服务内完成
    dataA := processA()  // 10ms
    dataB := processB(dataA)  // 15ms
    dataC := processC(dataB)  // 20ms
    return buildResponse(dataA, dataB, dataC)
}

延迟:700ms → 45ms(94% 提升)

第 3 步:并行调用(如果数据无依赖)

// ✅ 并行调用(如果 A、B、C 无依赖)
func handleRequestParallel() Response {
    var wg sync.WaitGroup
    var dataA, dataB, dataC Data
    
    wg.Add(3)
    
    go func() {
        defer wg.Done()
        dataA = callServiceA()
    }()
    
    go func() {
        defer wg.Done()
        dataB = callServiceB()
    }()
    
    go func() {
        defer wg.Done()
        dataC = callServiceC()
    }()
    
    wg.Wait()
    return buildResponse(dataA, dataB, dataC)
}

延迟:700ms → 300ms(最慢的一个服务)

第 4 步:gRPC 替换 HTTP

// ❌ HTTP/JSON(300ms)
resp, _ := http.Post("http://serviceB/api", "application/json", body)

// ✅ gRPC(50ms)
conn, _ := grpc.Dial("serviceB:50051", grpc.WithInsecure())
client := pb.NewServiceBClient(conn)
resp, _ := client.Process(ctx, &pb.Request{Data: data})

延迟:300ms → 50ms(6x 提升)

最终效果

优化方案 延迟 提升
初始(3次HTTP调用) 700ms -
1. 服务合并 45ms 94% ✅
2. 并行调用 300ms 57%
3. gRPC 替换 HTTP 50ms 93%

案例 5:数据库慢查询优化

背景

某查询 P99 延迟 5 秒,需要优化到 50ms。

-- ❌ 慢查询
SELECT o.*, u.name, p.name
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN products p ON o.product_id = p.id
WHERE o.created_at >= '2024-01-01'
ORDER BY o.created_at DESC
LIMIT 10;

优化步骤

第 1 步:EXPLAIN 分析

EXPLAIN SELECT ...;

-- 输出:
-- type: ALL(全表扫描)
-- rows: 1000000(扫描 100 万行)
-- Extra: Using filesort(文件排序)

第 2 步:添加索引

-- ✅ 添加复合索引
CREATE INDEX idx_created_at ON orders(created_at DESC);

延迟:5s → 500ms(10x 提升)

第 3 步:覆盖索引

-- ✅ 覆盖索引(包含所有查询字段)
CREATE INDEX idx_orders_covering ON orders(created_at DESC, user_id, product_id);

-- 查询改为先查 ID,再关联
SELECT o.*, u.name, p.name
FROM orders o
INNER JOIN (
    SELECT id, user_id, product_id
    FROM orders
    WHERE created_at >= '2024-01-01'
    ORDER BY created_at DESC
    LIMIT 10
) AS tmp ON o.id = tmp.id
JOIN users u ON tmp.user_id = u.id
JOIN products p ON tmp.product_id = p.id;

延迟:500ms → 50ms(10x 提升)✅

最终效果

优化步骤 延迟 提升
初始(全表扫描) 5s -
1. 添加索引 500ms 10x
2. 覆盖索引 + 延迟关联 50ms 100x

优化总结

通用优化策略

  1. 架构层

    • 引入缓存(本地缓存、Redis、CDN)
    • 读写分离、分库分表
    • 异步处理(消息队列)
  2. 算法层

    • 降低时间复杂度(O(n²) → O(n log n) → O(n))
    • 空间换时间(哈希表、预计算)
  3. 并发层

    • 并行处理(多核 CPU)
    • 批量操作(减少网络往返)
  4. 数据库层

    • 添加索引
    • 优化查询(避免 N+1、使用 JOIN)
    • 连接池
  5. 网络层

    • 连接复用
    • gRPC 替换 HTTP
    • 零拷贝传输

优化流程

1. 压测 → 发现性能瓶颈
2. 分析 → 定位具体原因(pprof、EXPLAIN)
3. 优化 → 实施针对性优化
4. 验证 → 对比优化前后数据
5. 监控 → 持续观察性能指标

关键指标对比

案例 初始指标 最终指标 提升
秒杀系统 1000 QPS 100000 QPS 100x
推荐系统 500ms P99 45ms P99 11x
日志系统 10 MB/s 1 GB/s 100x
微服务链路 700ms 45ms 15x
数据库查询 5s 50ms 100x

本章小结

核心要点

  1. 测量驱动:先压测,再分析,后优化
  2. 找准瓶颈:80/20 法则,优化最大瓶颈
  3. 量化评估:用数据证明优化效果
  4. 全链路思维:从架构到代码,系统性优化
  5. 持续监控:优化后持续观察,防止性能退化

优化优先级

架构优化(收益最大) > 算法优化 > 并发优化 > 数据库优化 > 网络优化 > 代码细节优化

⏮️ 上一节:性能分析工具 | 🏠 返回目录