Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 105 additions & 32 deletions src/device/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,20 @@ impl DeviceManager {

match &device_entry.status {
DeviceStatus::ContinuousMode => {
DeviceManager::check_continuous_mode_device(device_entry, receiver, device.id)
.await;
if !DeviceManager::check_continuous_mode_device(
device_entry,
receiver,
device.id,
)
.await
{
Self::recover(self, device.id, DeviceStatus::ContinuousMode).await;
}
}
DeviceStatus::Running => {
DeviceManager::check_running_device(device_entry, device.id).await;
if !DeviceManager::check_running_device(device_entry, device.id).await {
Self::recover(self, device.id, DeviceStatus::Running).await;
}
}
status => {
error!("Device Manager found an unhandled device status, status: {status:?}. Device id: {:?}", device.id);
Expand All @@ -455,21 +464,82 @@ impl DeviceManager {
}
}

async fn recover(manager: &mut DeviceManager, device_id: Uuid, status: DeviceStatus) {
const MAX_RECOVERY_ATTEMPTS: u32 = 2;
for attempt in 1..=MAX_RECOVERY_ATTEMPTS {
warn!(
"Attempting device recovery ({}/{}). Device id: {:?}",
attempt, MAX_RECOVERY_ATTEMPTS, device_id
);
let recovered = match status {
DeviceStatus::Running => manager.recover_running_device(device_id).await.is_ok(),
DeviceStatus::ContinuousMode => manager
.recover_continuous_mode_device(device_id)
.await
.is_ok(),
_ => false,
};
if recovered {
info!("Device recovered successfully. Device id: {:?}", device_id);
return;
}
if attempt < MAX_RECOVERY_ATTEMPTS {
sleep(Duration::from_millis(500)).await;
}
}
error!(
"Device recovery failed after {} attempts, marking with error. Device id: {:?}",
MAX_RECOVERY_ATTEMPTS, device_id
);
if let Some(device) = manager.device.get_mut(&device_id) {
device.status = DeviceStatus::Error;
}
}

async fn recover_running_device(&mut self, device_id: Uuid) -> Result<Answer, ManagerError> {
self.check_device_status(device_id, &[DeviceStatus::Running])?;
let (source, device_type) = {
let device = self.get_mut_device(device_id)?;
if let Some(actor) = device.actor.take() {
actor.abort();
}
device.handler = None;
device.status = DeviceStatus::Available;
(device.source.clone(), device.device_type.clone())
};

let device_info =
Box::pin(self.create_device_helper(device_id, source, device_type)).await?;
Ok(Answer::DeviceInfo(vec![device_info]))
}

async fn recover_continuous_mode_device(
&mut self,
device_id: Uuid,
) -> Result<Answer, ManagerError> {
self.continuous_mode_off(device_id).await?;
self.continuous_mode(device_id).await
}

async fn check_continuous_mode_device(
device_entry: &mut Device,
mut receiver: Receiver<ProtocolMessage>,
device_id: Uuid,
) {
) -> bool {
let Some(broadcast) = &device_entry.broadcast else {
error!("Device actor broadcast service finished, marking device with error. Device id: {:?}", device_id);
device_entry.status = DeviceStatus::Error;
return;
error!(
"Device actor broadcast service finished. Device id: {:?}",
device_id
);
return false;
};

if broadcast.is_finished() {
error!("Device actor broadcast service finished, marking device with error. Device id: {:?}", device_id);
device_entry.status = DeviceStatus::Error;
return;
error!(
"Device actor broadcast service finished. Device id: {:?}",
device_id
);
return false;
}

match &device_entry.device_type {
Expand All @@ -478,34 +548,39 @@ impl DeviceManager {
.await
{
Err(_err) => {
error!(
"Device connection timeout, marking with error. Device id: {device_id:?}",
);
device_entry.status = DeviceStatus::Error;
error!("Device connection timeout. Device id: {device_id:?}",);
false
}
Ok(Err(err)) => match err {
tokio::sync::broadcast::error::RecvError::Lagged(_) => error!(
"Device connection error. Device id: {device_id:?}, Error: {err:?}"
),
tokio::sync::broadcast::error::RecvError::Closed => {
error!("Device connection error, marking with error. Device id: {device_id:?}, Error: {err:?}");
device_entry.status = DeviceStatus::Error;
Ok(Err(err)) => {
match err {
tokio::sync::broadcast::error::RecvError::Lagged(_) => {
error!(
"Device connection error. Device id: {device_id:?}, Error: {err:?}"
);
false
}
tokio::sync::broadcast::error::RecvError::Closed => {
error!("Device connection error. Device id: {device_id:?}, Error: {err:?}");
false
}
}
},
}
Ok(Ok(_ok)) => {
debug!("Device still responsive. Device id: {device_id:?}");
true
}
}
}
device_selection => {
error!("Device connection error, Cannot check health of {device_selection:?}. Device id: {device_id:?}");
error!("Cannot check health of {device_selection:?}. Device id: {device_id:?}");
false
}
}
}

async fn check_running_device(device_entry: &mut Device, device_id: Uuid) {
async fn check_running_device(device_entry: &mut Device, device_id: Uuid) -> bool {
let Some(handler) = &device_entry.handler else {
return;
return true;
};

let handler_clone = handler.clone();
Expand All @@ -519,21 +594,19 @@ impl DeviceManager {
.await
{
Err(_err) => {
error!(
"Device connection timeout, marking with error. Device id: {:?}",
device_id
);
device_entry.status = DeviceStatus::Error;
error!("Device connection timeout. Device id: {:?}", device_id);
false
}
Ok(Err(err)) => {
error!(
"Device connection error, marking with error. Device id: {:?}, Error: {:?}",
"Device connection error. Device id: {:?}, Error: {:?}",
device_id, err
);
device_entry.status = DeviceStatus::Error;
false
}
Ok(Ok(_answer)) => {
debug!("Device still responsive. Device id: {:?}", device_id);
true
}
}
}
Expand Down
Loading