单机事务:ACID保证
分布式事务:跨多个服务/数据库,如何保证一致性?
场景:电商下单
- 订单服务:创建订单
- 库存服务:扣减库存
- 支付服务:扣款
- 积分服务:增加积分
阶段1:准备阶段(Prepare)
- 协调者询问参与者:能否提交?
- 参与者执行事务但不提交,返回Yes/No
阶段2:提交阶段(Commit)
- 所有参与者Yes → 协调者发送Commit
- 任一参与者No → 协调者发送Rollback
协调者 参与者1 参与者2 参与者3
| | | |
|---Prepare---->| | |
| |---Yes--->| |
|---Prepare-------------------->| |
| | |---Yes--->|
|---Prepare-------------------------------->|
| | | |---Yes--->
| | | |
|---Commit----->| | |
|---Commit---------------->| |
|---Commit---------------------------->|
- 同步阻塞:参与者等待协调者指令期间锁定资源
- 单点故障:协调者宕机导致参与者长期阻塞
- 数据不一致:网络分区可能导致部分提交
- XA协议(MySQL XA)
- 数据库分布式事务
三阶段:
- Try:预留资源
- Confirm:确认提交
- Cancel:取消,释放资源
type TCCTransaction struct {
txID string
services []TCCService
tryResults map[string]interface{}
}
type TCCService interface {
Try(ctx context.Context, params interface{}) (interface{}, error)
Confirm(ctx context.Context, txID string) error
Cancel(ctx context.Context, txID string) error
}
// 订单服务TCC实现
type OrderService struct {
db *sql.DB
}
func (os *OrderService) Try(ctx context.Context, params interface{}) (interface{}, error) {
order := params.(Order)
// 创建订单,状态为"待确认"
_, err := os.db.ExecContext(ctx,
"INSERT INTO orders (id, user_id, amount, status) VALUES (?, ?, ?, 'PENDING')",
order.ID, order.UserID, order.Amount)
return order.ID, err
}
func (os *OrderService) Confirm(ctx context.Context, txID string) error {
// 更新订单状态为"已确认"
_, err := os.db.ExecContext(ctx,
"UPDATE orders SET status = 'CONFIRMED' WHERE id = ?", txID)
return err
}
func (os *OrderService) Cancel(ctx context.Context, txID string) error {
// 删除订单或标记为取消
_, err := os.db.ExecContext(ctx,
"UPDATE orders SET status = 'CANCELLED' WHERE id = ?", txID)
return err
}
// TCC协调器
func (tcc *TCCTransaction) Execute(ctx context.Context) error {
// 阶段1:Try
for _, service := range tcc.services {
result, err := service.Try(ctx, nil)
if err != nil {
// Try失败,取消所有
tcc.cancelAll(ctx)
return err
}
tcc.tryResults[reflect.TypeOf(service).Name()] = result
}
// 阶段2:Confirm
for _, service := range tcc.services {
if err := service.Confirm(ctx, tcc.txID); err != nil {
// Confirm失败,需要重试或人工介入
return err
}
}
return nil
}
func (tcc *TCCTransaction) cancelAll(ctx context.Context) {
for _, service := range tcc.services {
service.Cancel(ctx, tcc.txID)
}
}- 不锁定资源
- 性能好
- 业务侵入性强
- 需要实现三个接口
将长事务拆分为多个本地短事务,每个事务都有对应的补偿操作。
两种实现方式:
- 编排(Choreography):事件驱动
- 协调(Orchestration):中央协调器
// 事件驱动
type EventBus struct {
subscribers map[string][]func(Event)
}
// 订单创建成功 → 触发扣库存
func (eb *EventBus) OnOrderCreated(handler func(Event)) {
eb.subscribers["order.created"] = append(
eb.subscribers["order.created"], handler)
}
// 示例
func main() {
bus := NewEventBus()
// 订单服务
bus.OnOrderCreated(func(e Event) {
orderID := e.Data["order_id"]
if err := createOrder(orderID); err != nil {
bus.Publish(Event{Type: "order.failed"})
} else {
bus.Publish(Event{Type: "inventory.deduct"})
}
})
// 库存服务
bus.Subscribe("inventory.deduct", func(e Event) {
if err := deductInventory(e.Data); err != nil {
bus.Publish(Event{Type: "order.cancel"}) // 补偿
} else {
bus.Publish(Event{Type: "payment.process"})
}
})
}type SagaOrchestrator struct {
steps []SagaStep
}
type SagaStep struct {
Action func(context.Context) error
Compensate func(context.Context) error
}
func (so *SagaOrchestrator) Execute(ctx context.Context) error {
executed := []int{}
// 执行所有步骤
for i, step := range so.steps {
if err := step.Action(ctx); err != nil {
// 失败,执行补偿
so.compensate(ctx, executed)
return err
}
executed = append(executed, i)
}
return nil
}
func (so *SagaOrchestrator) compensate(ctx context.Context, executed []int) {
// 逆序执行补偿
for i := len(executed) - 1; i >= 0; i-- {
step := so.steps[executed[i]]
step.Compensate(ctx)
}
}
// 使用示例
func OrderSaga() *SagaOrchestrator {
return &SagaOrchestrator{
steps: []SagaStep{
{
Action: createOrder,
Compensate: cancelOrder,
},
{
Action: deductInventory,
Compensate: restoreInventory,
},
{
Action: processPayment,
Compensate: refundPayment,
},
},
}
}将分布式事务转化为本地事务 + 消息队列。
func CreateOrderWithLocalMessage(order Order) error {
tx, _ := db.Begin()
// 1. 创建订单(本地事务)
_, err := tx.Exec("INSERT INTO orders (...) VALUES (...)")
if err != nil {
tx.Rollback()
return err
}
// 2. 写入本地消息表
_, err = tx.Exec(
"INSERT INTO local_messages (id, topic, payload, status) VALUES (?, ?, ?, 'PENDING')",
uuid.New(), "inventory.deduct", orderJSON, "PENDING")
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
// 定时任务:扫描本地消息表,发送到MQ
func MessageRelay() {
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
messages := queryPendingMessages()
for _, msg := range messages {
if err := mq.Publish(msg.Topic, msg.Payload); err == nil {
updateMessageStatus(msg.ID, "SENT")
}
}
}
}- 实现简单
- 最终一致性
- 需要额外的消息表
- 需要定时任务
// 使用@GlobalTransactional注解
@GlobalTransactional
public void purchase(String userId, String productId, int count) {
// 1. 扣减库存
inventoryService.deduct(productId, count);
// 2. 扣减余额
accountService.deduct(userId, price * count);
// 3. 创建订单
orderService.create(userId, productId, count);
}原理:
- 生成前置镜像(BeforeImage)
- 生成后置镜像(AfterImage)
- 回滚时对比镜像,生成反向SQL
| 方案 | 一致性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 2PC | 强一致 | 低 | 中 | 强一致性要求 |
| TCC | 最终一致 | 高 | 高 | 高性能场景 |
| SAGA | 最终一致 | 高 | 中 | 长事务 |
| 本地消息表 | 最终一致 | 中 | 低 | 简单场景 |
关键要点:
- ✅ 分布式事务牺牲性能换一致性
- ✅ 2PC强一致但性能差
- ✅ TCC/SAGA适合最终一致性场景
- ✅ 本地消息表实现简单
💡 思考题:
- 2PC和3PC的区别?
- TCC和SAGA如何选择?
- 如何保证幂等性?