Skip to content

Commit 71beebb

Browse files
committed
Add capability for reset all sessions of a given channel.
1 parent d12590c commit 71beebb

6 files changed

Lines changed: 92 additions & 2 deletions

File tree

crates/bitpart-cli/src/main.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,18 @@ enum Commands {
111111
device_name: String,
112112
},
113113

114+
/// reset all active chat sessions on a channel
115+
#[command(arg_required_else_help = true)]
116+
ChannelReset {
117+
/// Channel ID
118+
#[arg(short, long)]
119+
id: String,
120+
121+
/// Bot ID
122+
#[arg(short, long)]
123+
bot_id: String,
124+
},
125+
114126
/// delete a bot
115127
#[command(arg_required_else_help = true)]
116128
Delete {
@@ -296,6 +308,17 @@ async fn main() -> Result<()> {
296308
send(&mut sender, &req).await?;
297309
hangup(&mut sender).await?;
298310
}
311+
Commands::ChannelReset { id, bot_id } => {
312+
let req = json!({"message_type": "ResetChannel",
313+
"data" : {
314+
"id": id,
315+
"bot_id": bot_id,
316+
}});
317+
debug!("Request: {:?}", req.to_string());
318+
319+
send(&mut sender, &req).await?;
320+
hangup(&mut sender).await?;
321+
}
299322
Commands::Delete { id } => {
300323
let req = json!({"message_type": "DeleteBot",
301324
"data" : {

crates/bitpart-common/src/error.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@ use figment;
2020
use futures;
2121
use hex;
2222
use opentelemetry_otlp;
23-
use presage;
23+
use presage::{
24+
self,
25+
libsignal_service::{prelude::InvalidDeviceId, protocol::SignalProtocolError},
26+
};
2427
use presage_store_bitpart::BitpartStoreError;
2528
use prost;
2629
use sea_orm::DbErr;
2730
use serde_json::Error as SerdeError;
28-
use std::{array, io};
31+
use std::{array, io, num::ParseIntError};
2932
use thiserror::Error;
3033
use thiserror_ext::Box;
3134
use tokio;
@@ -78,6 +81,12 @@ pub enum BitpartErrorKind {
7881
ProtocolBuffers(#[from] prost::UnknownEnumValue),
7982
#[error("Bincode error: `{0}`")]
8083
Bincode(#[from] bincode::Error),
84+
#[error("Parse Int error: `{0}`")]
85+
ParseInt(#[from] ParseIntError),
86+
#[error("Invalid DeviceID error: `{0}`")]
87+
InvalidDeviceId(#[from] InvalidDeviceId),
88+
#[error("Signal Protocol error: `{0}`")]
89+
SignalProtocol(#[from] SignalProtocolError),
8190
}
8291

8392
impl<S: std::error::Error> From<presage::Error<S>> for BitpartErrorKind {

crates/bitpart-common/src/socket.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ pub enum SocketMessage<S: Serialize> {
5656
bot_id: String,
5757
device_name: String,
5858
},
59+
ResetChannel {
60+
id: String,
61+
bot_id: String,
62+
},
5963
ChatRequest(Box<Request>),
6064
Response(Response<S>),
6165
Error(Response<S>),

crates/bitpart/src/api.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,26 @@ pub async fn start_channel(channel_id: &str, bot_id: &str, state: &mut ApiState)
448448
Ok(recv.await?)
449449
}
450450

451+
pub async fn reset_channel(channel_id: &str, bot_id: &str, state: &mut ApiState) -> Result<String> {
452+
let (send, recv) = oneshot::channel();
453+
let contents = signal::ChannelMessageContents::ResetSessions {
454+
id: channel_id.to_owned(),
455+
};
456+
let mut data = state.tokens.lock().await;
457+
let token = data
458+
.entry((bot_id.to_owned(), channel_id.to_owned()))
459+
.or_insert(state.parent_token.child_token());
460+
let msg = signal::ChannelMessage {
461+
msg: contents,
462+
db: state.db.clone(),
463+
token: token.clone(),
464+
tracker: state.tracker.clone(),
465+
sender: send,
466+
};
467+
state.manager.send(msg);
468+
Ok(recv.await?)
469+
}
470+
451471
pub async fn read_channel(
452472
id: &str,
453473
bot_id: &str,

crates/bitpart/src/channels/signal.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ pub enum ChannelMessageContents {
8181
id: String,
8282
attachments_dir: PathBuf,
8383
},
84+
ResetSessions {
85+
id: String,
86+
},
8487
}
8588

8689
pub struct ChannelMessage {
@@ -235,6 +238,31 @@ async fn process_channel_message(msg: ChannelMessage) -> Result<()> {
235238
.map_err(BitpartErrorKind::Signal)?)
236239
}
237240
}
241+
ChannelMessageContents::ResetSessions { id } => {
242+
let store = BitpartStore::open(&id, &db, OnNewIdentity::Trust).await?;
243+
if let Ok(mut manager) = Manager::load_registered(store).await {
244+
let sessions: Vec<(String, Vec<u8>)> = manager.store().get_all("sessions").await?;
245+
for (address, _) in sessions {
246+
let timestamp = std::time::SystemTime::now()
247+
.duration_since(std::time::UNIX_EPOCH)
248+
.expect("Time went backwards")
249+
.as_millis() as u64;
250+
let addr: Vec<&str> = address.split('.').collect();
251+
let uuid = uuid::Uuid::parse_str(addr[0])?;
252+
manager
253+
.send_session_reset(&ServiceId::Aci(uuid.into()), timestamp)
254+
.await?
255+
}
256+
Ok(sender
257+
.send("".to_owned())
258+
.map_err(BitpartErrorKind::Signal)?)
259+
} else {
260+
warn!("Skipping startup of unregistered channel");
261+
Ok(sender
262+
.send("".to_owned())
263+
.map_err(BitpartErrorKind::Signal)?)
264+
}
265+
}
238266
}
239267
}
240268

crates/bitpart/src/socket.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ async fn process_message(
159159
Err(err) => wrap_error("ReadChannel", &err.to_string()),
160160
}
161161
}
162+
SocketMessage::ResetChannel { id, bot_id } => {
163+
match api::reset_channel(&id, &bot_id, state).await {
164+
Ok(res) => wrap_response("ReadChannel", &res),
165+
Err(err) => wrap_error("ReadChannel", &err.to_string()),
166+
}
167+
}
162168
SocketMessage::ListChannels(options) => {
163169
if let Some(paginate) = options {
164170
match api::list_channels(paginate.limit, paginate.offset, state).await {

0 commit comments

Comments
 (0)