Skip to content

Latest commit

 

History

History
416 lines (321 loc) · 9.26 KB

File metadata and controls

416 lines (321 loc) · 9.26 KB

3.5 etcd 分布式KV存储

📍 导航:返回目录 | 上一节:Flink | 下一节:ClickHouse


核心特性

  • 强一致性:基于 Raft 算法
  • 高可用性:支持集群部署
  • Watch 机制:监听 key 变化
  • TTL 支持:自动过期
  • 事务支持:MVCC 多版本并发控制

基本操作(Go SDK)

import (
    "context"
    "go.etcd.io/etcd/client/v3"
    "time"
)

// NewEtcdClient creates an etcd v3 client for the single endpoint
// localhost:2379, configured with a five-second dial timeout.
func NewEtcdClient() (*clientv3.Client, error) {
    cfg := clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    }
    return clientv3.New(cfg)
}

// Put writes value under key, bounding the request with a three-second
// timeout. It returns whatever error the client reports.
func Put(cli *clientv3.Client, key, value string) error {
    reqCtx, stop := context.WithTimeout(context.Background(), 3*time.Second)
    defer stop()

    if _, err := cli.Put(reqCtx, key, value); err != nil {
        return err
    }
    return nil
}

// Get reads key with a three-second timeout and returns the value of the
// first key-value pair in the response. When the response holds no key-value
// pairs it returns the empty string and a nil error.
func Get(cli *clientv3.Client, key string) (string, error) {
    reqCtx, stop := context.WithTimeout(context.Background(), 3*time.Second)
    defer stop()

    result, err := cli.Get(reqCtx, key)
    if err != nil {
        return "", err
    }
    if len(result.Kvs) == 0 {
        return "", nil
    }
    return string(result.Kvs[0].Value), nil
}

// Delete removes key, bounding the request with a three-second timeout.
// It returns whatever error the client reports.
func Delete(cli *clientv3.Client, key string) error {
    reqCtx, stop := context.WithTimeout(context.Background(), 3*time.Second)
    defer stop()

    if _, err := cli.Delete(reqCtx, key); err != nil {
        return err
    }
    return nil
}

Watch 监听

