Skip to content

Commit 25b1e25

Browse files
committed
feat(scheduler): add InstanceScheduled structure
Signed-off-by: iverly <github@iverly.net>
1 parent 214f41f commit 25b1e25

5 files changed

Lines changed: 115 additions & 36 deletions

File tree

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
use log::debug;
2-
use tokio::sync::mpsc;
3-
use tokio_stream::wrappers::ReceiverStream;
4-
use tonic::{Request, Response, Status};
5-
61
use proto::scheduler::{
72
instance_service_server::InstanceService, Instance, InstanceIdentifier, InstanceStatus,
83
};
4+
use tokio::sync::mpsc;
5+
use tokio_stream::wrappers::ReceiverStream;
6+
use tonic::{Request, Response};
97

108
use crate::{manager::Manager, Event};
119

@@ -25,8 +23,8 @@ impl InstanceService for InstanceListener {
2523
async fn create(
2624
&self,
2725
request: Request<Instance>,
28-
) -> Result<Response<Self::CreateStream>, Status> {
29-
debug!("received request: {:?}", request);
26+
) -> Result<Response<Self::CreateStream>, tonic::Status> {
27+
log::debug!("received gRPC request: {:?}", request);
3028
let (tx, rx) = Manager::create_mpsc_channel();
3129

3230
match self
@@ -38,33 +36,22 @@ impl InstanceService for InstanceListener {
3836
return Ok(Response::new(ReceiverStream::new(rx)));
3937
}
4038
Err(_) => {
41-
return Err(Status::internal("could not send event to manager"));
39+
return Err(tonic::Status::internal("could not send event to manager"));
4240
}
4341
}
4442
}
4543

46-
type CreateStream = ReceiverStream<Result<InstanceStatus, Status>>;
44+
type CreateStream = ReceiverStream<Result<InstanceStatus, tonic::Status>>;
4745

