📍 导航:返回目录 | 上一节:Flink | 下一节:ClickHouse
- 强一致性:基于 Raft 算法
- 高可用性:支持集群部署
- Watch 机制:监听 key 变化
- TTL 支持:自动过期
- 事务支持:MVCC 多版本并发控制
import (
"context"
"go.etcd.io/etcd/client/v3"
"time"
)
// NewEtcdClient builds an etcd v3 client that targets localhost:2379 with a
// 5-second dial timeout. Whatever clientv3.New returns is passed through.
func NewEtcdClient() (*clientv3.Client, error) {
	cfg := clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	}
	return clientv3.New(cfg)
}
// Put stores value under key, allowing the request at most 3 seconds.
// It returns the error reported by the client, or nil on success.
func Put(cli *clientv3.Client, key, value string) error {
	reqCtx, release := context.WithTimeout(context.Background(), 3*time.Second)
	defer release()

	if _, putErr := cli.Put(reqCtx, key, value); putErr != nil {
		return putErr
	}
	return nil
}
// Get reads the value stored under key, allowing the request at most
// 3 seconds. When the response carries no key-value pair it returns the empty
// string together with a nil error.
func Get(cli *clientv3.Client, key string) (string, error) {
	reqCtx, release := context.WithTimeout(context.Background(), 3*time.Second)
	defer release()

	result, getErr := cli.Get(reqCtx, key)
	if getErr != nil {
		return "", getErr
	}
	if len(result.Kvs) == 0 {
		return "", nil
	}
	return string(result.Kvs[0].Value), nil
}
// Delete removes key, allowing the request at most 3 seconds.
// It returns the error reported by the client, or nil on success.
func Delete(cli *clientv3.Client, key string) error {
	reqCtx, release := context.WithTimeout(context.Background(), 3*time.Second)
	defer release()

	if _, delErr := cli.Delete(reqCtx, key); delErr != nil {
		return delErr
	}
	return nil
}

