Skip to content

Commit 028b6fe

Browse files
authored
add debug logging for healthcheck and setup flows in coglet (#2828)
* add debug logging for healthcheck and setup flows in coglet

  Add `tracing::debug!` calls throughout the healthcheck and setup code paths to improve observability when debugging with `RUST_LOG=debug` or `COG_LOG_LEVEL=debug`.

  Healthcheck: log the full request lifecycle from HTTP handler through service layer, orchestrator handle, event loop dispatch/coalescing, worker handler invocation, and result distribution.

  Setup: log orchestrator config, setup duration, timeout behavior, health state transitions, setup result status, and setup log drain sizes across orchestrator, worker, and Python entry point.

* demote healthcheck hot-path logging from debug to trace

  The healthcheck endpoint is polled every few seconds by K8s probes. Keep the happy-path lines at trace to avoid flooding debug logs, while leaving the unhealthy/error/timeout paths and all setup lines at debug where they are useful for diagnosing real issues.
1 parent fd95417 commit 028b6fe

5 files changed

Lines changed: 157 additions & 26 deletions

File tree

crates/coglet-python/src/lib.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,10 @@ fn serve_subprocess(
345345
);
346346

347347
let setup_timeout = read_setup_timeout();
348+
debug!(
349+
setup_timeout_secs = setup_timeout.map(|d| d.as_secs()),
350+
is_train, "Orchestrator configuration"
351+
);
348352
let orch_config = coglet_core::orchestrator::OrchestratorConfig::new(pred_ref)
349353
.with_num_slots(max_concurrency)
350354
.with_train(is_train)
@@ -369,27 +373,42 @@ fn serve_subprocess(
369373
let setup_service = Arc::clone(&service_clone);
370374
tokio::spawn(async move {
371375
info!("Spawning worker subprocess");
376+
let spawn_start = std::time::Instant::now();
372377
match coglet_core::orchestrator::spawn_worker(orch_config, &mut setup_log_rx).await
373378
{
374379
Ok(ready) => {
375-
debug!("Worker ready, configuring service");
380+
let spawn_elapsed = spawn_start.elapsed();
381+
debug!(
382+
elapsed_ms = spawn_elapsed.as_millis() as u64,
383+
"Worker ready, configuring service"
384+
);
376385

377386
let num_slots = ready.handle.slot_ids().len();
387+
debug!(num_slots, "Setting up orchestrator on service");
378388

379389
setup_service
380390
.set_orchestrator(ready.pool, Arc::new(ready.handle))
381391
.await;
392+
debug!("Transitioning health to Ready");
382393
setup_service.set_health(Health::Ready).await;
383394

384395
if let Some(s) = ready.schema {
396+
debug!("Setting OpenAPI schema on service");
385397
setup_service.set_schema(s).await;
398+
} else {
399+
debug!("No OpenAPI schema provided by worker");
386400
}
387401

388402
let mode = if is_train { "train" } else { "predict" };
389403
info!(num_slots, mode, "Server ready");
390404

391405
// Drain final logs (includes "Server ready" above)
392406
let final_logs = coglet_core::drain_accumulated_logs(&mut setup_log_rx);
407+
debug!(
408+
initial_logs_len = ready.setup_logs.len(),
409+
final_logs_len = final_logs.len(),
410+
"Drained setup logs"
411+
);
393412
drop(setup_log_rx);
394413

395414
// Combine initial + final logs
@@ -401,7 +420,13 @@ fn serve_subprocess(
401420
info!("Setup complete, now accepting requests");
402421
}
403422
Err(e) => {
404-
error!(error = %e, "Worker initialization failed");
423+
let spawn_elapsed = spawn_start.elapsed();
424+
error!(
425+
error = %e,
426+
elapsed_ms = spawn_elapsed.as_millis() as u64,
427+
"Worker initialization failed"
428+
);
429+
debug!("Transitioning health to SetupFailed");
405430
setup_service.set_health(Health::SetupFailed).await;
406431
setup_service
407432
.set_setup_result(setup_result.failed(e.to_string()))

crates/coglet/src/orchestrator.rs

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ impl Orchestrator for OrchestratorHandle {
389389
}
390390

391391
async fn healthcheck(&self) -> Result<HealthcheckResult, OrchestratorError> {
392+
tracing::trace!("Healthcheck requested via orchestrator handle");
392393
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
393394

394395
// Send our channel to the event loop. If a healthcheck is already
@@ -403,11 +404,20 @@ impl Orchestrator for OrchestratorHandle {
403404
// If we time out, the healthcheck keeps running — our sender just gets a
404405
// silent failure when the event loop eventually broadcasts.
405406
match tokio::time::timeout(Duration::from_secs(10), response_rx).await {
406-
Ok(Ok(result)) => Ok(result),
407-
Ok(Err(_)) => Err(OrchestratorError::Protocol(
408-
"healthcheck response channel dropped".to_string(),
409-
)),
410-
Err(_) => Ok(HealthcheckResult::unhealthy("healthcheck timed out")),
407+
Ok(Ok(result)) => {
408+
tracing::trace!(healthy = result.is_healthy(), "Healthcheck completed");
409+
Ok(result)
410+
}
411+
Ok(Err(_)) => {
412+
tracing::debug!("Healthcheck response channel dropped");
413+
Err(OrchestratorError::Protocol(
414+
"healthcheck response channel dropped".to_string(),
415+
))
416+
}
417+
Err(_) => {
418+
tracing::debug!("Healthcheck timed out after 10s");
419+
Ok(HealthcheckResult::unhealthy("healthcheck timed out"))
420+
}
411421
}
412422
}
413423

@@ -564,15 +574,37 @@ pub async fn spawn_worker(
564574
};
565575

566576
let (slot_ids, schema) = match config.setup_timeout {
567-
Some(timeout) => match tokio::time::timeout(timeout, setup_fut).await {
568-
Ok(Ok((slots, schema))) => (slots, schema),
569-
Ok(Err(e)) => return Err(e),
570-
Err(_) => return Err(OrchestratorError::SetupTimeout),
571-
},
572-
None => setup_fut.await?,
577+
Some(timeout) => {
578+
tracing::debug!(
579+
timeout_secs = timeout.as_secs(),
580+
"Waiting for setup with timeout"
581+
);
582+
match tokio::time::timeout(timeout, setup_fut).await {
583+
Ok(Ok((slots, schema))) => {
584+
tracing::debug!(num_slots = slots.len(), "Setup completed within timeout");
585+
(slots, schema)
586+
}
587+
Ok(Err(e)) => {
588+
tracing::debug!(error = %e, "Setup failed");
589+
return Err(e);
590+
}
591+
Err(_) => {
592+
tracing::debug!(timeout_secs = timeout.as_secs(), "Setup timed out");
593+
return Err(OrchestratorError::SetupTimeout);
594+
}
595+
}
596+
}
597+
None => {
598+
tracing::debug!("Waiting for setup with no timeout");
599+
setup_fut.await?
600+
}
573601
};
574602

575603
let setup_logs = crate::setup_log_accumulator::drain_accumulated_logs(setup_log_rx);
604+
tracing::debug!(
605+
setup_logs_len = setup_logs.len(),
606+
"Drained accumulated setup logs"
607+
);
576608

577609
tracing::debug!(num_slots = slot_ids.len(), "Worker ready");
578610

@@ -786,6 +818,12 @@ async fn run_event_loop(
786818
);
787819
}
788820
Some(Ok(ControlResponse::HealthcheckResult { id: _, status, error })) => {
821+
tracing::trace!(
822+
?status,
823+
?error,
824+
pending_count = pending_healthchecks.len(),
825+
"Received healthcheck result from worker"
826+
);
789827
if pending_healthchecks.is_empty() {
790828
tracing::warn!("Received healthcheck result but no pending requests");
791829
} else {
@@ -795,6 +833,10 @@ async fn run_event_loop(
795833
HealthcheckResult::unhealthy(error.unwrap_or_else(|| "unhealthy".to_string()))
796834
}
797835
};
836+
tracing::trace!(
837+
pending_count = pending_healthchecks.len(),
838+
"Distributing healthcheck result to pending callers"
839+
);
798840
for tx in pending_healthchecks.drain(..) {
799841
let _ = tx.send(result.clone());
800842
}
@@ -834,6 +876,7 @@ async fn run_event_loop(
834876
if !in_flight {
835877
healthcheck_counter += 1;
836878
let hc_id = format!("hc_{}", healthcheck_counter);
879+
tracing::trace!(%hc_id, "Sending healthcheck request to worker");
837880

838881
let mut writer = ctrl_writer.lock().await;
839882
if let Err(e) = writer.send(ControlRequest::Healthcheck { id: hc_id }).await {
@@ -843,6 +886,11 @@ async fn run_event_loop(
843886
let _ = tx.send(result.clone());
844887
}
845888
}
889+
} else {
890+
tracing::trace!(
891+
pending_count = pending_healthchecks.len(),
892+
"Healthcheck already in-flight, coalescing request"
893+
);
846894
}
847895
}
848896

crates/coglet/src/service.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,14 @@ impl PredictionService {
258258
None => (0, 0),
259259
};
260260

261+
tracing::trace!(
262+
?state,
263+
available_slots,
264+
total_slots,
265+
setup_status = ?setup_result.as_ref().map(|r| r.status),
266+
"Building health snapshot"
267+
);
268+
261269
HealthSnapshot {
262270
state,
263271
available_slots,
@@ -275,10 +283,19 @@ impl PredictionService {
275283
tracing::warn!("Attempted to set READY without orchestrator, ignoring");
276284
return;
277285
}
286+
let previous = *self.health.read().await;
287+
tracing::debug!(from = ?previous, to = ?health, "Health state transition");
278288
*self.health.write().await = health;
279289
}
280290

281291
pub async fn set_setup_result(&self, result: SetupResult) {
292+
tracing::debug!(
293+
status = ?result.status,
294+
started_at = %result.started_at,
295+
completed_at = ?result.completed_at,
296+
logs_len = result.logs.len(),
297+
"Setting setup result"
298+
);
282299
*self.setup_result.write().await = Some(result);
283300
}
284301

@@ -349,9 +366,16 @@ impl PredictionService {
349366
&self,
350367
) -> Result<HealthcheckResult, crate::orchestrator::OrchestratorError> {
351368
if let Some(ref state) = *self.orchestrator.read().await {
352-
state.orchestrator.healthcheck().await
369+
tracing::trace!("Dispatching healthcheck to orchestrator");
370+
let result = state.orchestrator.healthcheck().await;
371+
tracing::trace!(
372+
healthy = result.as_ref().map(|r| r.is_healthy()).unwrap_or(false),
373+
error = ?result.as_ref().ok().and_then(|r| r.error.as_ref()),
374+
"Healthcheck result from orchestrator"
375+
);
376+
result
353377
} else {
354-
// No orchestrator = not ready, return healthy (healthcheck not applicable)
378+
tracing::debug!("No orchestrator configured, returning default healthy");
355379
Ok(HealthcheckResult::healthy())
356380
}
357381
}

crates/coglet/src/transport/http/routes.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,27 +99,45 @@ fn generate_prediction_id() -> String {
9999
}
100100

101101
async fn health_check(State(service): State<Arc<PredictionService>>) -> Json<HealthCheckResponse> {
102+
tracing::trace!("Health check endpoint called");
102103
let snapshot = service.health().await;
104+
tracing::trace!(
105+
state = ?snapshot.state,
106+
available_slots = snapshot.available_slots,
107+
total_slots = snapshot.total_slots,
108+
has_setup_result = snapshot.setup_result.is_some(),
109+
"Health snapshot retrieved"
110+
);
103111

104112
// Run user healthcheck if ready (even when busy — healthcheck health
105113
// and slot availability are orthogonal concerns).
106114
let user_healthcheck_error = if snapshot.is_ready() {
107115
write_readiness_file();
108116

109117
// Run user-defined healthcheck
118+
tracing::trace!("Running user-defined healthcheck");
110119
match service.healthcheck().await {
111-
Ok(result) if result.is_healthy() => None,
112-
Ok(result) => result.error,
113-
Err(e) => Some(format!("Healthcheck error: {}", e)),
120+
Ok(result) if result.is_healthy() => {
121+
tracing::trace!("User healthcheck passed");
122+
None
123+
}
124+
Ok(result) => {
125+
tracing::debug!(error = ?result.error, "User healthcheck reported unhealthy");
126+
result.error
127+
}
128+
Err(e) => {
129+
tracing::debug!(error = %e, "User healthcheck returned error");
130+
Some(format!("Healthcheck error: {}", e))
131+
}
114132
}
115133
} else {
134+
tracing::trace!(state = ?snapshot.state, "Skipping user healthcheck (not ready)");
116135
None
117136
};
118137

119-
Json(HealthCheckResponse::from_snapshot(
120-
snapshot,
121-
user_healthcheck_error,
122-
))
138+
let response = HealthCheckResponse::from_snapshot(snapshot, user_healthcheck_error);
139+
tracing::trace!(status = ?response.status, "Health check response");
140+
Json(response)
123141
}
124142

125143
/// Write /var/run/cog/ready for K8s readiness probe.

crates/coglet/src/worker.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -544,21 +544,31 @@ pub async fn run_worker<H: PredictHandler>(
544544

545545
// Run setup
546546
tracing::info!("Worker starting setup");
547+
let setup_start = std::time::Instant::now();
547548
let setup_result = handler.setup().await;
548-
tracing::trace!("Setup handler returned");
549+
let setup_elapsed = setup_start.elapsed();
550+
tracing::debug!(
551+
elapsed_ms = setup_elapsed.as_millis() as u64,
552+
success = setup_result.is_ok(),
553+
"Setup handler returned"
554+
);
549555

550556
// Unregister Python's setup sender, but keep log_forwarder running
551557
// The fd_redirect capture threads will continue sending subprocess logs
552558
if let Some(cleanup) = setup_cleanup {
553-
tracing::trace!("Running cleanup (unregistering Python setup sender)");
559+
tracing::debug!("Running cleanup (unregistering Python setup sender)");
554560
cleanup();
555561
}
556562
// Note: We DON'T drop setup_log_tx or wait for log_forwarder
557563
// The log_forwarder continues running to forward subprocess output throughout worker lifetime
558564

559565
// Handle setup failure
560566
if let Err(e) = setup_result {
561-
tracing::error!(error = %e, "Setup failed");
567+
tracing::error!(
568+
error = %e,
569+
elapsed_ms = setup_elapsed.as_millis() as u64,
570+
"Setup failed"
571+
);
562572
let slot = slot_ids.first().copied().unwrap_or_else(SlotId::new);
563573
let mut w = ctrl_writer.lock().await;
564574
let _ = w
@@ -662,8 +672,14 @@ pub async fn run_worker<H: PredictHandler>(
662672
break;
663673
}
664674
Some(Ok(ControlRequest::Healthcheck { id })) => {
665-
tracing::debug!(%id, "Healthcheck requested");
675+
tracing::trace!(%id, "Healthcheck requested, invoking handler");
666676
let result = handler.healthcheck().await;
677+
tracing::trace!(
678+
%id,
679+
status = ?result.status,
680+
error = ?result.error,
681+
"Healthcheck handler returned"
682+
);
667683
let mut w = ctrl_writer.lock().await;
668684
let _ = w.send(ControlResponse::HealthcheckResult {
669685
id,

0 commit comments

Comments
 (0)