Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: CI Validation

on:
workflow_dispatch:
pull_request:
push:
branches:
- main
- master

jobs:
rust-validate:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt, clippy

- name: Cache cargo registry and target
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
${{ runner.os }}-cargo-

- name: Format check
run: cargo fmt --all -- --check

- name: Build
run: cargo build --locked

- name: Clippy
run: cargo clippy --bin hulypulse --all-features -- -D warnings

- name: Unit tests
run: cargo test --bin hulypulse --locked
26 changes: 25 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hulypulse"
version = "0.4.1"
version = "0.4.2"
edition = "2024"

[dependencies]
Expand Down Expand Up @@ -30,7 +30,7 @@ hulyrs = { git = "https://github.com/hcengineering/hulyrs.git", features = [ "ac
secrecy = { version = "0.10.3", optional = true }

#redis
redis = { version = "=0.32.5", features = ["aio", "tokio-comp", "sentinel"] }
redis = { version = "=0.32.5", features = ["aio", "tokio-comp", "sentinel", "connection-manager"] }

[[bin]]
name = "hulypulse"
Expand Down
23 changes: 23 additions & 0 deletions client/off/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ export class HulypulseClient implements Disposable {
// this.handleMyDataChanged(get(myData), true)
}

private static isConnectionLikeError (err: string): boolean {
const s = err.toLowerCase()
return (
s.includes('broken pipe') ||
s.includes('connection reset') ||
s.includes('connection refused') ||
s.includes('connection aborted') ||
s.includes('unexpected eof') ||
s.includes('io error')
)
}

private handleMessage (data: string): void {
if (data === 'pong') {
clearTimeout(this.pingTimeout)
Expand All @@ -192,6 +204,17 @@ export class HulypulseClient implements Disposable {
try {
const message = JSON.parse(data); // as IncomingMessage
console.log('Received message', message);
if (
typeof message === 'object' &&
message !== null &&
'error' in message &&
typeof (message as { error: unknown }).error === 'string' &&
HulypulseClient.isConnectionLikeError((message as { error: string }).error)
) {
console.warn('Pulse server reported connection-like error; reconnecting')
this.reconnect()
return
}
// const message = JSON.parse(data) as IncomingMessage
// if (message.type === 'update' && message.presence !== undefined) {
// onPersonUpdate(message.id, message.presence ?? [])
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub static CONFIG: LazyLock<Config> = LazyLock::new(|| {
match settings {
Ok(settings) => settings,
Err(error) => {
eprintln!("configuration error: {}", error);
eprintln!("configuration error: {error}");
std::process::exit(1);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::memory::{
MemoryBackend, memory_delete, memory_info, memory_list, memory_read, memory_save,
};
use crate::redis::{redis_delete, redis_info, redis_list, redis_read, redis_save};
use redis::aio::MultiplexedConnection;
use redis::aio::ConnectionManager;
use serde::Serialize;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -80,7 +80,7 @@ pub fn deprecated_symbol_error(s: &str) -> DbResult<()> {

#[derive(Clone)]
enum DbBackend {
Redis(MultiplexedConnection),
Redis(ConnectionManager),
Memory {
db: MemoryBackend,
hub: Arc<RwLock<HubState>>,
Expand All @@ -93,7 +93,7 @@ pub struct Db {
}

impl Db {
pub fn new_redis(db: MultiplexedConnection) -> Self {
pub fn new_redis(db: ConnectionManager) -> Self {
Self {
backend: DbBackend::Redis(db),
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn map_redis_error(err: impl std::fmt::Display) -> Error {
.nth(1)
.unwrap_or(msg.as_str());
if let Some((code, text)) = detail.split_once(": ") {
let text = format!("{} {}", code, text);
let text = format!("{code} {text}");
return match code {
"400" => actix_web::error::ErrorBadRequest(text),
"404" => actix_web::error::ErrorNotFound(text),
Expand Down
21 changes: 9 additions & 12 deletions src/handlers_ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.
//

use actix_ws;
use futures_util::StreamExt;

use futures::future::{AbortHandle, Abortable};
Expand Down Expand Up @@ -187,7 +186,7 @@ async fn handle_command(
tracing::debug!("PERSONAL from {} to {}", &client_name, &to);
let payload =
json!({ "personal": client_name, "correlation": correlation, "data": data });
if !send_to_name(&hub_state, &to, payload).await {
if !send_to_name(hub_state, &to, payload).await {
tracing::debug!("PERSONAL send from [{}] to [{}] failed", &client_name, &to);
result_err("failed", &correlation, ws).await;
}
Expand All @@ -201,7 +200,7 @@ async fn handle_command(
} => {
tracing::debug!("ANSWER from {} to {}", &client_name, &to);
let payload = json!({ "correlation": correlation, "data": data });
if !send_to_name(&hub_state, &to, payload).await {
if !send_to_name(hub_state, &to, payload).await {
tracing::debug!("PERSONAL send_to failed: no such session {}", to);
}
}
Expand Down Expand Up @@ -233,10 +232,8 @@ async fn handle_command(
// TTL logic
let real_ttl = if let Some(secs) = ttl {
Some(Ttl::Sec(secs as usize))
} else if let Some(timestamp) = expires_at {
Some(Ttl::At(timestamp))
} else {
None
expires_at.map(Ttl::At)
};

// SaveMode logic
Expand Down Expand Up @@ -447,11 +444,11 @@ pub async fn handler(
_ => "",
};

if let Some(ref claim) = claims {
if !test_rego_claims(claim, cmd.as_ref(), key) {
let _ = session.text("Unauthorized: Rego policy").await;
break;
}
if let Some(ref claim) = claims
&& !test_rego_claims(claim, cmd.as_ref(), key)
{
let _ = session.text("Unauthorized: Rego policy").await;
break;
}
}

Expand All @@ -470,7 +467,7 @@ pub async fn handler(
}

Err(err) => {
let _ = session.text(format!("Invalid JSON: {}", err)).await;
let _ = session.text(format!("Invalid JSON: {err}")).await;
}
},

Expand Down
13 changes: 5 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ async fn check_workspace(
let workspace = Uuid::parse_str(&request.extract::<Path<String>>().await?);
let claims = request.extensions().get::<Claims>().cloned().unwrap();

if claims.is_system() || Ok(claims.workspace.clone()) == workspace.clone().map(Some) {
if claims.is_system() || Ok(claims.workspace) == workspace.clone().map(Some) {
next.call(request).await
} else {
warn!(
expected = ?claims.workspace,
actual = ?workspace,
"Unauthorized request, workspace mismatch"
);
Err(actix_web::error::ErrorUnauthorized("Unauthorized").into())
Err(actix_web::error::ErrorUnauthorized("Unauthorized"))
}
}

Expand All @@ -145,9 +145,9 @@ async fn main() -> anyhow::Result<()> {
BackendType::Redis => {
let redis_client = redis::client().await?;
let db_connection = redis_client
.get_multiplexed_async_connection()
.get_connection_manager()
.await
.map_err(|e| {
.inspect_err(|_e| {
tracing::error!(
"REDIS not found: {:?}",
&CONFIG
Expand All @@ -157,14 +157,11 @@ async fn main() -> anyhow::Result<()> {
.collect::<Vec<_>>()
.join(", ")
);
e
})?;
tokio::spawn({
let hub_state = hub_state.clone();
async move {
if let Err(err) = crate::redis::receiver(redis_client, hub_state).await {
tracing::error!("Redis receiver stopped: {err}");
}
crate::redis::receiver(redis_client, hub_state).await;
}
});
Db::new_redis(db_connection)
Expand Down
20 changes: 6 additions & 14 deletions src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ pub async fn memory_list(backend: &MemoryBackend, key_prefix: &str) -> DbResult<
continue;
}

if k.strip_prefix(key_prefix)
.map_or(false, |s| s.contains('$'))
{
if k.strip_prefix(key_prefix).is_some_and(|s| s.contains('$')) {
continue;
}

Expand All @@ -138,7 +136,7 @@ pub async fn memory_info(backend: &MemoryBackend) -> DbResult<String> {
let map = backend.inner.read().await;
let keys = map.len();
let memory: usize = map.values().map(|v| v.data.len()).sum();
Ok(format!("{} keys, {} bytes", keys, memory))
Ok(format!("{keys} keys, {memory} bytes"))
}

/// memory_read(&backend, "key")
Expand Down Expand Up @@ -215,7 +213,7 @@ pub async fn memory_save<V: AsRef<[u8]>>(
if max_size != 0 && value.len() > max_size {
return error(
400,
format!("Value in memory mode must be less than {} bytes", max_size),
format!("Value in memory mode must be less than {max_size} bytes"),
);
}

Expand Down Expand Up @@ -273,10 +271,7 @@ pub async fn memory_save<V: AsRef<[u8]>>(
if &actual_md5 != expected_md5 {
return error(
412,
format!(
"md5 mismatch, current: {}, expected: {}",
actual_md5, expected_md5
),
format!("md5 mismatch, current: {actual_md5}, expected: {expected_md5}"),
);
}
*existing = Entry {
Expand Down Expand Up @@ -304,9 +299,7 @@ pub async fn memory_delete(
let mode = mode.unwrap_or(SaveMode::Upsert);

match mode {
SaveMode::Insert => {
return error(412, "Insert mode is not supported for delete");
}
SaveMode::Insert => error(412, "Insert mode is not supported for delete"),
SaveMode::Update | SaveMode::Upsert => {
let existed = map.remove(key).is_some();
Ok(existed)
Expand All @@ -320,8 +313,7 @@ pub async fn memory_delete(
return error(
412,
format!(
"md5 mismatch, current: {}, expected: {}",
actual_md5, expected_md5
"md5 mismatch, current: {actual_md5}, expected: {expected_md5}"
),
);
}
Expand Down
Loading
Loading