-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmonitor.py
More file actions
150 lines (114 loc) · 3.63 KB
/
monitor.py
File metadata and controls
150 lines (114 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import redis
import os
import time
import json
import sys
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.exceptions import NodeExistsError
from config import Config
from zoo import init_kazoo
# Build the configuration object from the first command-line argument
# (presumably the path of a config file understood by Config — TODO confirm).
conf = Config(sys.argv[1])
# ZooKeeper node path taken from the "zookeeper" config section; the rest of
# this module reads and writes the Redis topology at this node.
ZK_PATH = conf.section("zookeeper")["path"]
print(ZK_PATH)
# "host:port" of the Redis instance currently treated as primary.
# Assigned by refresh_node() and read by monitor(); None until first assigned.
primary_addr = None
def connect_to_redis(host):
    """Create a Redis client for ``host`` via ``redis.from_url``.

    Args:
        host: Address interpolated into ``redis://{host}/``.

    Returns:
        The client object, or None when creating it raised (the error text
        is printed).
    """
    client = None
    try:
        client = redis.from_url(f"redis://{host}/")
    except Exception as err:
        print(str(err))
    return client
def set_primary(host):
    """Promote the Redis instance at ``host`` to primary.

    Connects to ``host`` and calls ``slaveof()`` with no arguments, which
    redis-py documents as promoting the instance to a master.

    Args:
        host: Address of the instance to promote.

    Returns:
        The client used for the promotion, or None when anything raised
        (the error text is printed).
    """
    try:
        client = connect_to_redis(host)
        client.slaveof()
    except Exception as err:
        print(str(err))
        return None
    else:
        return client
def info(conn):
    """Probe a Redis server by calling ``info()`` on its client.

    Args:
        conn: Redis client, or None when the client could not be created
            (connect_to_redis returns None in that case).

    Returns:
        ``(True, info_dict)`` when the server answered, ``(False, None)``
        when there is no client or the server could not be reached.
    """
    # A missing client is an unreachable server, not a crash of the caller.
    if conn is None:
        return False, None
    try:
        return True, conn.info()
    except (redis.exceptions.ConnectionError,
            redis.exceptions.TimeoutError) as e:
        # A timeout is not a ConnectionError subclass in redis-py, so it has
        # to be listed explicitly to count as "server down".
        print(str(e))
        return False, None
def set_replicas(primary, hosts):
    """Make every reachable host in ``hosts`` replicate from ``primary``.

    A host is re-pointed when its INFO ``role`` is not ``"slave"`` or when
    the master it currently follows differs from ``primary``. Hosts that
    cannot be reached are skipped, and a failure on one host does not stop
    the remaining hosts from being processed.

    Args:
        primary: ``"host:port"`` address of the primary.
        hosts: Iterable of ``"host:port"`` addresses of the secondaries.
    """
    parts = primary.split(":")
    for h in hosts:
        rconn = connect_to_redis(h)
        if rconn is None:
            # Client creation failed; nothing to probe or reconfigure.
            continue
        ok, value = info(rconn)
        if not ok:
            continue
        if value["role"] == "slave":
            master_addr = f"{value['master_host']}:{value['master_port']}"
            if master_addr == primary:
                # Already replicating from the right primary.
                continue
        try:
            rconn.slaveof(parts[0], int(parts[1]))
        except redis.exceptions.RedisError as e:
            # The host may have gone away between INFO and SLAVEOF; keep
            # going so the other replicas are still configured.
            print(str(e))
            continue
        print(f"set {h} as replica of {primary}")
def refresh_node(data, stat):
    """Apply the topology stored in a ZooKeeper node to the Redis instances.

    Decodes ``data`` as UTF-8 JSON with ``"primary"`` and ``"secondary"``
    keys, records the primary in the module-level ``primary_addr``, promotes
    it, and, if the promotion returned a client, points the secondaries at it.

    Args:
        data: Raw node content (bytes); a falsy value means no data.
        stat: Unused here.
    """
    global primary_addr
    if not data:
        print("There is no data")
        return

    topology = json.loads(data.decode('utf-8'))
    primary_addr = primary = topology["primary"]

    if not set_primary(primary):
        return
    print(f"Primary Redis is {primary}")
    set_replicas(primary, topology["secondary"])
# ZooKeeper client used by get_redis_info_from_zk() and monitor().
# refresh_node is handed to init_kazoo, presumably to be registered as the
# data-watch callback for ZK_PATH — TODO confirm against zoo.init_kazoo,
# including the meaning of the trailing False argument.
zk = init_kazoo(conf.section("zookeeper")["hosts"], ZK_PATH, refresh_node, False)
# Failed health checks of the primary since the last success or failover;
# maintained by monitor().
failure_count = 0
def get_good_secondary(hosts):
    """Find the first host in ``hosts`` that answers an INFO probe.

    Args:
        hosts: Iterable of addresses, tried in order.

    Returns:
        ``(address, client)`` for the first reachable host, or
        ``(None, None)`` when none of them answered.
    """
    for addr in hosts:
        client = redis.from_url(f"redis://{addr}/")
        reachable, _ = info(client)
        if not reachable:
            continue
        return addr, client
    return None, None
def get_redis_info_from_zk(path):
    """Read and decode the Redis topology stored at a ZooKeeper node.

    Args:
        path: Path of the node to read.

    Returns:
        The parsed JSON document, or None when the node does not exist or
        holds no data.
    """
    try:
        # zk.get returns a (value, stat) pair; only the value is needed.
        value, _stat = zk.get(path)
    except NoNodeError:
        return None
    # Test the payload itself: the (value, stat) pair is always truthy, so
    # checking the pair would let an empty node through to json.loads.
    if not value:
        return None
    return json.loads(value.decode('utf-8'))
def _fail_over():
    """Promote a reachable secondary and publish the new topology.

    Returns:
        The address of the new primary, or None when no failover took place
        (no topology in ZooKeeper, no reachable secondary, or the promotion
        itself failed).
    """
    hosts = get_redis_info_from_zk(ZK_PATH)
    if not hosts:
        print("There is no data")
        return None

    host, conn = get_good_secondary(hosts["secondary"])
    if conn is None:
        print("There is no good secondary", hosts)
        return None

    # Only publish the new topology once the promotion actually succeeded;
    # otherwise ZooKeeper would advertise a primary that is still a replica.
    if set_primary(host) is None:
        print(f"failed to promote {host}")
        return None

    old_p = hosts["primary"]
    hosts["secondary"].remove(host)
    hosts["secondary"].append(old_p)
    hosts["primary"] = host
    zk.set(ZK_PATH, json.dumps(hosts).encode('utf-8'))
    return host


def monitor(check_interval=10, failure_threshold=3):
    """Health-check the primary forever and fail over when it stays down.

    Each round probes ``primary_addr``. On success the failure counter is
    reset and, if ZooKeeper still names the same primary, the secondaries
    are re-pointed at it. On failure the counter is incremented; once it
    reaches ``failure_threshold`` a reachable secondary is promoted and the
    updated topology is written to ``ZK_PATH``.

    Args:
        check_interval: Seconds to sleep between health checks, including
            between consecutive failed checks.
        failure_threshold: Number of consecutive failed checks required
            before a failover is attempted.
    """
    global failure_count
    global primary_addr
    while True:
        if primary_addr is None:
            # No primary recorded yet: read it from ZooKeeper rather than
            # probing "redis://None/" and failing over a healthy primary.
            hosts = get_redis_info_from_zk(ZK_PATH)
            if not hosts:
                print("There is no data")
                time.sleep(check_interval)
                continue
            primary_addr = hosts["primary"]

        conn = connect_to_redis(primary_addr)
        if conn is not None and info(conn)[0]:
            failure_count = 0
            hosts = get_redis_info_from_zk(ZK_PATH)
            if hosts and primary_addr == hosts["primary"]:
                set_replicas(hosts["primary"], hosts["secondary"])
            time.sleep(check_interval)
            continue

        print(f"check: {primary_addr} failure_count: {failure_count}")
        failure_count += 1
        if failure_count >= failure_threshold:
            new_primary = _fail_over()
            if new_primary is not None:
                # Track the new primary right away instead of waiting for
                # the ZooKeeper watcher, so the old address is not probed
                # (and failed over) a second time.
                primary_addr = new_primary
                failure_count = 0
                continue
        # Space out retries so the threshold covers a real time window
        # instead of being reached in a tight loop.
        time.sleep(check_interval)
# Script entry point: run the monitoring loop (a `while True` with no exit).
monitor()