Skip to content

Commit 0f92d82

Browse files
feat: add periodic reconciliation to detect and fix service drift
Add full reconciliation every 6 hours to compare known LoadBalancer services against actual cluster state. Track known services in memory and trigger webhooks for any detected drift. Include API connectivity verification before watch cycles to improve reliability.
1 parent 06a79e5 commit 0f92d82

1 file changed

Lines changed: 101 additions & 12 deletions

File tree

service_monitor.py

Lines changed: 101 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,17 @@
4141
last_event_time = time.time()
4242
HEALTH_CHECK_INTERVAL = 300 # 5 minutes in seconds
4343
MAX_WATCH_TIME = 3600 # 1 hour in seconds - force reconnect after this time
44+
RECONCILE_INTERVAL = 21600 # 6 hours in seconds - full reconciliation interval
4445

4546
# Reconnection settings
4647
INITIAL_RETRY_DELAY = 5 # seconds
4748
MAX_RETRY_DELAY = 300 # 5 minutes in seconds
4849
RETRY_BACKOFF_FACTOR = 1.5
4950

51+
# Track known LB services for reconciliation
52+
known_lb_services: set = set()
53+
last_reconcile_time = 0.0
54+
5055
# Running flag for graceful shutdown
5156
running = True
5257

@@ -101,16 +106,79 @@ def get_initial_resource_version(v1) -> Optional[str]:
101106
"""
102107
Get current resourceVersion without triggering workflow.
103108
This ensures we start watching from the current state.
109+
Also builds the initial snapshot of known LB services.
104110
"""
111+
global known_lb_services, last_reconcile_time
105112
try:
106113
services = v1.list_service_for_all_namespaces()
107-
lb_count = sum(1 for s in services.items if s.spec.type == 'LoadBalancer')
108-
logger.info(f"Found {lb_count} existing LoadBalancer services")
114+
known_lb_services = {
115+
f"{s.metadata.namespace}/{s.metadata.name}"
116+
for s in services.items if s.spec.type == 'LoadBalancer'
117+
}
118+
last_reconcile_time = time.time()
119+
logger.info(f"Found {len(known_lb_services)} existing LoadBalancer services")
109120
return services.metadata.resource_version
110121
except Exception as e:
111122
logger.error(f"Failed to get initial resourceVersion: {e}")
112123
return None
113124

