Skip to content

Commit fe22349

Browse files
Merge pull request #17 from SolidLabResearch/codex/runtime-lifecycle
runtime: harden query lifecycle state handling
2 parents: f5cfd66 + bedce0a; commit fe22349

5 files changed

Lines changed: 114 additions & 18 deletions

File tree

src/api/janus_api.rs

Lines changed: 48 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,7 @@ struct RunningQuery {
9999
historical_handles: Vec<thread::JoinHandle<()>>,
100100
baseline_handle: Option<thread::JoinHandle<()>>,
101101
live_handle: Option<thread::JoinHandle<()>>,
102-
mqtt_subscriber_handle: Option<thread::JoinHandle<()>>,
102+
mqtt_subscriber_handles: Vec<thread::JoinHandle<()>>,
103103
// shutdown sender signals used to stop the workers
104104
shutdown_senders: Vec<Sender<()>>,
105105
// MQTT subscriber instances (for stopping)
@@ -224,13 +224,13 @@ impl JanusApi {
224224
parsed.baseline.as_ref().map(|baseline| baseline.window_name.clone());
225225
let mut historical_handles = Vec::new();
226226
let mut shutdown_senders = Vec::new();
227-
let status = Arc::new(RwLock::new(
227+
let initial_status =
228228
if !parsed.live_windows.is_empty() && !parsed.historical_windows.is_empty() {
229229
ExecutionStatus::WarmingBaseline
230230
} else {
231231
ExecutionStatus::Running
232-
},
233-
));
232+
};
233+
let status = Arc::new(RwLock::new(initial_status.clone()));
234234

235235
// 4. Spawn historical worker threads (one per historical window)
236236
for (i, window) in parsed.historical_windows.iter().enumerate() {
@@ -308,7 +308,7 @@ impl JanusApi {
308308

309309
// 5. Spawn live worker thread and MQTT subscribers (if there are live windows)
310310
let mut mqtt_subscribers = Vec::new();
311-
let mut mqtt_subscriber_handle = None;
311+
let mut mqtt_subscriber_handles = Vec::new();
312312
let mut baseline_handle = None;
313313

314314
let live_handle = if !parsed.live_windows.is_empty() && !parsed.rspql_query.is_empty() {
@@ -354,6 +354,8 @@ impl JanusApi {
354354
let parsed_clone = parsed.clone();
355355
let processor_for_baseline = Arc::clone(&live_processor);
356356
let status_for_baseline = Arc::clone(&status);
357+
let registry_for_baseline = Arc::clone(&self.registry);
358+
let query_id_for_baseline = query_id.clone();
357359
let baseline_mode = effective_baseline_mode;
358360
let baseline_window = effective_baseline_window.clone();
359361
let (baseline_shutdown_tx, baseline_shutdown_rx) = mpsc::channel::<()>();
@@ -390,12 +392,16 @@ impl JanusApi {
390392
*state = ExecutionStatus::Running;
391393
}
392394
}
395+
let _ = registry_for_baseline
396+
.set_status(&query_id_for_baseline, "Running");
393397
}
394398
Err(err) => {
395399
eprintln!("Async baseline warm-up error: {}", err);
396400
if let Ok(mut state) = status_for_baseline.write() {
397401
*state = ExecutionStatus::Failed(err.to_string());
398402
}
403+
let _ = registry_for_baseline
404+
.set_status(&query_id_for_baseline, format!("Failed({err})"));
399405
}
400406
}
401407
}));
@@ -431,7 +437,7 @@ impl JanusApi {
431437
});
432438

433439
mqtt_subscribers.push(subscriber);
434-
mqtt_subscriber_handle = Some(sub_handle);
440+
mqtt_subscriber_handles.push(sub_handle);
435441
}
436442

437443
// Spawn live worker thread to receive results
@@ -470,6 +476,21 @@ impl JanusApi {
470476
None
471477
};
472478

