Skip to content

Latest commit

 

History

History
382 lines (293 loc) · 8.89 KB

File metadata and controls

382 lines (293 loc) · 8.89 KB

5.4 分布式事务

📍 导航返回目录 | 上一节:分布式ID | 下一节:Paxos


问题背景

单机事务:ACID保证
分布式事务:跨多个服务/数据库,如何保证一致性?

场景:电商下单

  1. 订单服务:创建订单
  2. 库存服务:扣减库存
  3. 支付服务:扣款
  4. 积分服务:增加积分

2PC(两阶段提交)

原理

阶段1:准备阶段(Prepare)

  • 协调者询问参与者:能否提交?
  • 参与者执行事务但不提交,返回Yes/No

阶段2:提交阶段(Commit)

  • 所有参与者Yes → 协调者发送Commit
  • 任一参与者No → 协调者发送Rollback
协调者          参与者1    参与者2    参与者3
   |               |          |          |
   |---Prepare---->|          |          |
   |               |---Yes--->|          |
   |---Prepare-------------------->|     |
   |               |          |---Yes--->|
   |---Prepare-------------------------------->|
   |               |          |          |---Yes--->
   |               |          |          |
   |---Commit----->|          |          |
   |---Commit---------------->|          |
   |---Commit---------------------------->|

问题

  1. 同步阻塞:参与者等待协调者指令期间锁定资源
  2. 单点故障:协调者宕机导致参与者长期阻塞
  3. 数据不一致:网络分区可能导致部分提交

应用

  • XA协议(MySQL XA)
  • 数据库分布式事务

TCC(Try-Confirm-Cancel)

原理

三阶段

  1. Try:预留资源
  2. Confirm:确认提交
  3. Cancel:取消,释放资源

Go实现

type TCCTransaction struct {
    txID        string
    services    []TCCService
    tryResults  map[string]interface{}
}

type TCCService interface {
    Try(ctx context.Context, params interface{}) (interface{}, error)
    Confirm(ctx context.Context, txID string) error
    Cancel(ctx context.Context, txID string) error
}

// 订单服务TCC实现
type OrderService struct {
    db *sql.DB
}

func (os *OrderService) Try(ctx context.Context, params interface{}) (interface{}, error) {
    order := params.(Order)
    
    // 创建订单,状态为"待确认"
    _, err := os.db.ExecContext(ctx,
        "INSERT INTO orders (id, user_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
        order.ID, order.UserID, order.Amount)
    
    return order.ID, err
}

func (os *OrderService) Confirm(ctx context.Context, txID string) error {
    // 更新订单状态为"已确认"
    _, err := os.db.ExecContext(ctx,
        "UPDATE orders SET status = 'CONFIRMED' WHERE id = ?", txID)
    return err
}

func (os *OrderService) Cancel(ctx context.Context, txID string) error {
    // 删除订单或标记为取消
    _, err := os.db.ExecContext(ctx,
        "UPDATE orders SET status = 'CANCELLED' WHERE id = ?", txID)
    return err
}

// TCC协调器
func (tcc *TCCTransaction) Execute(ctx context.Context) error {
    // 阶段1:Try
    for _, service := range tcc.services {
        result, err := service.Try(ctx, nil)
        if err != nil {
            // Try失败,取消所有
            tcc.cancelAll(ctx)
            return err
        }
        tcc.tryResults[reflect.TypeOf(service).Name()] = result
    }
    
    // 阶段2:Confirm
    for _, service := range tcc.services {
        if err := service.Confirm(ctx, tcc.txID); err != nil {
            // Confirm失败,需要重试或人工介入
            return err
        }
    }
    
    return nil
}

func (tcc *TCCTransaction) cancelAll(ctx context.Context) {
    for _, service := range tcc.services {
        service.Cancel(ctx, tcc.txID)
    }
}

优点

  • 不锁定资源
  • 性能好

缺点

  • 业务侵入性强
  • 需要实现三个接口

SAGA模式

原理

将长事务拆分为多个本地短事务,每个事务都有对应的补偿操作。

两种实现方式

  1. 编排(Choreography):事件驱动
  2. 协调(Orchestration):中央协调器

编排模式

// 事件驱动
type EventBus struct {
    subscribers map[string][]func(Event)
}

// 订单创建成功 → 触发扣库存
func (eb *EventBus) OnOrderCreated(handler func(Event)) {
    eb.subscribers["order.created"] = append(
        eb.subscribers["order.created"], handler)
}