125+
126+
def verify_api_connection(v1) -> bool:
127+
"""Lightweight API server connectivity check."""
128+
try:
129+
v1.read_namespace(name='default', _request_timeout=10)
130+
return True
131+
except Exception as e:
132+
logger.warning(f"API server connectivity check failed: {e}")
133+
return False
134+
135+
136+
def reconcile_services(v1) -> bool:
137+
"""
138+
Full reconciliation: list all LB services and compare against known state.
139+
If the set of services changed, trigger the webhook to ensure consistency.
140+
Returns True if reconciliation succeeded.
141+
"""
142+
global known_lb_services, last_reconcile_time
143+
try:
144+
services = v1.list_service_for_all_namespaces()
145+
current_lb_services = {
146+
f"{s.metadata.namespace}/{s.metadata.name}"
147+
for s in services.items if s.spec.type == 'LoadBalancer'
148+
}
149+
150+
added = current_lb_services - known_lb_services
151+
removed = known_lb_services - current_lb_services
152+
153+
if added or removed:
154+
logger.warning(
155+
f"Reconciliation detected drift: "
156+
f"{len(added)} added, {len(removed)} removed"
157+
)
158+
for svc in added:
159+
logger.info(f"Reconciliation: new service {svc}")
160+
trigger_semaphore_webhook('RECONCILE_ADDED', svc)
161+
for svc in removed:
162+
logger.info(f"Reconciliation: removed service {svc}")
163+
trigger_semaphore_webhook('RECONCILE_DELETED', svc)
164+
else:
165+
logger.info(
166+
f"Reconciliation OK: {len(current_lb_services)} LB services, no drift"
167+
)
168+
169+
known_lb_services = current_lb_services
170+
last_reconcile_time = time.time()
171+
172+
# Also update resourceVersion from the list response
173+
rv = services.metadata.resource_version
174+
if rv:
175+
set_resource_version(rv)
176+
177+
return True
178+
except Exception as e:
179+
logger.error(f"Reconciliation failed: {e}")
180+
return False
181+
114182
def send_mattermost_notification(event_type: str, namespace: str, service_name: str) -> bool:
115183
"""
116184
Send a notification to Mattermost about the triggered Semaphore webhook.
@@ -254,8 +322,7 @@ def watch_services(timeout_seconds: Optional[int] = None):
254322
resource_version = get_resource_version()
255323
logger.info(f"Starting to watch LoadBalancer services with timeout: {timeout_seconds or 'None'} seconds, resourceVersion: {resource_version or 'None'}...")
256324
start_time = time.time()
257-
update_last_event_time() # Initialize the last event time
258-
touch_health_file() # Initialize health file
325+
touch_health_file()
259326

260327
try:
261328
# Use resourceVersion for resumable watches and enable bookmarks
@@ -308,10 +375,16 @@ def watch_services(timeout_seconds: Optional[int] = None):
308375
# Only process LoadBalancer services
309376
if service_type == 'LoadBalancer':
310377
logger.info(f"LoadBalancer service event: {event_type} - {namespace}/{name}")
378+
service_key = f"{namespace}/{name}"
379+
380+
# Keep known_lb_services in sync
381+
if event_type in ['ADDED', 'MODIFIED']:
382+
known_lb_services.add(service_key)
383+
elif event_type == 'DELETED':
384+
known_lb_services.discard(service_key)
311385

312386
# Trigger webhook for relevant events
313387
if event_type in ['ADDED', 'MODIFIED', 'DELETED']:
314-
service_key = f"{namespace}/{name}"
315388
trigger_semaphore_webhook(event_type, service_key)
316389

317390
# Check if we've been watching for too long and should restart
@@ -389,37 +462,53 @@ def main():
389462

390463
while running:
391464
try:
465+
# Verify API server connectivity before starting a watch cycle
466+
probe_v1 = initialize_kubernetes_client()
467+
if not verify_api_connection(probe_v1):
468+
logger.warning("API server unreachable, will retry...")
469+
attempt += 1
470+
delay = exponential_backoff(attempt)
471+
start_wait = time.time()
472+
while running and time.time() - start_wait < delay:
473+
time.sleep(1)
474+
continue
475+
476+
# Run periodic full reconciliation
477+
if time.time() - last_reconcile_time > RECONCILE_INTERVAL:
478+
logger.info("Starting periodic reconciliation...")
479+
reconcile_services(probe_v1)
480+
392481
# Use MAX_WATCH_TIME for longer watch periods (1 hour)
393482
watch_services(timeout_seconds=MAX_WATCH_TIME)
394-
483+
395484
# Reset attempt counter on successful completion
396485
attempt = 0
397-
486+
398487
except Exception as e:
399488
logger.error(f"Error in watch_services: {str(e)}")
400489
attempt += 1
401490
delay = exponential_backoff(attempt)
402491
logger.info(f"Retrying in {delay:.1f} seconds (attempt {attempt})...")
403-
492+
404493
# Wait with interruption support
405494
start_wait = time.time()
406495
while running and time.time() - start_wait < delay:
407496
time.sleep(1)
408-
497+
409498
# Perform periodic health check
410499
if time.time() - last_health_check > HEALTH_CHECK_INTERVAL:
411500
healthy = perform_health_check()
412501
last_health_check = time.time()
413-
502+
414503
# If not healthy, force a reconnection
415504
if not healthy:
416505
logger.warning("Forcing reconnection due to unhealthy state")
417506
continue
418-
507+
419508
# Small delay to prevent tight loop if watch_services returns immediately
420509
if running:
421510
time.sleep(1)
422-
511+
423512
logger.info("Service watcher shutting down")
424513

425514
if __name__ == "__main__":

0 commit comments

Comments
 (0)