Skip to content

Commit 7544e78

Browse files
authored
engine: Timeout signal (#10)
* fix sessions * fix sessions fmt * timeout signal
1 parent bd2b7c9 commit 7544e78

3 files changed

Lines changed: 106 additions & 3 deletions

File tree

crates/embucketd/src/cli.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ pub struct CliOpts {
152152
help = "Tracing span processor"
153153
)]
154154
pub tracing_span_processor: TracingSpanProcessor,
155+
156+
#[arg(
157+
long,
158+
env = "IDLE_TIMEOUT_SECONDS",
159+
default_value = "18000",
160+
help = "Service idle timeout in seconds"
161+
)]
162+
pub timeout: Option<u64>,
155163
}
156164

157165
#[derive(Debug, Clone, ValueEnum)]

crates/embucketd/src/main.rs

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use axum::{
2121
use catalog_metastore::InMemoryMetastore;
2222
use clap::Parser;
2323
use dotenv::dotenv;
24-
use executor::service::CoreExecutionService;
24+
use executor::service::{CoreExecutionService, ExecutionService, TIMEOUT_SIGNAL_INTERVAL_SECONDS};
2525
use executor::utils::Config as ExecutionConfig;
2626
use opentelemetry::trace::TracerProvider;
2727
use opentelemetry_sdk::Resource;
@@ -218,8 +218,9 @@ async fn async_main(
218218
.expect("Failed to bind to address");
219219
let addr = listener.local_addr().expect("Failed to get local address");
220220
tracing::info!(%addr, "Listening on http");
221+
let timeout = opts.timeout.unwrap();
221222
axum::serve(listener, router)
222-
.with_graceful_shutdown(shutdown_signal())
223+
.with_graceful_shutdown(shutdown_signal(execution_svc.clone(), timeout))
223224
.await
224225
.expect("Failed to start server");
225226

@@ -337,7 +338,7 @@ fn setup_tracing(opts: &cli::CliOpts) -> SdkTracerProvider {
337338
clippy::redundant_pub_crate,
338339
clippy::cognitive_complexity
339340
)]
340-
async fn shutdown_signal() {
341+
async fn shutdown_signal(execution_svc: Arc<dyn ExecutionService>, timeout: u64) {
341342
let ctrl_c = async {
342343
signal::ctrl_c()
343344
.await
@@ -355,13 +356,21 @@ async fn shutdown_signal() {
355356
#[cfg(not(unix))]
356357
let terminate = std::future::pending::<()>();
357358

359+
let timeout = execution_svc.timeout_signal(
360+
tokio::time::Duration::from_secs(TIMEOUT_SIGNAL_INTERVAL_SECONDS),
361+
tokio::time::Duration::from_secs(timeout),
362+
);
363+
358364
tokio::select! {
359365
() = ctrl_c => {
360366
tracing::warn!("Ctrl+C received, starting graceful shutdown");
361367
},
362368
() = terminate => {
363369
tracing::warn!("SIGTERM received, starting graceful shutdown");
364370
},
371+
() = timeout => {
372+
tracing::warn!("No sessions in use & no running queries - timeout, starting graceful shutdown");
373+
}
365374
}
366375

367376
tracing::warn!("signal received, starting graceful shutdown");
@@ -371,3 +380,54 @@ async fn shutdown_signal() {
371380
#[derive(OpenApi)]
372381
#[openapi()]
373382
pub struct ApiDoc;
383+
384+
#[cfg(test)]
385+
mod tests {
386+
use api_snowflake_rest_sessions::session::SessionStore;
387+
use executor::models::QueryContext;
388+
use executor::service::ExecutionService;
389+
use executor::service::make_test_execution_svc;
390+
use executor::session::to_unix;
391+
use std::sync::atomic::Ordering;
392+
use std::time::Duration;
393+
use time::OffsetDateTime;
394+
395+
#[tokio::test]
396+
#[allow(clippy::expect_used, clippy::too_many_lines)]
397+
async fn test_timeout_signal() {
398+
let execution_svc = make_test_execution_svc().await;
399+
400+
let df_session_id = "fasfsafsfasafsass".to_string();
401+
let user_session = execution_svc
402+
.create_session(&df_session_id)
403+
.await
404+
.expect("Failed to create a session");
405+
406+
execution_svc
407+
.query(&df_session_id, "SELECT SLEEP(5)", QueryContext::default())
408+
.await
409+
.expect("Failed to execute query (session deleted)");
410+
411+
user_session
412+
.expiry
413+
.store(to_unix(OffsetDateTime::now_utc()), Ordering::Relaxed);
414+
415+
let session_store = SessionStore::new(execution_svc.clone());
416+
417+
tokio::task::spawn({
418+
let session_store = session_store.clone();
419+
async move {
420+
session_store
421+
.continuously_delete_expired(Duration::from_secs(1))
422+
.await;
423+
}
424+
});
425+
426+
let timeout = execution_svc.timeout_signal(Duration::from_secs(1), Duration::from_secs(3));
427+
tokio::select! {
428+
() = timeout => {
429+
tracing::warn!("No sessions in use & no running queries - timeout, starting graceful shutdown");
430+
}
431+
}
432+
}
433+
}

crates/executor/src/service.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ use uuid::Uuid;
4242

4343
const DEFAULT_SCHEMA: &str = "public";
4444

45+
pub const TIMEOUT_SIGNAL_INTERVAL_SECONDS: u64 = 60;
46+
4547
#[async_trait::async_trait]
4648
pub trait ExecutionService: Send + Sync {
4749
async fn create_session(&self, session_id: &str) -> Result<Arc<UserSession>>;
@@ -127,6 +129,8 @@ pub trait ExecutionService: Send + Sync {
127129
file_name: &str,
128130
format: Format,
129131
) -> Result<usize>;
132+
133+
async fn timeout_signal(&self, interval: Duration, idle_timeout: Duration) -> ();
130134
}
131135

132136
pub struct CoreExecutionService {
@@ -767,6 +771,37 @@ impl ExecutionService for CoreExecutionService {
767771

768772
Ok(rows_loaded)
769773
}
774+
775+
async fn timeout_signal(&self, interval: Duration, idle_timeout: Duration) -> () {
776+
let mut interval = tokio::time::interval(interval);
777+
interval.tick().await; // The first tick completes immediately; skip.
778+
let mut idle_since: Option<std::time::Instant> = None;
779+
loop {
780+
interval.tick().await;
781+
let sessions_empty = {
782+
let sessions = self.df_sessions.read().await;
783+
sessions.is_empty()
784+
};
785+
let queries_empty = self.queries.count() == 0;
786+
let idle_now = sessions_empty && queries_empty;
787+
match (idle_now, idle_since) {
788+
(true, None) => {
789+
// just entered idle
790+
idle_since = Some(std::time::Instant::now());
791+
}
792+
(true, Some(since)) => {
793+
if since.elapsed() >= idle_timeout {
794+
// stayed idle long enough
795+
return;
796+
}
797+
}
798+
(false, _) => {
799+
// became active again, reset the idle window
800+
idle_since = None;
801+
}
802+
}
803+
}
804+
}
770805
}
771806

772807
//Test environment

0 commit comments

Comments
 (0)