Skip to content

Commit c4c4d09

Browse files
fix(machine-validation): address machine validation stale review feedback (#2911)
Addresses the review feedback on the machine validation stale-run changes (#2838). Summary - Simplified stale timeout clamping logic. - Kept the minimum stale timeout enforcement so healthy runs do not go stale between heartbeats. - Added the stale reconciliation error message to logs. - Added focused DB/API test coverage for stale-run detection and reconciliation. ## Related issues Fixes #454 ## Type of Change <!-- Check one that best describes this PR --> - [ ] **Add** - New feature or capability - [ ] **Change** - Changes in existing functionality - [x] **Fix** - Bug fixes - [ ] **Remove** - Removed features or deprecated functionality - [ ] **Internal** - Internal changes (refactoring, tests, docs, etc.) ## Breaking Changes <!-- If checked, describe the breaking changes and migration steps --> <!-- Breaking changes are not generally permitted, please discuss on a GitHub discussion or with the development team if you believe you need to break a backward compatibility guarantee --> - [ ] **This PR contains breaking changes** ## Testing <!-- How was this tested? Check all that apply --> - [x] Unit tests added/updated - [x] Integration tests added/updated - [ ] Manual testing performed - [ ] No testing required (docs, internal refactor, etc.) ## Additional Notes
1 parent 6d8ab23 commit c4c4d09

4 files changed

Lines changed: 309 additions & 45 deletions

File tree

crates/api-core/src/machine_validation/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,17 @@ pub struct MachineValidationManager {
4545
impl MachineValidationManager {
4646
pub fn new(
4747
database_connection: sqlx::PgPool,
48-
config: MachineValidationConfig,
48+
mut config: MachineValidationConfig,
4949
meter: opentelemetry::metrics::Meter,
5050
) -> Self {
51-
let configured_stale_run_timeout = config.stale_run_timeout;
52-
let config = config.with_minimum_stale_run_timeout();
53-
if config.stale_run_timeout != configured_stale_run_timeout {
51+
if config.stale_run_timeout < MachineValidationConfig::MIN_STALE_RUN_TIMEOUT {
5452
tracing::warn!(
55-
configured_stale_run_timeout_seconds = configured_stale_run_timeout.as_secs(),
53+
configured_stale_run_timeout_seconds = config.stale_run_timeout.as_secs(),
5654
minimum_stale_run_timeout_seconds =
5755
MachineValidationConfig::MIN_STALE_RUN_TIMEOUT.as_secs(),
5856
"machine validation stale_run_timeout is below minimum; using minimum"
5957
);
58+
config.stale_run_timeout = MachineValidationConfig::MIN_STALE_RUN_TIMEOUT;
6059
}
6160

6261
let hold_period = config
@@ -399,14 +398,15 @@ async fn reconcile_stale_validation(
399398
record_failed_validation_side_effects(
400399
txn,
401400
&validation,
402-
error_message,
401+
error_message.clone(),
403402
"StaleMachineValidationRun",
404403
)
405404
.await?;
406405

407406
tracing::warn!(
408407
validation_id = %validation.id,
409408
machine_id = %validation.machine_id,
409+
error = %error_message,
410410
"reconciled stale machine validation run"
411411
);
412412

crates/api-db/src/machine_validation.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,3 +368,122 @@ pub async fn mark_machine_validation_complete(
368368

369369
Ok(true)
370370
}
371+
372+
#[cfg(test)]
373+
mod tests {
374+
use std::str::FromStr;
375+
376+
use super::*;
377+
378+
fn test_machine_id() -> MachineId {
379+
MachineId::from_str("fm100htes3rn1npvbtm5qd57dkilaag7ljugl1llmm7rfuq1ov50i0rpl30").unwrap()
380+
}
381+
382+
async fn insert_active_validation(
383+
txn: &mut PgConnection,
384+
start_time: chrono::DateTime<chrono::Utc>,
385+
duration_to_complete: i64,
386+
last_heartbeat_at: Option<chrono::DateTime<chrono::Utc>>,
387+
) -> DatabaseResult<MachineValidationId> {
388+
let id = MachineValidationId::new();
389+
const QUERY: &str = "
390+
INSERT INTO machine_validation (
391+
id,
392+
machine_id,
393+
start_time,
394+
name,
395+
end_time,
396+
context,
397+
total,
398+
completed,
399+
state,
400+
duration_to_complete,
401+
last_heartbeat_at
402+
)
403+
VALUES ($1, $2, $3, $4, NULL, $5, 1, 0, $6, $7, $8)";
404+
405+
sqlx::query(QUERY)
406+
.bind(id)
407+
.bind(test_machine_id())
408+
.bind(start_time)
409+
.bind(format!("Test_{id}"))
410+
.bind("OnDemand")
411+
.bind(MachineValidationState::InProgress.to_string())
412+
.bind(duration_to_complete)
413+
.bind(last_heartbeat_at)
414+
.execute(txn)
415+
.await
416+
.map_err(|e| DatabaseError::query(QUERY, e))?;
417+
418+
Ok(id)
419+
}
420+
421+
#[crate::sqlx_test]
422+
async fn mark_stale_if_active_uses_heartbeat_when_present(
423+
pool: sqlx::PgPool,
424+
) -> Result<(), Box<dyn std::error::Error>> {
425+
let mut txn = pool.begin().await?;
426+
let now = chrono::Utc::now();
427+
let stale_run_timeout = std::time::Duration::from_secs(60);
428+
let status = MachineValidationStatus {
429+
state: MachineValidationState::Failed,
430+
..MachineValidationStatus::default()
431+
};
432+
433+
let fresh_heartbeat = insert_active_validation(
434+
txn.as_mut(),
435+
now - chrono::Duration::minutes(10),
436+
1,
437+
Some(now - chrono::Duration::seconds(30)),
438+
)
439+
.await?;
440+
let stale_heartbeat = insert_active_validation(
441+
txn.as_mut(),
442+
now - chrono::Duration::seconds(30),
443+
1,
444+
Some(now - chrono::Duration::seconds(61)),
445+
)
446+
.await?;
447+
let stale_without_heartbeat =
448+
insert_active_validation(txn.as_mut(), now - chrono::Duration::seconds(120), 1, None)
449+
.await?;
450+
451+
assert!(
452+
mark_stale_if_active(
453+
txn.as_mut(),
454+
&fresh_heartbeat,
455+
stale_run_timeout,
456+
now,
457+
&status,
458+
)
459+
.await?
460+
.is_none()
461+
);
462+
assert_eq!(
463+
mark_stale_if_active(
464+
txn.as_mut(),
465+
&stale_heartbeat,
466+
stale_run_timeout,
467+
now,
468+
&status,
469+
)
470+
.await?
471+
.map(|validation| validation.id),
472+
Some(stale_heartbeat)
473+
);
474+
assert_eq!(
475+
mark_stale_if_active(
476+
txn.as_mut(),
477+
&stale_without_heartbeat,
478+
stale_run_timeout,
479+
now,
480+
&status,
481+
)
482+
.await?
483+
.map(|validation| validation.id),
484+
Some(stale_without_heartbeat)
485+
);
486+
487+
Ok(())
488+
}
489+
}

crates/api-db/src/machine_validation_execution.rs

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -845,3 +845,187 @@ fn truncate_summary(value: &str) -> Option<String> {
845845
Some(value.chars().take(SUMMARY_LIMIT).collect())
846846
}
847847
}
848+
849+
#[cfg(test)]
850+
mod tests {
851+
use std::collections::BTreeSet;
852+
use std::str::FromStr;
853+
854+
use carbide_uuid::machine::MachineId;
855+
856+
use super::*;
857+
858+
fn test_machine_id() -> MachineId {
859+
MachineId::from_str("fm100htes3rn1npvbtm5qd57dkilaag7ljugl1llmm7rfuq1ov50i0rpl30").unwrap()
860+
}
861+
862+
async fn insert_active_validation(
863+
txn: &mut PgConnection,
864+
now: chrono::DateTime<chrono::Utc>,
865+
) -> DatabaseResult<MachineValidationId> {
866+
let id = MachineValidationId::new();
867+
const QUERY: &str = "
868+
INSERT INTO machine_validation (
869+
id,
870+
machine_id,
871+
start_time,
872+
name,
873+
end_time,
874+
context,
875+
total,
876+
completed,
877+
state,
878+
duration_to_complete,
879+
last_heartbeat_at
880+
)
881+
VALUES ($1, $2, $3, $4, NULL, $5, 1, 0, $6, 0, NULL)";
882+
883+
sqlx::query(QUERY)
884+
.bind(id)
885+
.bind(test_machine_id())
886+
.bind(now)
887+
.bind(format!("Test_{id}"))
888+
.bind("OnDemand")
889+
.bind("InProgress")
890+
.execute(txn)
891+
.await
892+
.map_err(|e| DatabaseError::query(QUERY, e))?;
893+
894+
Ok(id)
895+
}
896+
897+
async fn insert_attempt(
898+
txn: &mut PgConnection,
899+
validation_id: &MachineValidationId,
900+
test_id: &str,
901+
order_index: i32,
902+
state: MachineValidationAttemptState,
903+
started_at: Option<chrono::DateTime<chrono::Utc>>,
904+
last_heartbeat_at: Option<chrono::DateTime<chrono::Utc>>,
905+
) -> DatabaseResult<MachineValidationAttemptId> {
906+
let run_item_id = MachineValidationRunItemId::new();
907+
let attempt_id = MachineValidationAttemptId::new();
908+
const RUN_ITEM_QUERY: &str = "
909+
INSERT INTO machine_validation_run_items (
910+
id,
911+
run_id,
912+
test_id,
913+
display_name,
914+
context,
915+
state,
916+
order_index,
917+
attempt,
918+
max_attempts,
919+
timeout_seconds,
920+
started_at,
921+
last_heartbeat_at
922+
)
923+
VALUES ($1, $2, $3, $4, $5, $6, $7, 1, 1, 1, $8, $9)";
924+
925+
sqlx::query(RUN_ITEM_QUERY)
926+
.bind(run_item_id)
927+
.bind(validation_id)
928+
.bind(test_id)
929+
.bind(test_id)
930+
.bind("OnDemand")
931+
.bind(MachineValidationRunItemState::Running.to_string())
932+
.bind(order_index)
933+
.bind(started_at)
934+
.bind(last_heartbeat_at)
935+
.execute(&mut *txn)
936+
.await
937+
.map_err(|e| DatabaseError::query(RUN_ITEM_QUERY, e))?;
938+
939+
const ATTEMPT_QUERY: &str = "
940+
INSERT INTO machine_validation_attempts (
941+
id,
942+
run_item_id,
943+
attempt_number,
944+
state,
945+
command,
946+
args,
947+
started_at,
948+
last_heartbeat_at
949+
)
950+
VALUES ($1, $2, 1, $3, $4, $5, $6, $7)";
951+
952+
sqlx::query(ATTEMPT_QUERY)
953+
.bind(attempt_id)
954+
.bind(run_item_id)
955+
.bind(state.to_string())
956+
.bind("echo")
957+
.bind("ok")
958+
.bind(started_at)
959+
.bind(last_heartbeat_at)
960+
.execute(txn)
961+
.await
962+
.map_err(|e| DatabaseError::query(ATTEMPT_QUERY, e))?;
963+
964+
Ok(attempt_id)
965+
}
966+
967+
#[crate::sqlx_test]
968+
async fn find_stale_active_attempts_respects_heartbeat_and_duration_fallback(
969+
pool: sqlx::PgPool,
970+
) -> Result<(), Box<dyn std::error::Error>> {
971+
let mut txn = pool.begin().await?;
972+
let now = chrono::Utc::now();
973+
let validation_id = insert_active_validation(txn.as_mut(), now).await?;
974+
975+
insert_attempt(
976+
txn.as_mut(),
977+
&validation_id,
978+
"stale-heartbeat",
979+
0,
980+
MachineValidationAttemptState::Running,
981+
Some(now - chrono::Duration::seconds(10)),
982+
Some(now - chrono::Duration::seconds(61)),
983+
)
984+
.await?;
985+
let _fresh_heartbeat_attempt = insert_attempt(
986+
txn.as_mut(),
987+
&validation_id,
988+
"fresh-heartbeat",
989+
1,
990+
MachineValidationAttemptState::Running,
991+
Some(now - chrono::Duration::seconds(10)),
992+
Some(now - chrono::Duration::seconds(30)),
993+
)
994+
.await?;
995+
insert_attempt(
996+
txn.as_mut(),
997+
&validation_id,
998+
"legacy-stale",
999+
2,
1000+
MachineValidationAttemptState::Running,
1001+
Some(now - chrono::Duration::seconds(120)),
1002+
None,
1003+
)
1004+
.await?;
1005+
let _terminal_attempt = insert_attempt(
1006+
txn.as_mut(),
1007+
&validation_id,
1008+
"terminal",
1009+
3,
1010+
MachineValidationAttemptState::Failed,
1011+
Some(now - chrono::Duration::seconds(120)),
1012+
Some(now - chrono::Duration::seconds(120)),
1013+
)
1014+
.await?;
1015+
1016+
let stale_attempts =
1017+
find_stale_active_attempts(txn.as_mut(), std::time::Duration::from_secs(60), now)
1018+
.await?;
1019+
let stale_test_ids = stale_attempts
1020+
.iter()
1021+
.map(|attempt| attempt.test_id.as_str())
1022+
.collect::<BTreeSet<_>>();
1023+
1024+
assert_eq!(
1025+
stale_test_ids,
1026+
BTreeSet::from(["legacy-stale", "stale-heartbeat"])
1027+
);
1028+
1029+
Ok(())
1030+
}
1031+
}

0 commit comments

Comments
 (0)