48-
async fn start(&self, request: Request<InstanceIdentifier>) -> Result<Response<()>, Status> {
49-
debug!("received request: {:?}", request);
50-
let (tx, rx) = Manager::create_oneshot_channel();
51-
52-
match self
53-
.sender
54-
.send(Event::InstanceStart(request.into_inner().id, tx))
55-
.await
56-
{
57-
Ok(_) => {
58-
return rx.await.unwrap();
59-
}
60-
Err(_) => {
61-
return Err(Status::internal("could not send event to manager"));
62-
}
63-
}
46+
async fn start(&self, _: Request<InstanceIdentifier>) -> Result<Response<()>, tonic::Status> {
47+
Err(tonic::Status::unimplemented("not implemented"))
6448
}
6549

66-
async fn stop(&self, request: Request<InstanceIdentifier>) -> Result<Response<()>, Status> {
67-
debug!("received request: {:?}", request);
50+
async fn stop(
51+
&self,
52+
request: Request<InstanceIdentifier>,
53+
) -> Result<Response<()>, tonic::Status> {
54+
log::debug!("received gRPC request: {:?}", request);
6855
let (tx, rx) = Manager::create_oneshot_channel();
6956

7057
match self
@@ -76,13 +63,16 @@ impl InstanceService for InstanceListener {
7663
return rx.await.unwrap();
7764
}
7865
Err(_) => {
79-
return Err(Status::internal("could not send event to manager"));
66+
return Err(tonic::Status::internal("could not send event to manager"));
8067
}
8168
}
8269
}
8370

84-
async fn destroy(&self, request: Request<InstanceIdentifier>) -> Result<Response<()>, Status> {
85-
debug!("received request: {:?}", request);
71+
async fn destroy(
72+
&self,
73+
request: Request<InstanceIdentifier>,
74+
) -> Result<Response<()>, tonic::Status> {
75+
log::debug!("received gRPC request: {:?}", request);
8676
let (tx, rx) = Manager::create_oneshot_channel();
8777

8878
match self
@@ -94,7 +84,7 @@ impl InstanceService for InstanceListener {
9484
return rx.await.unwrap();
9585
}
9686
Err(_) => {
97-
return Err(Status::internal("could not send event to manager"));
87+
return Err(tonic::Status::internal("could not send event to manager"));
9888
}
9989
}
10090
}

scheduler/src/instance/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod listener;
2+
pub mod scheduled;
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use proto::scheduler::{Instance, InstanceStatus, Status};
2+
use tokio::sync::mpsc;
3+
4+
use crate::{NodeIdentifier, ProxyError};
5+
6+
/// InstanceScheduled represents an instance that is scheduled to a node. It is used to send
7+
/// messages to the node and contains the node identifier where it's scheduled.
8+
///
9+
/// Properties:
10+
///
11+
/// * `id`: The id of the instance.
12+
/// * `instance`: The instance that is being registered.
13+
/// * `node_id`: The node identifier of the node that the instance is running on.
14+
/// * `tx`: This is the channel that the instance will use to send status updates to the controller.
15+
#[derive(Debug, Clone)]
16+
pub struct InstanceScheduled {
17+
pub id: String,
18+
pub instance: Instance,
19+
pub node_id: Option<NodeIdentifier>,
20+
pub tx: mpsc::Sender<Result<InstanceStatus, tonic::Status>>,
21+
}
22+
23+
impl InstanceScheduled {
24+
/// `new` creates a new `InstanceStatus` struct
25+
///
26+
/// Arguments:
27+
///
28+
/// * `id`: The id of the instance.
29+
/// * `instance`: The instance that we want to run.
30+
/// * `node_id`: The node identifier of the node that the instance is running on.
31+
/// * `tx`: This is the channel that the instance will use to send status updates to the controller.
32+
///
33+
/// Returns:
34+
///
35+
/// A new instance of the struct `InstanceStatus`
36+
pub fn new(
37+
id: String,
38+
instance: Instance,
39+
node_id: Option<NodeIdentifier>,
40+
tx: mpsc::Sender<Result<InstanceStatus, tonic::Status>>,
41+
) -> Self {
42+
Self {
43+
id,
44+
instance,
45+
node_id,
46+
tx,
47+
}
48+
}
49+
50+
/// This function updates the status of the instance and sends the updated status to the controller
51+
///
52+
/// Arguments:
53+
///
54+
/// * `status`: The status of the node.
55+
/// * `description`: A string that describes the status of the node.
56+
///
57+
/// Returns:
58+
///
59+
/// A Result<(), ProxyError>
60+
pub async fn change_status(
61+
&mut self,
62+
status: Status,
63+
description: Option<String>,
64+
) -> Result<(), ProxyError> {
65+
self.instance.status = status.into();
66+
67+
self.tx
68+
.send(Ok(InstanceStatus {
69+
id: self.id.clone(),
70+
status: status.into(),
71+
status_description: description.unwrap_or_else(|| "".to_string()),
72+
resource: match self.instance.status() {
73+
Status::Running => self.instance.resource.clone(),
74+
_ => None,
75+
},
76+
}))
77+
.await
78+
.map_err(|_| ProxyError::ChannelSenderError)?;
79+
80+
Ok(())
81+
}
82+
}

scheduler/src/lib.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::sync::{mpsc, oneshot};
77
use tonic::Response;
88

99
pub mod config;
10-
pub mod instance_listener;
10+
pub mod instance;
1111
pub mod manager;
1212
pub mod node_listener;
1313
pub mod storage;
@@ -26,13 +26,20 @@ pub enum SchedulerError {
2626
Unknown,
2727
}
2828

29+
#[derive(Error, Debug)]
30+
pub enum ProxyError {
31+
#[error("an error occurred while sending a message to the channel")]
32+
ChannelSenderError,
33+
}
34+
2935
#[derive(Debug)]
3036
#[allow(dead_code)]
3137
pub struct Node {
3238
id: String,
3339
}
3440

3541
pub type NodeIdentifier = String;
42+
pub type InstanceIdentifier = String;
3643

3744
#[derive(Debug)]
3845
pub enum Event {

scheduler/src/manager.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@ use tokio::sync::mpsc;
1010
use tokio::{sync::oneshot, task::JoinHandle};
1111
use tonic::{transport::Server, Response};
1212

13+
use crate::instance::listener::InstanceListener;
1314
use crate::SchedulerError;
14-
use crate::{
15-
config::Config, instance_listener::InstanceListener, node_listener::NodeListener,
16-
storage::Storage, Event, Node,
17-
};
15+
use crate::{config::Config, node_listener::NodeListener, storage::Storage, Event, Node};
1816

1917
#[derive(Debug)]
2018
pub struct Manager {

0 commit comments

Comments
 (0)