From f1f43428ad0f93a639ea7e7dc1710fae4b455ae7 Mon Sep 17 00:00:00 2001 From: Richard Markiewicz Date: Fri, 8 May 2026 09:44:32 -0400 Subject: [PATCH] feat(dgw): refuse JREC push when recording storage is full --- README.md | 4 ++ crates/transport/src/ws.rs | 24 ++++++++++ devolutions-gateway/src/api/jrec.rs | 65 +++++++++++++++++++++++++--- devolutions-gateway/src/config.rs | 10 +++++ devolutions-gateway/src/recording.rs | 37 ++++++++++++++-- devolutions-gateway/tests/config.rs | 6 +++ 6 files changed, 136 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 563a6b834..70cea9c2e 100644 --- a/README.md +++ b/README.md @@ -170,6 +170,10 @@ Stable options are: - **RecordingPath** (_FilePath_): Path to the recordings folder. +- **MinRecordingStorageFreeSpace** (_Integer_): Minimum free space (in bytes) on the recording storage volume below which + `/jet/jrec/push/{id}` rejects new recording streams with HTTP 507 Insufficient Storage. Omit to skip this threshold + check; the gateway can still return 507 independently when the recording storage is not writable. + - **Ngrok** (_Object_): JSON object describing the ngrok configuration for ingress listeners. * **AuthToken** (_String_): Specifies the authentication token used to connect to the ngrok service. diff --git a/crates/transport/src/ws.rs b/crates/transport/src/ws.rs index ba1456739..6faee95ef 100644 --- a/crates/transport/src/ws.rs +++ b/crates/transport/src/ws.rs @@ -173,6 +173,30 @@ impl CloseWebSocketHandle { }) .await; } + + /// Sends a close frame with an application-specific code in the private range (4000–4999). + /// + /// Clients dispatch on `code`; no reason text is sent. If `code` is outside the private + /// range (e.g. a typo or a reserved code such as 1005/1006 which RFC 6455 §7.4.1 forbids + /// sending on the wire), this falls back to 1011 to keep the protocol valid. 
+ pub async fn app_close(self, code: u16) { + let safe_code = if (4000..=4999).contains(&code) { + code + } else { + tracing::error!( + code, + "app_close called with a code outside the private range; falling back to 1011" + ); + 1011 + }; + let _ = self + .sender + .send(WsCloseFrame { + code: safe_code, + message: String::new(), + }) + .await; + } } /// A background "sentinel" task responsible for keeping the WebSocket connection alive diff --git a/devolutions-gateway/src/api/jrec.rs b/devolutions-gateway/src/api/jrec.rs index d47d2d413..5dcd1b053 100644 --- a/devolutions-gateway/src/api/jrec.rs +++ b/devolutions-gateway/src/api/jrec.rs @@ -17,9 +17,10 @@ use tracing::Instrument as _; use uuid::Uuid; use crate::DgwState; +use crate::api::heartbeat::recording_storage_health; use crate::extract::{JrecToken, RecordingDeleteScope, RecordingsReadScope}; use crate::http::{HttpError, HttpErrorBuilder}; -use crate::recording::RecordingMessageSender; +use crate::recording::{PushOutcome, RecordingMessageSender}; use crate::token::{JrecTokenClaims, RecordingFileType, RecordingOperation}; pub fn make_router(state: DgwState) -> Router { @@ -65,6 +66,46 @@ async fn jrec_push( return Err(HttpError::forbidden().msg("expected push operation")); } + let conf = conf_handle.get_conf(); + + // Pre-flight: refuse the upgrade up-front when the recording storage cannot accept + // a new stream. Returning HTTP 507 here gives the client a clear, actionable status + // before the WebSocket is even established, so it can avoid a doomed session entirely. + // + // The recording directory is created lazily by design (see #1746) so that disk-space + // reporting still works on a not-yet-mounted volume. Ensure it exists for the probe + // below; if creation itself fails, surface that as 507. 
+ if let Err(error) = tokio::fs::create_dir_all(&conf.recording_path).await { + warn!( + client = %source_addr, + %session_id, + %error, + "Refusing JREC push: failed to ensure recording storage directory" + ); + return Err(HttpErrorBuilder::new(StatusCode::INSUFFICIENT_STORAGE).msg("recording storage is not accessible")); + } + + let storage = recording_storage_health(conf.recording_path.as_std_path()); + if !storage.recording_storage_is_writeable { + warn!(client = %source_addr, %session_id, "Refusing JREC push: recording storage is not writable"); + return Err(HttpErrorBuilder::new(StatusCode::INSUFFICIENT_STORAGE).msg("recording storage is not writable")); + } + if let (Some(min), Some(available)) = ( + conf.min_recording_storage_free_space, + storage.recording_storage_available_space, + ) && available < min + { + warn!( + client = %source_addr, + %session_id, + available_bytes = available, + min_bytes = min, + "Refusing JREC push: free space below configured minimum" + ); + return Err(HttpErrorBuilder::new(StatusCode::INSUFFICIENT_STORAGE) + .msg("recording storage below minimum free-space threshold")); + } + let response = ws.on_upgrade(move |ws| { handle_jrec_push( ws, @@ -110,14 +151,26 @@ async fn handle_jrec_push( .instrument(info_span!("jrec", client = %source_addr)) .await; - if let Err(error) = result { - close_handle.server_error("forwarding failure".to_owned()).await; - error!(client = %source_addr, error = format!("{error:#}"), "WebSocket-JREC failure"); - } else { - close_handle.normal_close().await; + match result { + Ok(PushOutcome::Done) => close_handle.normal_close().await, + Ok(PushOutcome::StorageFull) => { + warn!(client = %source_addr, %session_id, "JREC push closed: storage full"); + close_handle.app_close(STORAGE_FULL_CLOSE_CODE).await; + } + Err(error) => { + close_handle.server_error("forwarding failure".to_owned()).await; + error!(client = %source_addr, error = format!("{error:#}"), "WebSocket-JREC failure"); + } } } +/// WebSocket 
close code sent on `/jrec/push/{id}` when the recording storage volume is full +/// and the stream cannot continue. +/// +/// Codes in 4000-4999 are reserved for private application use per +/// RFC 6455 §7.4.2 <https://www.rfc-editor.org/rfc/rfc6455#section-7.4.2>. +const STORAGE_FULL_CLOSE_CODE: u16 = 4010; + /// Deletes a recording stored on this instance #[cfg_attr(feature = "openapi", utoipa::path( delete, diff --git a/devolutions-gateway/src/config.rs b/devolutions-gateway/src/config.rs index 5ca9fc49f..9af73fbae 100644 --- a/devolutions-gateway/src/config.rs +++ b/devolutions-gateway/src/config.rs @@ -188,6 +188,7 @@ pub struct Conf { pub delegation_private_key: Option, pub plugins: Option>, pub recording_path: Utf8PathBuf, + pub min_recording_storage_free_space: Option, pub jrl_file: Utf8PathBuf, pub ngrok: Option, pub verbosity_profile: dto::VerbosityProfile, @@ -911,6 +912,7 @@ impl Conf { delegation_private_key, plugins: conf_file.plugins.clone(), recording_path, + min_recording_storage_free_space: conf_file.min_recording_storage_free_space, jrl_file, ngrok: conf_file.ngrok.clone(), verbosity_profile: conf_file.verbosity_profile.unwrap_or_default(), @@ -1685,6 +1687,13 @@ pub mod dto { #[serde(skip_serializing_if = "Option::is_none")] pub recording_path: Option, + /// Minimum free space (in bytes) on the recording storage volume below which the + /// gateway returns HTTP 507 Insufficient Storage on `/jet/jrec/push/{id}`. Omit to + /// skip this threshold check; the gateway can still return 507 independently when + /// the recording storage is not writable.
+ #[serde(skip_serializing_if = "Option::is_none")] + pub min_recording_storage_free_space: Option, + /// Ngrok config (closely maps https://ngrok.com/docs/ngrok-agent/config/) #[serde(skip_serializing_if = "Option::is_none")] pub ngrok: Option, @@ -1776,6 +1785,7 @@ pub mod dto { jrl_file: None, plugins: None, recording_path: None, + min_recording_storage_free_space: None, web_app: None, ai_gateway: None, job_queue_database: None, diff --git a/devolutions-gateway/src/recording.rs b/devolutions-gateway/src/recording.rs index 78ba9dee1..f07913868 100644 --- a/devolutions-gateway/src/recording.rs +++ b/devolutions-gateway/src/recording.rs @@ -58,6 +58,19 @@ impl JrecManifest { } } +/// Outcome of a successful push session. +/// +/// `Err` from `ClientPush::run` is reserved for unexpected failures. Conditions that the +/// caller is expected to surface to the client (e.g. disk full) are reported as variants of +/// this enum so the caller can choose an appropriate close frame. +#[derive(Debug)] +pub enum PushOutcome { + /// The recording stream completed normally or was terminated by a shutdown signal. + Done, + /// The underlying file write failed because the recording storage volume is full. + StorageFull, +} + #[derive(TypedBuilder)] pub struct ClientPush { recordings: RecordingMessageSender, @@ -72,7 +85,7 @@ impl ClientPush where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - pub async fn run(self) -> anyhow::Result<()> { + pub async fn run(self) -> anyhow::Result { let Self { recordings, claims, @@ -98,7 +111,7 @@ where Err(e) => { warn!(error = format!("{e:#}"), "Unable to start recording"); client_stream.shutdown().await.context("shutdown")?; - return Ok(()); + return Ok(PushOutcome::Done); } }; @@ -144,11 +157,18 @@ where let res = tokio::select! 
{ res = copy_fut => { - res.context("JREC streaming to file").map(|_| ()) + match res { + Ok(_) => Ok(PushOutcome::Done), + Err(e) if is_storage_full(&e) => { + warn!(%session_id, "Recording storage is full; closing push stream"); + Ok(PushOutcome::StorageFull) + } + Err(e) => Err(anyhow::Error::new(e).context("JREC streaming to file")), + } }, _ = shutdown_signal.wait() => { trace!("Received shutdown signal"); - client_stream.shutdown().await.context("shutdown") + client_stream.shutdown().await.context("shutdown").map(|_| PushOutcome::Done) }, }; @@ -167,6 +187,15 @@ where } } +/// Returns `true` if the I/O error indicates the storage volume is full. +/// +/// Uses `io::ErrorKind::StorageFull` (stable since Rust 1.83) which the standard library maps +/// from the platform-specific code: `ENOSPC` on Unix, `ERROR_DISK_FULL` / `ERROR_HANDLE_DISK_FULL` +/// on Windows. +fn is_storage_full(error: &io::Error) -> bool { + matches!(error.kind(), io::ErrorKind::StorageFull) +} + /// A set containing IDs of currently active recordings. 
/// /// The ID is inserted at the initial recording diff --git a/devolutions-gateway/tests/config.rs b/devolutions-gateway/tests/config.rs index 4cb015e3f..98a751afc 100644 --- a/devolutions-gateway/tests/config.rs +++ b/devolutions-gateway/tests/config.rs @@ -91,6 +91,7 @@ fn hub_sample() -> Sample { jrl_file: None, plugins: None, recording_path: None, + min_recording_storage_free_space: None, job_queue_database: None, traffic_audit_database: None, ngrok: None, @@ -140,6 +141,7 @@ fn legacy_sample() -> Sample { jrl_file: None, plugins: None, recording_path: None, + min_recording_storage_free_space: None, job_queue_database: None, traffic_audit_database: None, ngrok: None, @@ -188,6 +190,7 @@ fn system_store_sample() -> Sample { jrl_file: None, plugins: None, recording_path: None, + min_recording_storage_free_space: None, job_queue_database: None, traffic_audit_database: None, ngrok: None, @@ -261,6 +264,7 @@ fn standalone_custom_auth_sample() -> Sample { jrl_file: None, plugins: None, recording_path: None, + min_recording_storage_free_space: None, job_queue_database: None, traffic_audit_database: None, ngrok: None, @@ -341,6 +345,7 @@ fn standalone_no_auth_sample() -> Sample { jrl_file: None, plugins: None, recording_path: None, + min_recording_storage_free_space: None, job_queue_database: None, traffic_audit_database: None, ngrok: None, @@ -421,6 +426,7 @@ fn proxy_sample() -> Sample { jrl_file: None, plugins: None, recording_path: None, + min_recording_storage_free_space: None, job_queue_database: None, traffic_audit_database: None, ngrok: None,