// WatchKey watches a single key with a background context and prints the
// type, key and value of every event received. It blocks until the watch
// channel is closed.
func WatchKey(cli *clientv3.Client, key string) {
	updates := cli.Watch(context.Background(), key)
	for batch := range updates {
		for i := range batch.Events {
			ev := batch.Events[i]
			fmt.Printf("Type: %s, Key: %s, Value: %s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}
// Watch 前缀
func WatchPrefix(cli *clientv3.Client, prefix string) {
watchChan := cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
for resp := range watchChan {
for _, event := range resp.Events {
switch event.Type {
case clientv3.EventTypePut:
fmt.Printf("修改: %s = %s\n", event.Kv.Key, event.Kv.Value)
case clientv3.EventTypeDelete:
fmt.Printf("删除: %s\n", event.Kv.Key)
}
}
}
}func PutWithTTL(cli *clientv3.Client, key, value string, ttl int64) error {
ctx := context.Background()
// 创建租约
lease, err := cli.Grant(ctx, ttl)
if err != nil {
return err
}
// 绑定租约
_, err = cli.Put(ctx, key, value, clientv3.WithLease(lease.ID))
if err != nil {
return err
}
return nil
}
// 自动续租
func KeepAlive(cli *clientv3.Client, leaseID clientv3.LeaseID) {
keepAliveChan, err := cli.KeepAlive(context.Background(), leaseID)
if err != nil {
return
}
for ka := range keepAliveChan {
fmt.Printf("租约 %x 续租成功\n", ka.ID)
}
}func Transaction(cli *clientv3.Client) error {
ctx := context.Background()
// 事务:如果 key1 的值是 "value1",则设置 key2
_, err := cli.Txn(ctx).
If(clientv3.Compare(clientv3.Value("key1"), "=", "value1")).
Then(clientv3.OpPut("key2", "value2")).
Else(clientv3.OpPut("key2", "default")).
Commit()
return err
}
// CAS 操作
func CompareAndSwap(cli *clientv3.Client, key, oldValue, newValue string) (bool, error) {
ctx := context.Background()
resp, err := cli.Txn(ctx).
If(clientv3.Compare(clientv3.Value(key), "=", oldValue)).
Then(clientv3.OpPut(key, newValue)).
Commit()
if err != nil {
return false, err
}
return resp.Succeeded, nil
}type ServiceRegistry struct {
client *clientv3.Client
leaseID clientv3.LeaseID
}
// Register announces serviceAddr as an instance of serviceName by writing the
// key /services/<serviceName>/<serviceAddr> under a newly granted lease with
// time-to-live ttl (passed straight to Grant), then starts a background
// goroutine that keeps the lease alive.
//
// The grant and the put share one 3-second timeout. If the put fails, the new
// lease is revoked on a best-effort basis and sr.leaseID is left unchanged, so
// a failed registration leaves no dangling lease behind.
//
// It returns the first error from Grant or Put, or nil on success.
func (sr *ServiceRegistry) Register(serviceName, serviceAddr string, ttl int64) error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Create the lease.
	lease, err := sr.client.Grant(ctx, ttl)
	if err != nil {
		return err
	}

	// Write the service key bound to that lease.
	key := fmt.Sprintf("/services/%s/%s", serviceName, serviceAddr)
	if _, err := sr.client.Put(ctx, key, serviceAddr, clientv3.WithLease(lease.ID)); err != nil {
		// ctx may already have expired, so revoke under a fresh deadline.
		revokeCtx, revokeCancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer revokeCancel()
		// Best effort only: an unrevoked lease still expires by itself.
		_, _ = sr.client.Revoke(revokeCtx, lease.ID)
		return err
	}

	// Record the lease only once the key exists, then renew it in the
	// background. keepAlive reads sr.leaseID, so set it before starting.
	sr.leaseID = lease.ID
	go sr.keepAlive()
	return nil
}
// Discover returns the values of all keys under /services/<serviceName>/,
// which are the addresses written by Register.
//
// The read is bounded by a 3-second timeout, the same bound Put, Get and
// Delete use, so an unreachable cluster cannot block the caller forever.
// It returns a nil slice when no key matches, or nil and the error when the
// read fails.
func (sr *ServiceRegistry) Discover(serviceName string) ([]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	prefix := fmt.Sprintf("/services/%s/", serviceName)
	resp, err := sr.client.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	var addrs []string
	for _, kv := range resp.Kvs {
		addrs = append(addrs, string(kv.Value))
	}
	return addrs, nil
}
// Watch returns a channel that receives the full, freshly discovered address
// list of serviceName each time anything under /services/<serviceName>/
// changes.
//
// The returned channel is closed once the underlying watch channel closes, so
// a receiver ranging over it terminates instead of blocking forever. A change
// whose follow-up Discover call fails is skipped rather than reported as a nil
// list, which a receiver could not tell apart from "no instances left".
//
// The channel is unbuffered and the watch uses a background context: the
// goroutine blocks on each send until the receiver takes the value.
func (sr *ServiceRegistry) Watch(serviceName string) <-chan []string {
	prefix := fmt.Sprintf("/services/%s/", serviceName)
	watchChan := sr.client.Watch(context.Background(), prefix, clientv3.WithPrefix())
	addrChan := make(chan []string)
	go func() {
		// Only this goroutine sends, so it is the one that closes.
		defer close(addrChan)
		for range watchChan {
			addrs, err := sr.Discover(serviceName)
			if err != nil {
				// Skip this update; the next event triggers a new lookup.
				continue
			}
			addrChan <- addrs
		}
	}()
	return addrChan
}
func (sr *ServiceRegistry) keepAlive() {
keepAliveChan, err := sr.client.KeepAlive(context.Background(), sr.leaseID)
if err != nil {
return
}
for range keepAliveChan {
// 续租成功
}
}type DistributedLock struct {
client *clientv3.Client
session *concurrency.Session
mutex *concurrency.Mutex
}
// NewDistributedLock opens a new concurrency session on cli and creates a
// mutex for key on that session. It returns nil and the error when the
// session cannot be created.
func NewDistributedLock(cli *clientv3.Client, key string) (*DistributedLock, error) {
	sess, err := concurrency.NewSession(cli)
	if err != nil {
		return nil, err
	}

	lock := &DistributedLock{client: cli, session: sess}
	lock.mutex = concurrency.NewMutex(sess, key)
	return lock, nil
}
// Lock acquires the underlying mutex, passing ctx through to it, and returns
// the error from that call, or nil on success.
func (dl *DistributedLock) Lock(ctx context.Context) error {
	if lockErr := dl.mutex.Lock(ctx); lockErr != nil {
		return lockErr
	}
	return nil
}
func (dl *DistributedLock) Unlock(ctx context.Context) error {
err := dl.mutex.Unlock(ctx)
if err != nil {
return err
}
return dl.session.Close()
}type ConfigCenter struct {
client *clientv3.Client
cache map[string]string
mu sync.RWMutex
}
// NewConfigCenter creates a ConfigCenter backed by cli with an empty cache,
// loads the current configuration synchronously, and then starts a goroutine
// that watches for later changes.
func NewConfigCenter(cli *clientv3.Client) *ConfigCenter {
	center := new(ConfigCenter)
	center.client = cli
	center.cache = map[string]string{}

	// Fill the cache before handing the instance out.
	center.loadConfig()
	// Keep the cache updated in the background.
	go center.watchConfig()

	return center
}
// GetConfig returns the cached value for key under the read lock. A key that
// is not in the cache yields the empty string.
func (cc *ConfigCenter) GetConfig(key string) string {
	cc.mu.RLock()
	value := cc.cache[key]
	cc.mu.RUnlock()
	return value
}
// loadConfig reads every key under /config/ with a background context and
// copies the results into the cache under the write lock. If the read fails
// the error is dropped and the cache is left untouched.
func (cc *ConfigCenter) loadConfig() {
	snapshot, err := cc.client.Get(context.Background(), "/config/", clientv3.WithPrefix())
	if err != nil {
		return
	}

	cc.mu.Lock()
	defer cc.mu.Unlock()
	for i := range snapshot.Kvs {
		entry := snapshot.Kvs[i]
		cc.cache[string(entry.Key)] = string(entry.Value)
	}
}
func (cc *ConfigCenter) watchConfig() {
watchChan := cc.client.Watch(context.Background(), "/config/", clientv3.WithPrefix())
for resp := range watchChan {
cc.mu.Lock()
for _, event := range resp.Events {
key := string(event.Kv.Key)
if event.Type == clientv3.EventTypePut {
cc.cache[key] = string(event.Kv.Value)
} else if event.Type == clientv3.EventTypeDelete {
delete(cc.cache, key)
}
}
cc.mu.Unlock()
}
}etcd --name node1 \
--data-dir /var/lib/etcd \
--listen-client-urls http://0.0.0.0:2379 \
--advertise-client-urls http://localhost:2379

# 节点1
etcd --name node1 \
--initial-advertise-peer-urls http://192.168.1.1:2380 \
--listen-peer-urls http://0.0.0.0:2380 \
--listen-client-urls http://0.0.0.0:2379 \
--advertise-client-urls http://192.168.1.1:2379 \
--initial-cluster-token etcd-cluster \
--initial-cluster node1=http://192.168.1.1:2380,node2=http://192.168.1.2:2380,node3=http://192.168.1.3:2380 \
--initial-cluster-state new
# 节点2、节点3 类似...

etcd 是云原生时代的分布式 KV 存储,广泛应用于 Kubernetes、微服务配置管理、服务发现等场景。
关键要点:
- ✅ Raft 算法保证强一致性
- ✅ Watch 机制实现实时配置更新
- ✅ Lease 机制支持 TTL 和服务注册
- ✅ 事务支持实现分布式锁
💡 思考题:
- etcd 为什么选择 Raft 而不是 Paxos?
- Watch 机制是如何实现的?
- 如何用 etcd 实现分布式锁?
⏮️ 上一节:Flink | ⏭️ 下一节:ClickHouse