479+
self.registry.increment_execution_count(query_id).map_err(|e| {
480+
JanusApiError::RegistryError(format!(
481+
"Failed to increment execution count for '{}': {}",
482+
query_id, e
483+
))
484+
})?;
485+
self.registry
486+
.set_status(query_id, format!("{:?}", initial_status))
487+
.map_err(|e| {
488+
JanusApiError::RegistryError(format!(
489+
"Failed to update query status for '{}': {}",
490+
query_id, e
491+
))
492+
})?;
493+
473494
// 6. Store running query information
474495
let running = RunningQuery {
475496
metadata,
@@ -479,7 +500,7 @@ impl JanusApi {
479500
historical_handles,
480501
baseline_handle,
481502
live_handle,
482-
mqtt_subscriber_handle,
503+
mqtt_subscriber_handles,
483504
shutdown_senders,
484505
mqtt_subscribers,
485506
};
@@ -506,6 +527,7 @@ impl JanusApi {
506527
let running = running_map.remove(query_id).ok_or_else(|| {
507528
JanusApiError::ExecutionError(format!("Query '{}' is not running", query_id))
508529
})?;
530+
drop(running_map);
509531

510532
// Send shutdown signals
511533
for shutdown_tx in running.shutdown_senders {
@@ -521,6 +543,25 @@ impl JanusApi {
521543
if let Ok(mut status) = running.status.write() {
522544
*status = ExecutionStatus::Stopped;
523545
}
546+
self.registry.set_status(query_id, "Stopped").map_err(|e| {
547+
JanusApiError::RegistryError(format!(
548+
"Failed to update query status for '{}': {}",
549+
query_id, e
550+
))
551+
})?;
552+
553+
for handle in running.historical_handles {
554+
let _ = handle.join();
555+
}
556+
if let Some(handle) = running.baseline_handle {
557+
let _ = handle.join();
558+
}
559+
if let Some(handle) = running.live_handle {
560+
let _ = handle.join();
561+
}
562+
for handle in running.mqtt_subscriber_handles {
563+
let _ = handle.join();
564+
}
524565

525566
Ok(())
526567
}

src/http/server.rs

Lines changed: 1 addition & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -296,15 +296,6 @@ async fn get_query(
296296
.ok_or_else(|| ApiError::NotFound(format!("Query '{}' not found", query_id)))?;
297297

298298
let is_running = state.janus_api.is_running(&query_id);
299-
let status = if is_running {
300-
state
301-
.janus_api
302-
.get_query_status(&query_id)
303-
.map(|s| format!("{:?}", s))
304-
.unwrap_or_else(|| "Unknown".to_string())
305-
} else {
306-
"Registered".to_string()
307-
};
308299

309300
Ok(Json(QueryDetailsResponse {
310301
query_id: metadata.query_id,
@@ -313,7 +304,7 @@ async fn get_query(
313304
registered_at: metadata.registered_at,
314305
execution_count: metadata.execution_count,
315306
is_running,
316-
status,
307+
status: metadata.status,
317308
}))
318309
}
319310

src/registry/query_registry.rs

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ pub struct QueryMetadata {
1515
pub baseline_mode: BaselineBootstrapMode,
1616
pub registered_at: u64,
1717
pub execution_count: u64,
18+
pub status: String,
1819
pub subscribers: Vec<QueryId>,
1920
}
2021

@@ -102,6 +103,7 @@ impl QueryRegistry {
102103
baseline_mode,
103104
registered_at: Self::current_timestamp(),
104105
execution_count: 0,
106+
status: "Registered".to_string(),
105107
subscribers: Vec::new(),
106108
};
107109

@@ -145,6 +147,20 @@ impl QueryRegistry {
145147
Ok(())
146148
}
147149

150+
pub fn set_status(
151+
&self,
152+
query_id: &QueryId,
153+
status: impl Into<String>,
154+
) -> Result<(), QueryRegistryError> {
155+
let mut queries = self.queries.write().unwrap();
156+
let query = queries
157+
.get_mut(query_id)
158+
.ok_or_else(|| QueryRegistryError::QueryNotFound(query_id.clone()))?;
159+
160+
query.status = status.into();
161+
Ok(())
162+
}
163+
148164
/// To remove a query from the registry
149165
pub fn unregister(&self, query_id: &QueryId) -> Result<QueryMetadata, QueryRegistryError> {
150166
let mut queries = self.queries.write().unwrap();

tests/http_server_integration_test.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -226,7 +226,8 @@ async fn test_stop_route_stops_running_query_and_delete_requires_stop() {
226226
assert!(get_response.status().is_success());
227227
let get_body: Value = get_response.json().await.expect("invalid get response");
228228
assert_eq!(get_body["is_running"], false);
229-
assert_eq!(get_body["status"], "Registered");
229+
assert_eq!(get_body["status"], "Stopped");
230+
assert_eq!(get_body["execution_count"], 1);
230231

231232
let delete_response = server
232233
.client

tests/janus_api_integration_test.rs

Lines changed: 47 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -355,6 +355,53 @@ fn test_stop_query() {
355355
);
356356
}
357357

358+
#[test]
359+
fn test_execution_count_and_status_update_across_lifecycle() {
360+
let storage = Arc::new(
361+
StreamingSegmentedStorage::new(StreamingConfig::default())
362+
.expect("Failed to create storage"),
363+
);
364+
let parser = JanusQLParser::new().expect("Failed to create parser");
365+
let registry = Arc::new(QueryRegistry::new());
366+
367+
let api = JanusApi::new(parser, Arc::clone(&registry), storage).expect("Failed to create API");
368+
369+
let janusql = r#"
370+
PREFIX ex: <http://example.org/>
371+
SELECT ?s
372+
FROM NAMED WINDOW ex:w ON STREAM ex:stream1 [RANGE 1000 STEP 200]
373+
WHERE { WINDOW ex:w { ?s ?p ?o } }
374+
"#;
375+
376+
let metadata = api
377+
.register_query("lifecycle_query".into(), janusql)
378+
.expect("Failed to register query");
379+
assert_eq!(metadata.execution_count, 0);
380+
assert_eq!(metadata.status, "Registered");
381+
382+
let _handle = api.start_query(&"lifecycle_query".into()).expect("Failed to start query");
383+
384+
let after_start =
385+
registry.get(&"lifecycle_query".into()).expect("query should exist after start");
386+
assert_eq!(after_start.execution_count, 1);
387+
assert_eq!(after_start.status, "Running");
388+
389+
api.stop_query(&"lifecycle_query".into()).expect("Failed to stop query");
390+
391+
let after_stop =
392+
registry.get(&"lifecycle_query".into()).expect("query should exist after stop");
393+
assert_eq!(after_stop.execution_count, 1);
394+
assert_eq!(after_stop.status, "Stopped");
395+
396+
let _handle = api.start_query(&"lifecycle_query".into()).expect("Failed to restart query");
397+
398+
let after_restart = registry
399+
.get(&"lifecycle_query".into())
400+
.expect("query should exist after restart");
401+
assert_eq!(after_restart.execution_count, 2);
402+
assert_eq!(after_restart.status, "Running");
403+
}
404+
358405
#[test]
359406
fn test_multiple_queries_concurrent() {
360407
let storage = create_test_storage_with_data().expect("Failed to create storage");

0 commit comments

Comments
 (0)