diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..c5dc5a0 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,44 @@ +name: CI Validation + +on: + workflow_dispatch: + pull_request: + push: + branches: + - main + - master + +jobs: + rust-validate: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt, clippy + + - name: Cache cargo registry and target + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo- + + - name: Format check + run: cargo fmt --all -- --check + + - name: Build + run: cargo build --locked + + - name: Clippy + run: cargo clippy --bin hulypulse --all-features -- -D warnings + + - name: Unit tests + run: cargo test --bin hulypulse --locked diff --git a/Cargo.lock b/Cargo.lock index 5264488..d93f334 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,6 +309,12 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "async-trait" version = "0.1.89" @@ -348,6 +354,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backon" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" +dependencies = [ + "fastrand", +] + [[package]] name = "backtrace" version = "0.3.76" @@ -825,6 +840,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "find-msvc-tools" version = "0.1.2" @@ -1175,7 +1196,7 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hulypulse" -version = "0.4.1" +version = "0.4.2" dependencies = [ "actix-cors", "actix-web", @@ -2107,9 +2128,12 @@ version = "0.32.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7cd3650deebc68526b304898b192fa4102a4ef0b9ada24da096559cb60e0eef8" dependencies = [ + "arc-swap", + "backon", "bytes", "cfg-if", "combine", + "futures-channel", "futures-util", "itoa", "num-bigint", diff --git a/Cargo.toml b/Cargo.toml index 50f1daf..51ea626 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hulypulse" -version = "0.4.1" +version = "0.4.2" edition = "2024" [dependencies] @@ -30,7 +30,7 @@ hulyrs = { git = "https://github.com/hcengineering/hulyrs.git", features = [ "ac secrecy = { version = "0.10.3", optional = true } #redis -redis = { version = "=0.32.5", features = ["aio", "tokio-comp", "sentinel"] } +redis = { version = "=0.32.5", features = ["aio", "tokio-comp", "sentinel", "connection-manager"] } [[bin]] name = "hulypulse" diff --git a/client/off/client.ts b/client/off/client.ts index 5908dfe..9992863 100644 --- a/client/off/client.ts +++ b/client/off/client.ts @@ -183,6 +183,18 @@ export class HulypulseClient implements Disposable { // this.handleMyDataChanged(get(myData), true) } + private static isConnectionLikeError (err: string): boolean { + const s = err.toLowerCase() + return ( + s.includes('broken pipe') || + s.includes('connection reset') || + s.includes('connection refused') || + s.includes('connection aborted') || + s.includes('unexpected eof') || + s.includes('io error') + ) + } + private handleMessage (data: string): void { if (data === 'pong') { clearTimeout(this.pingTimeout) @@ -192,6 +204,17 @@ export class HulypulseClient implements Disposable { try { const message = JSON.parse(data); // as IncomingMessage console.log('Received message', message); + if ( + typeof message === 'object' && + message !== null && + 'error' in message && + typeof (message as { error: unknown }).error === 'string' && + HulypulseClient.isConnectionLikeError((message as { error: string }).error) + ) { + console.warn('Pulse server reported connection-like error; reconnecting') + this.reconnect() + return + } // const message = JSON.parse(data) as IncomingMessage // if (message.type === 'update' && message.presence !== undefined) { // onPersonUpdate(message.id, message.presence ?? []) diff --git a/src/config.rs b/src/config.rs index e5e7f07..712d6b9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -94,7 +94,7 @@ pub static CONFIG: LazyLock = LazyLock::new(|| { match settings { Ok(settings) => settings, Err(error) => { - eprintln!("configuration error: {}", error); + eprintln!("configuration error: {error}"); std::process::exit(1); } } diff --git a/src/db.rs b/src/db.rs index 006728d..7d2ce38 100644 --- a/src/db.rs +++ b/src/db.rs @@ -5,7 +5,7 @@ use crate::memory::{ MemoryBackend, memory_delete, memory_info, memory_list, memory_read, memory_save, }; use crate::redis::{redis_delete, redis_info, redis_list, redis_read, redis_save}; -use redis::aio::MultiplexedConnection; +use redis::aio::ConnectionManager; use serde::Serialize; use tokio::sync::RwLock; @@ -80,7 +80,7 @@ pub fn deprecated_symbol_error(s: &str) -> DbResult<()> { #[derive(Clone)] enum DbBackend { - Redis(MultiplexedConnection), + Redis(ConnectionManager), Memory { db: MemoryBackend, hub: Arc>, @@ -93,7 +93,7 @@ pub struct Db { } impl Db { - pub fn new_redis(db: MultiplexedConnection) -> Self { + pub fn new_redis(db: ConnectionManager) -> Self { Self { backend: DbBackend::Redis(db), } diff --git a/src/handlers_http.rs b/src/handlers_http.rs index ee6a8a9..9e1fe42 100644 --- a/src/handlers_http.rs +++ b/src/handlers_http.rs @@ -40,7 +40,7 @@ pub fn map_redis_error(err: impl std::fmt::Display) -> Error { .nth(1) .unwrap_or(msg.as_str()); if let Some((code, text)) = detail.split_once(": ") { - let text = format!("{} {}", code, text); + let text = format!("{code} {text}"); return match code { "400" => actix_web::error::ErrorBadRequest(text), "404" => actix_web::error::ErrorNotFound(text), diff --git a/src/handlers_ws.rs b/src/handlers_ws.rs index b836f72..af6be72 100644 --- a/src/handlers_ws.rs +++ b/src/handlers_ws.rs @@ -13,7 +13,6 @@ // limitations under the License. // -use actix_ws; use futures_util::StreamExt; use futures::future::{AbortHandle, Abortable}; @@ -187,7 +186,7 @@ async fn handle_command( tracing::debug!("PERSONAL from {} to {}", &client_name, &to); let payload = json!({ "personal": client_name, "correlation": correlation, "data": data }); - if !send_to_name(&hub_state, &to, payload).await { + if !send_to_name(hub_state, &to, payload).await { tracing::debug!("PERSONAL send from [{}] to [{}] failed", &client_name, &to); result_err("failed", &correlation, ws).await; } @@ -201,7 +200,7 @@ async fn handle_command( } => { tracing::debug!("ANSWER from {} to {}", &client_name, &to); let payload = json!({ "correlation": correlation, "data": data }); - if !send_to_name(&hub_state, &to, payload).await { + if !send_to_name(hub_state, &to, payload).await { tracing::debug!("PERSONAL send_to failed: no such session {}", to); } } @@ -233,10 +232,8 @@ async fn handle_command( // TTL logic let real_ttl = if let Some(secs) = ttl { Some(Ttl::Sec(secs as usize)) - } else if let Some(timestamp) = expires_at { - Some(Ttl::At(timestamp)) } else { - None + expires_at.map(Ttl::At) }; // SaveMode logic @@ -447,11 +444,11 @@ pub async fn handler( _ => "", }; - if let Some(ref claim) = claims { - if !test_rego_claims(claim, cmd.as_ref(), key) { - let _ = session.text("Unauthorized: Rego policy").await; - break; - } + if let Some(ref claim) = claims + && !test_rego_claims(claim, cmd.as_ref(), key) + { + let _ = session.text("Unauthorized: Rego policy").await; + break; } } @@ -470,7 +467,7 @@ pub async fn handler( } Err(err) => { - let _ = session.text(format!("Invalid JSON: {}", err)).await; + let _ = session.text(format!("Invalid JSON: {err}")).await; } }, diff --git a/src/main.rs b/src/main.rs index 083d936..2c2afb0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -117,7 +117,7 @@ async fn check_workspace( let workspace = Uuid::parse_str(&request.extract::>().await?); let claims = request.extensions().get::().cloned().unwrap(); - if claims.is_system() || Ok(claims.workspace.clone()) == workspace.clone().map(Some) { + if claims.is_system() || Ok(claims.workspace) == workspace.clone().map(Some) { next.call(request).await } else { warn!( @@ -125,7 +125,7 @@ async fn check_workspace( actual = ?workspace, "Unauthorized request, workspace mismatch" ); - Err(actix_web::error::ErrorUnauthorized("Unauthorized").into()) + Err(actix_web::error::ErrorUnauthorized("Unauthorized")) } } @@ -145,9 +145,9 @@ async fn main() -> anyhow::Result<()> { BackendType::Redis => { let redis_client = redis::client().await?; let db_connection = redis_client - .get_multiplexed_async_connection() + .get_connection_manager() .await - .map_err(|e| { + .inspect_err(|_e| { tracing::error!( "REDIS not found: {:?}", &CONFIG @@ -157,14 +157,11 @@ async fn main() -> anyhow::Result<()> { .collect::>() .join(", ") ); - e })?; tokio::spawn({ let hub_state = hub_state.clone(); async move { - if let Err(err) = crate::redis::receiver(redis_client, hub_state).await { - tracing::error!("Redis receiver stopped: {err}"); - } + crate::redis::receiver(redis_client, hub_state).await; } }); Db::new_redis(db_connection) diff --git a/src/memory.rs b/src/memory.rs index 5003f2a..70d6fe5 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -110,9 +110,7 @@ pub async fn memory_list(backend: &MemoryBackend, key_prefix: &str) -> DbResult< continue; } - if k.strip_prefix(key_prefix) - .map_or(false, |s| s.contains('$')) - { + if k.strip_prefix(key_prefix).is_some_and(|s| s.contains('$')) { continue; } @@ -138,7 +136,7 @@ pub async fn memory_info(backend: &MemoryBackend) -> DbResult { let map = backend.inner.read().await; let keys = map.len(); let memory: usize = map.values().map(|v| v.data.len()).sum(); - Ok(format!("{} keys, {} bytes", keys, memory)) + Ok(format!("{keys} keys, {memory} bytes")) } /// memory_read(&backend, "key") @@ -215,7 +213,7 @@ pub async fn memory_save>( if max_size != 0 && value.len() > max_size { return error( 400, - format!("Value in memory mode must be less than {} bytes", max_size), + format!("Value in memory mode must be less than {max_size} bytes"), ); } @@ -273,10 +271,7 @@ pub async fn memory_save>( if &actual_md5 != expected_md5 { return error( 412, - format!( - "md5 mismatch, current: {}, expected: {}", - actual_md5, expected_md5 - ), + format!("md5 mismatch, current: {actual_md5}, expected: {expected_md5}"), ); } *existing = Entry { @@ -304,9 +299,7 @@ pub async fn memory_delete( let mode = mode.unwrap_or(SaveMode::Upsert); match mode { - SaveMode::Insert => { - return error(412, "Insert mode is not supported for delete"); - } + SaveMode::Insert => error(412, "Insert mode is not supported for delete"), SaveMode::Update | SaveMode::Upsert => { let existed = map.remove(key).is_some(); Ok(existed) @@ -320,8 +313,7 @@ pub async fn memory_delete( return error( 412, format!( - "md5 mismatch, current: {}, expected: {}", - actual_md5, expected_md5 + "md5 mismatch, current: {actual_md5}, expected: {expected_md5}" ), ); } diff --git a/src/redis.rs b/src/redis.rs index 4cb83fc..1c1c075 100644 --- a/src/redis.rs +++ b/src/redis.rs @@ -15,11 +15,12 @@ use std::{ sync::Arc, - time::{SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use ::redis::Msg; use tokio::sync::RwLock; +use tokio::time::sleep; use tokio_stream::StreamExt; use tracing::*; @@ -31,7 +32,7 @@ use crate::{ use redis::{ Client, ConnectionInfo, ProtocolVersion, RedisConnectionInfo, ToRedisArgs, - aio::MultiplexedConnection, + aio::ConnectionManager, }; // use serde::Serialize; @@ -41,7 +42,7 @@ static MAX_LOOP_COUNT: usize = 1000; // to avoid infinite loops pub async fn push_event( hub_state: &Arc>, - redis: &mut MultiplexedConnection, + redis: &mut ConnectionManager, ev: RedisEvent, ) { // Value only for Set @@ -61,7 +62,7 @@ pub async fn push_event( } /// redis_info(&connection) -pub async fn redis_info(conn: &mut MultiplexedConnection) -> DbResult { +pub async fn redis_info(conn: &mut ConnectionManager) -> DbResult { let info: String = redis::cmd("INFO").query_async(conn).await?; let mut redis_keys: Option = None; @@ -70,16 +71,16 @@ pub async fn redis_info(conn: &mut MultiplexedConnection) -> DbResult { for line in info.lines() { if line.starts_with("db0:") { // parsing: db0:keys=152,expires=10,avg_ttl=456789 - if let Some(keys_part) = line.split(',').find(|s| s.starts_with("keys=")) { - if let Some(val) = keys_part.strip_prefix("keys=") { - redis_keys = val.parse::().ok(); - } + if let Some(keys_part) = line.split(',').find(|s| s.starts_with("keys=")) + && let Some(val) = keys_part.strip_prefix("keys=") + { + redis_keys = val.parse::().ok(); } } - if line.starts_with("used_memory:") { - if let Some(val) = line.strip_prefix("used_memory:") { - redis_bytes = val.parse::().ok(); - } + if line.starts_with("used_memory:") + && let Some(val) = line.strip_prefix("used_memory:") + { + redis_bytes = val.parse::().ok(); } } @@ -91,7 +92,7 @@ pub async fn redis_info(conn: &mut MultiplexedConnection) -> DbResult { } /// redis_list(&connection,prefix) -pub async fn redis_list(conn: &mut MultiplexedConnection, key: &str) -> DbResult> { +pub async fn redis_list(conn: &mut ConnectionManager, key: &str) -> DbResult> { deprecated_symbol_error(key)?; if !key.ends_with('/') { return error(412, "Key must end with slash"); @@ -111,7 +112,7 @@ pub async fn redis_list(conn: &mut MultiplexedConnection, key: &str) -> DbResult for k in keys { // Check for $-security path - if k.strip_prefix(key).map_or(false, |s| s.contains('$')) { + if k.strip_prefix(key).is_some_and(|s| s.contains('$')) { continue; } @@ -143,7 +144,7 @@ pub async fn redis_list(conn: &mut MultiplexedConnection, key: &str) -> DbResult } /// redis_read(&connection,key) -pub async fn redis_read(conn: &mut MultiplexedConnection, key: &str) -> DbResult> { +pub async fn redis_read(conn: &mut ConnectionManager, key: &str) -> DbResult> { deprecated_symbol_error(key)?; if key.ends_with('/') { @@ -174,13 +175,13 @@ pub async fn redis_read(conn: &mut MultiplexedConnection, key: &str) -> DbResult /// redis_save(&connection,key,value,[ttl?],[mode?]) pub async fn redis_save( - conn: &mut MultiplexedConnection, + conn: &mut ConnectionManager, key: &str, value: T, ttl: Option, mode: Option, ) -> DbResult<()> { - deprecated_symbol_error(&key)?; + deprecated_symbol_error(key)?; if key.ends_with('/') { return error(412, "Key must not end with a slash"); @@ -191,7 +192,7 @@ pub async fn redis_save( if max_size != 0 && value.to_redis_args().iter().map(|a| a.len()).sum::() > max_size { return error( 400, - format!("Value in memory mode must be less than {} bytes", max_size), + format!("Value in memory mode must be less than {max_size} bytes"), ); } @@ -262,10 +263,7 @@ pub async fn redis_save( let _: () = redis::cmd("UNWATCH").query_async(conn).await?; return error( 412, - format!( - "md5 mismatch, current: {}, expected: {}", - actual_md5, expected_md5 - ), + format!("md5 mismatch, current: {actual_md5}, expected: {expected_md5}"), ); } @@ -297,7 +295,7 @@ pub async fn redis_save( /// redis_delete(&connection,key) pub async fn redis_delete( - conn: &mut MultiplexedConnection, + conn: &mut ConnectionManager, key: &str, mode: Option, ) -> DbResult { @@ -312,7 +310,7 @@ pub async fn redis_delete( match mode { SaveMode::Update | SaveMode::Upsert => { let deleted: i32 = redis::cmd("DEL").arg(key).query_async(conn).await?; - return Ok(deleted > 0); + Ok(deleted > 0) } SaveMode::Equal(ref expected_md5) => { @@ -335,10 +333,7 @@ pub async fn redis_delete( let _: () = redis::cmd("UNWATCH").query_async(conn).await?; return error( 412, - format!( - "md5 mismatch, current: {}, expected: {}", - actual_md5, expected_md5 - ), + format!("md5 mismatch, current: {actual_md5}, expected: {expected_md5}"), ); } @@ -358,9 +353,7 @@ pub async fn redis_delete( } } - SaveMode::Insert => { - return error(412, "Insert mode is not supported for delete"); - } + SaveMode::Insert => error(412, "Insert mode is not supported for delete"), } } @@ -399,43 +392,80 @@ impl TryFrom for RedisEvent { } } -pub async fn receiver( - redis_client: Client, - hub_state: Arc>, -) -> anyhow::Result<()> { - let mut redis = redis_client.get_multiplexed_async_connection().await?; - let mut pubsub = redis_client.get_async_pubsub().await?; - - let _: String = ::redis::cmd("CONFIG") - .arg("SET") - .arg("notify-keyspace-events") - .arg("E$gx") - .query_async(&mut redis) - .await?; - - for pattern in [ - "__keyevent@*__:set", - "__keyevent@*__:del", - "__keyevent@*__:unlink", - "__keyevent@*__:expired", - ] { - pubsub.psubscribe(pattern).await?; - } +pub async fn receiver(redis_client: Client, hub_state: Arc>) { + let mut backoff_secs = 1_u64; - let mut messages = pubsub.on_message(); + 'subscriber: loop { + let cmd_conn = match redis_client.get_connection_manager().await { + Ok(c) => c, + Err(e) => { + error!("Redis connection manager (keyspace subscriber): {e}"); + sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + continue; + } + }; - while let Some(message) = messages.next().await { - match RedisEvent::try_from(message) { - Ok(ev) => { - push_event(&hub_state, &mut redis, ev).await; + { + let mut c = cmd_conn.clone(); + if let Err(e) = ::redis::cmd("CONFIG") + .arg("SET") + .arg("notify-keyspace-events") + .arg("E$gx") + .query_async::(&mut c) + .await + { + error!("Redis CONFIG SET notify-keyspace-events failed: {e}"); + sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + continue; } + } + + let mut pubsub = match redis_client.get_async_pubsub().await { + Ok(p) => p, Err(e) => { - warn!("invalid redis message: {e}"); + error!("Redis pub/sub connect failed: {e}"); + sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + continue; + } + }; + + for pattern in [ + "__keyevent@*__:set", + "__keyevent@*__:del", + "__keyevent@*__:unlink", + "__keyevent@*__:expired", + ] { + if let Err(e) = pubsub.psubscribe(pattern).await { + error!("Redis PSUBSCRIBE {pattern} failed: {e}"); + sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + continue 'subscriber; } } - } - Ok(()) + info!("Redis keyspace subscriber connected"); + backoff_secs = 1; + + let mut messages = pubsub.on_message(); + while let Some(message) = messages.next().await { + match RedisEvent::try_from(message) { + Ok(ev) => { + let mut c = cmd_conn.clone(); + push_event(&hub_state, &mut c, ev).await; + } + Err(e) => { + warn!("invalid redis message: {e}"); + } + } + } + + warn!("Redis keyspace message stream ended; reconnecting after {backoff_secs}s"); + sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(60); + } } /// redis_connect() diff --git a/src/workspace_owner.rs b/src/workspace_owner.rs index 794dbef..14dc545 100644 --- a/src/workspace_owner.rs +++ b/src/workspace_owner.rs @@ -58,7 +58,7 @@ pub fn check_workspace_core(claims_opt: Option, key: &str) -> Result<(), } pub fn test_rego_claims(claim: &Claims, command: &str, key: &str) -> bool { - let data = serde_json::to_value(&claim).unwrap_or_default(); + let data = serde_json::to_value(claim).unwrap_or_default(); let mut rego = REGORUS_ENGINE.clone(); rego.set_input(regorus::Value::from(json!({ @@ -88,13 +88,13 @@ pub static POLICY_TEXT: LazyLock = LazyLock::new(|| { if !path.exists() { panic!("Policy file not found: {}", path.display()); } - let policy_text = match fs::read_to_string(path) { - Ok(text) => format!("package main\n\n{}", text), + + match fs::read_to_string(path) { + Ok(text) => format!("package main\n\n{text}"), Err(e) => { panic!("Failed to read policy file {}: {}", path.display(), e); } - }; - policy_text + } }); pub static REGORUS_ENGINE: LazyLock = LazyLock::new(|| {