// 示例
func main() {
    bus := NewEventBus()
    
    // 订单服务
    bus.OnOrderCreated(func(e Event) {
        orderID := e.Data["order_id"]
        if err := createOrder(orderID); err != nil {
            bus.Publish(Event{Type: "order.failed"})
        } else {
            bus.Publish(Event{Type: "inventory.deduct"})
        }
    })
    
    // 库存服务
    bus.Subscribe("inventory.deduct", func(e Event) {
        if err := deductInventory(e.Data); err != nil {
            bus.Publish(Event{Type: "order.cancel"})  // 补偿
        } else {
            bus.Publish(Event{Type: "payment.process"})
        }
    })
}

协调模式

type SagaOrchestrator struct {
    steps []SagaStep
}

type SagaStep struct {
    Action      func(context.Context) error
    Compensate  func(context.Context) error
}

func (so *SagaOrchestrator) Execute(ctx context.Context) error {
    executed := []int{}
    
    // 执行所有步骤
    for i, step := range so.steps {
        if err := step.Action(ctx); err != nil {
            // 失败,执行补偿
            so.compensate(ctx, executed)
            return err
        }
        executed = append(executed, i)
    }
    
    return nil
}

func (so *SagaOrchestrator) compensate(ctx context.Context, executed []int) {
    // 逆序执行补偿
    for i := len(executed) - 1; i >= 0; i-- {
        step := so.steps[executed[i]]
        step.Compensate(ctx)
    }
}

// 使用示例
func OrderSaga() *SagaOrchestrator {
    return &SagaOrchestrator{
        steps: []SagaStep{
            {
                Action:     createOrder,
                Compensate: cancelOrder,
            },
            {
                Action:     deductInventory,
                Compensate: restoreInventory,
            },
            {
                Action:     processPayment,
                Compensate: refundPayment,
            },
        },
    }
}

本地消息表

原理

将分布式事务转化为本地事务 + 消息队列。

func CreateOrderWithLocalMessage(order Order) error {
    tx, _ := db.Begin()
    
    // 1. 创建订单(本地事务)
    _, err := tx.Exec("INSERT INTO orders (...) VALUES (...)")
    if err != nil {
        tx.Rollback()
        return err
    }
    
    // 2. 写入本地消息表
    _, err = tx.Exec(
        "INSERT INTO local_messages (id, topic, payload, status) VALUES (?, ?, ?, 'PENDING')",
        uuid.New(), "inventory.deduct", orderJSON, "PENDING")
    if err != nil {
        tx.Rollback()
        return err
    }
    
    return tx.Commit()
}

// 定时任务:扫描本地消息表,发送到MQ
func MessageRelay() {
    ticker := time.NewTicker(1 * time.Second)
    for range ticker.C {
        messages := queryPendingMessages()
        for _, msg := range messages {
            if err := mq.Publish(msg.Topic, msg.Payload); err == nil {
                updateMessageStatus(msg.ID, "SENT")
            }
        }
    }
}

优点

  • 实现简单
  • 最终一致性

缺点

  • 需要额外的消息表
  • 需要定时任务

Seata框架

AT模式(自动)

// 使用@GlobalTransactional注解
@GlobalTransactional
public void purchase(String userId, String productId, int count) {
    // 1. 扣减库存
    inventoryService.deduct(productId, count);
    
    // 2. 扣减余额
    accountService.deduct(userId, price * count);
    
    // 3. 创建订单
    orderService.create(userId, productId, count);
}

原理

  • 生成前置镜像(BeforeImage)
  • 生成后置镜像(AfterImage)
  • 回滚时对比镜像,生成反向SQL

方案对比

方案 一致性 性能 复杂度 适用场景
2PC 强一致 强一致性要求
TCC 最终一致 高性能场景
SAGA 最终一致 长事务
本地消息表 最终一致 简单场景

本章小结

关键要点

  • ✅ 分布式事务牺牲性能换一致性
  • ✅ 2PC强一致但性能差
  • ✅ TCC/SAGA适合最终一致性场景
  • ✅ 本地消息表实现简单

扩展阅读


💡 思考题

  1. 2PC和3PC的区别?
  2. TCC和SAGA如何选择?
  3. 如何保证幂等性?

⏮️ 上一节:分布式ID | ⏭️ 下一节:Paxos