// WatchKey watches a single key and prints the type, key and value of every
// event it receives. It blocks until the watch channel is closed.
func WatchKey(cli *clientv3.Client, key string) {
    const layout = "Type: %s, Key: %s, Value: %s\n"

    stream := cli.Watch(context.Background(), key)
    for batch := range stream {
        for _, ev := range batch.Events {
            fmt.Printf(layout, ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}

// WatchPrefix watches every key that starts with prefix and prints a line for
// each put or delete event. Events of any other type are ignored. It blocks
// until the watch channel is closed.
func WatchPrefix(cli *clientv3.Client, prefix string) {
    stream := cli.Watch(context.Background(), prefix, clientv3.WithPrefix())

    for batch := range stream {
        for _, ev := range batch.Events {
            if ev.Type == clientv3.EventTypePut {
                fmt.Printf("修改: %s = %s\n", ev.Kv.Key, ev.Kv.Value)
            } else if ev.Type == clientv3.EventTypeDelete {
                fmt.Printf("删除: %s\n", ev.Kv.Key)
            }
        }
    }
}

Lease 租约(TTL)

// PutWithTTL grants a new lease with the given ttl (seconds, per the etcd
// lease API) and writes value under key attached to that lease.
//
// The grant and the put share a three-second timeout, matching the other
// basic operations in this file. If the put fails, the freshly granted lease
// is revoked on a best-effort basis so it is not left orphaned until it
// expires. The first error encountered is returned.
func PutWithTTL(cli *clientv3.Client, key, value string, ttl int64) error {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // Create the lease.
    lease, err := cli.Grant(ctx, ttl)
    if err != nil {
        return err
    }

    // Bind the key to the lease.
    if _, err := cli.Put(ctx, key, value, clientv3.WithLease(lease.ID)); err != nil {
        // ctx may already be expired (that can be why Put failed), so the
        // cleanup gets its own deadline.
        revokeCtx, revokeCancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer revokeCancel()
        // Best effort: the revoke error is deliberately ignored because the
        // put error is the one the caller needs to see.
        _, _ = cli.Revoke(revokeCtx, lease.ID)
        return err
    }

    return nil
}

// KeepAlive starts keep-alive for leaseID and prints a line for every
// keep-alive response received. It returns silently if keep-alive cannot be
// started, and otherwise blocks until the response channel is closed.
func KeepAlive(cli *clientv3.Client, leaseID clientv3.LeaseID) {
    responses, err := cli.KeepAlive(context.Background(), leaseID)
    if err != nil {
        return
    }

    for {
        ka, open := <-responses
        if !open {
            return
        }
        fmt.Printf("租约 %x 续租成功\n", ka.ID)
    }
}

事务操作

// Transaction commits a single etcd transaction: if the value of "key1"
// equals "value1" it puts "value2" under "key2", otherwise it puts "default"
// under "key2". Only the commit error is returned.
func Transaction(cli *clientv3.Client) error {
    guard := clientv3.Compare(clientv3.Value("key1"), "=", "value1")
    onMatch := clientv3.OpPut("key2", "value2")
    onMiss := clientv3.OpPut("key2", "default")

    _, err := cli.Txn(context.Background()).If(guard).Then(onMatch).Else(onMiss).Commit()
    return err
}

// CompareAndSwap puts newValue under key only if the key's current value
// equals oldValue, using one transaction. It returns the transaction's
// Succeeded flag, or false together with the error if the commit fails.
func CompareAndSwap(cli *clientv3.Client, key, oldValue, newValue string) (bool, error) {
    unchanged := clientv3.Compare(clientv3.Value(key), "=", oldValue)
    swap := clientv3.OpPut(key, newValue)

    result, err := cli.Txn(context.Background()).If(unchanged).Then(swap).Commit()
    if err != nil {
        return false, err
    }
    return result.Succeeded, nil
}

实战应用

服务注册与发现

// ServiceRegistry registers and discovers services through an etcd client.
type ServiceRegistry struct {
    client  *clientv3.Client // etcd client used for every registry operation
    leaseID clientv3.LeaseID // lease ID stored by Register and read by keepAlive
}

// Register announces serviceAddr as an instance of serviceName.
//
// It grants a lease with the given ttl (seconds, per the etcd lease API),
// writes serviceAddr under "/services/<serviceName>/<serviceAddr>" attached to
// that lease, and then starts a background goroutine that keeps the lease
// alive.
//
// If the write fails, the new lease is revoked on a best-effort basis and
// sr.leaseID is left untouched, so a failed registration neither leaks a lease
// nor overwrites the ID of a previous successful one.
func (sr *ServiceRegistry) Register(serviceName, serviceAddr string, ttl int64) error {
    ctx := context.Background()

    // Create the lease.
    lease, err := sr.client.Grant(ctx, ttl)
    if err != nil {
        return err
    }

    // Register the service instance under the lease.
    key := fmt.Sprintf("/services/%s/%s", serviceName, serviceAddr)
    if _, err := sr.client.Put(ctx, key, serviceAddr, clientv3.WithLease(lease.ID)); err != nil {
        // The lease has no key attached; release it now instead of waiting
        // for it to expire. The revoke error is ignored because the put
        // error is the one the caller needs to see.
        _, _ = sr.client.Revoke(ctx, lease.ID)
        return err
    }

    // Record the lease only once the registration is in place, then renew it
    // in the background.
    sr.leaseID = lease.ID
    go sr.keepAlive()

    return nil
}

// Discover returns the values stored under every key with the prefix
// "/services/<serviceName>/". The result is a nil slice when no key matches,
// and nil together with the error when the read fails.
func (sr *ServiceRegistry) Discover(serviceName string) ([]string, error) {
    lookup := "/services/" + serviceName + "/"

    result, err := sr.client.Get(context.Background(), lookup, clientv3.WithPrefix())
    if err != nil {
        return nil, err
    }

    var found []string
    for i := range result.Kvs {
        found = append(found, string(result.Kvs[i].Value))
    }
    return found, nil
}

// Watch returns a channel that receives the full address list of serviceName
// each time anything changes under "/services/<serviceName>/".
//
// A background goroutine re-runs Discover for every watch response and sends
// the result on the returned channel. The channel is unbuffered, so the
// goroutine blocks until the consumer receives each list. When the underlying
// watch channel is closed, the returned channel is closed too, so a consumer
// ranging over it terminates instead of blocking forever.
//
// A failed Discover is skipped rather than published, because a nil list
// would be indistinguishable from "no instances registered".
func (sr *ServiceRegistry) Watch(serviceName string) <-chan []string {
    prefix := fmt.Sprintf("/services/%s/", serviceName)
    watchChan := sr.client.Watch(context.Background(), prefix, clientv3.WithPrefix())

    addrChan := make(chan []string)

    go func() {
        // This goroutine is the only sender, so it owns closing the channel.
        defer close(addrChan)

        for range watchChan {
            addrs, err := sr.Discover(serviceName)
            if err != nil {
                // Keep the consumer's last known list; the next watch
                // response triggers another attempt.
                continue
            }
            addrChan <- addrs
        }
    }()

    return addrChan
}

// keepAlive starts keep-alive for sr.leaseID and drains the response channel
// until it is closed. It returns silently if keep-alive cannot be started.
func (sr *ServiceRegistry) keepAlive() {
    acks, err := sr.client.KeepAlive(context.Background(), sr.leaseID)
    if err != nil {
        return
    }

    // Each received value is a successful renewal; nothing to do but consume it.
    for {
        if _, open := <-acks; !open {
            return
        }
    }
}

分布式锁

// DistributedLock bundles an etcd concurrency session with a mutex created
// on that session.
type DistributedLock struct {
    client  *clientv3.Client     // etcd client the session was created from
    session *concurrency.Session // session backing the mutex; closed by Unlock
    mutex   *concurrency.Mutex   // mutex created on session for the lock key
}

// NewDistributedLock opens a new concurrency session on cli and creates a
// mutex for key on that session. It returns nil and the error if the session
// cannot be created.
func NewDistributedLock(cli *clientv3.Client, key string) (*DistributedLock, error) {
    sess, err := concurrency.NewSession(cli)
    if err != nil {
        return nil, err
    }

    lock := &DistributedLock{client: cli, session: sess}
    lock.mutex = concurrency.NewMutex(sess, key)
    return lock, nil
}

// Lock acquires the underlying mutex, passing ctx through to it, and returns
// the mutex's error unchanged.
func (dl *DistributedLock) Lock(ctx context.Context) error {
    err := dl.mutex.Lock(ctx)
    return err
}

// Unlock releases the mutex and then closes the session.
//
// The session is closed even when releasing the mutex fails, so a failed
// unlock does not leak the session. If both steps fail, the unlock error is
// returned because it is the more relevant one to the caller; otherwise
// whichever error occurred is returned.
//
// NOTE(review): because the session is closed here, this DistributedLock is
// presumably single-use; create a new one to lock again. Confirm against the
// etcd concurrency package documentation.
func (dl *DistributedLock) Unlock(ctx context.Context) error {
    unlockErr := dl.mutex.Unlock(ctx)
    closeErr := dl.session.Close()

    if unlockErr != nil {
        return unlockErr
    }
    return closeErr
}

配置中心

// ConfigCenter keeps an in-memory copy of the etcd keys under "/config/".
type ConfigCenter struct {
    client *clientv3.Client  // etcd client used to load and watch the configuration
    cache  map[string]string // full etcd key -> value; guarded by mu
    mu     sync.RWMutex      // protects cache
}

// NewConfigCenter builds a ConfigCenter around cli with an empty cache, loads
// the current configuration synchronously, and then starts a goroutine that
// watches for later changes.
func NewConfigCenter(cli *clientv3.Client) *ConfigCenter {
    center := new(ConfigCenter)
    center.client = cli
    center.cache = map[string]string{}

    // Fill the cache before handing the instance out.
    center.loadConfig()

    // Keep the cache current in the background.
    go center.watchConfig()

    return center
}

// GetConfig returns the cached value for key under the read lock. A key that
// is not in the cache yields the empty string.
func (cc *ConfigCenter) GetConfig(key string) string {
    cc.mu.RLock()
    value := cc.cache[key]
    cc.mu.RUnlock()
    return value
}

// loadConfig reads every key with the prefix "/config/" and stores each
// key/value pair in the cache under the write lock. If the read fails it
// returns without touching the cache.
func (cc *ConfigCenter) loadConfig() {
    snapshot, err := cc.client.Get(context.Background(), "/config/", clientv3.WithPrefix())
    if err != nil {
        return
    }

    cc.mu.Lock()
    defer cc.mu.Unlock()

    for i := range snapshot.Kvs {
        entry := snapshot.Kvs[i]
        cc.cache[string(entry.Key)] = string(entry.Value)
    }
}

// watchConfig watches every key with the prefix "/config/" and applies each
// event to the cache: a put stores the new value, a delete removes the key.
// All events of one watch response are applied under a single write lock.
// It blocks until the watch channel is closed.
func (cc *ConfigCenter) watchConfig() {
    updates := cc.client.Watch(context.Background(), "/config/", clientv3.WithPrefix())

    for batch := range updates {
        cc.mu.Lock()
        for _, ev := range batch.Events {
            name := string(ev.Kv.Key)
            switch ev.Type {
            case clientv3.EventTypePut:
                cc.cache[name] = string(ev.Kv.Value)
            case clientv3.EventTypeDelete:
                delete(cc.cache, name)
            }
        }
        cc.mu.Unlock()
    }
}

集群部署

单机启动

# Start a single etcd member named node1 that stores its data in /var/lib/etcd,
# listens for client traffic on port 2379 of all interfaces, and advertises
# http://localhost:2379 as its client URL.
etcd --name node1 \
  --data-dir /var/lib/etcd \
  --listen-client-urls http://0.0.0.0:2379 \
  --advertise-client-urls http://localhost:2379

集群启动

# Node 1: start member node1 of a new three-member cluster (node1, node2, node3).
# Peer traffic uses port 2380 and client traffic port 2379; 192.168.1.1 is the
# address this member advertises to peers and clients.
etcd --name node1 \
  --initial-advertise-peer-urls http://192.168.1.1:2380 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --listen-client-urls http://0.0.0.0:2379 \
  --advertise-client-urls http://192.168.1.1:2379 \
  --initial-cluster-token etcd-cluster \
  --initial-cluster node1=http://192.168.1.1:2380,node2=http://192.168.1.2:2380,node3=http://192.168.1.3:2380 \
  --initial-cluster-state new

# 节点2、节点3 类似...

本章小结

etcd 是云原生时代的分布式KV存储,广泛应用于 Kubernetes、微服务配置管理、服务发现等场景。

关键要点

  • ✅ Raft 算法保证强一致性
  • ✅ Watch 机制实现实时配置更新
  • ✅ Lease 机制支持 TTL 和服务注册
  • ✅ 事务支持实现分布式锁

扩展阅读


💡 思考题

  1. etcd 为什么选择 Raft 而不是 Paxos?
  2. Watch 机制是如何实现的?
  3. 如何用 etcd 实现分布式锁?

⏮️ 上一节:Flink | ⏭️ 下一节:ClickHouse