diff --git a/nexosim/src/server/codegen/simulation.v1.rs b/nexosim/src/server/codegen/simulation.v1.rs index 2f9d28a..12a3c98 100644 --- a/nexosim/src/server/codegen/simulation.v1.rs +++ b/nexosim/src/server/codegen/simulation.v1.rs @@ -583,6 +583,8 @@ pub mod schedule_query_request { } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ScheduleQueryReply { + /// This field is hoisted because protobuf3 does not support `repeated` within + /// a `oneof`. It is Always empty if an error is returned #[prost(bytes = "vec", repeated, tag = "1")] pub replies: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, /// Always returns exactly 1 variant. diff --git a/nexosim/tests/codegen/simulation.v1.rs b/nexosim/tests/codegen/simulation.v1.rs index 374a575..aeb9c7e 100644 --- a/nexosim/tests/codegen/simulation.v1.rs +++ b/nexosim/tests/codegen/simulation.v1.rs @@ -583,6 +583,8 @@ pub mod schedule_query_request { } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ScheduleQueryReply { + /// This field is hoisted because protobuf3 does not support `repeated` within + /// a `oneof`. It is Always empty if an error is returned #[prost(bytes = "vec", repeated, tag = "1")] pub replies: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, /// Always returns exactly 1 variant. @@ -742,6 +744,7 @@ pub enum ErrorCode { DuplicateQuerySource = 24, DuplicateEventSink = 25, InvalidBenchConfig = 26, + BenchAlreadyBuilt = 27, /// Simulation runtime error. SimulationPanic = 40, SimulationNotStarted = 42, @@ -788,6 +791,7 @@ impl ErrorCode { Self::DuplicateQuerySource => "DUPLICATE_QUERY_SOURCE", Self::DuplicateEventSink => "DUPLICATE_EVENT_SINK", Self::InvalidBenchConfig => "INVALID_BENCH_CONFIG", + Self::BenchAlreadyBuilt => "BENCH_ALREADY_BUILT", Self::SimulationPanic => "SIMULATION_PANIC", Self::SimulationNotStarted => "SIMULATION_NOT_STARTED", Self::SimulationTerminated => "SIMULATION_TERMINATED", @@ -828,6 +832,7 @@ impl ErrorCode { "DUPLICATE_QUERY_SOURCE" => Some(Self::DuplicateQuerySource), "DUPLICATE_EVENT_SINK" => Some(Self::DuplicateEventSink), "INVALID_BENCH_CONFIG" => Some(Self::InvalidBenchConfig), + "BENCH_ALREADY_BUILT" => Some(Self::BenchAlreadyBuilt), "SIMULATION_PANIC" => Some(Self::SimulationPanic), "SIMULATION_NOT_STARTED" => Some(Self::SimulationNotStarted), "SIMULATION_TERMINATED" => Some(Self::SimulationTerminated), diff --git a/nexosim/tests/integration/main.rs b/nexosim/tests/integration/main.rs index 06b96db..41e75f1 100644 --- a/nexosim/tests/integration/main.rs +++ b/nexosim/tests/integration/main.rs @@ -6,6 +6,8 @@ mod injector; mod model_scheduling; mod serialization; #[cfg(all(feature = "server", not(miri)))] +mod server_basic; +#[cfg(all(feature = "server", not(miri)))] mod server_scheduling; #[cfg(not(miri))] mod simulation_clock_sync; @@ -20,3 +22,7 @@ mod simulation_scheduling; mod simulation_ticked_mode; #[cfg(not(miri))] mod simulation_timeout; + +// Shared server test utils. +#[cfg(all(feature = "server", not(miri)))] +mod server_utils; diff --git a/nexosim/tests/integration/server_basic.rs b/nexosim/tests/integration/server_basic.rs new file mode 100644 index 0000000..6d5eef8 --- /dev/null +++ b/nexosim/tests/integration/server_basic.rs @@ -0,0 +1,240 @@ +use prost_types::Timestamp; +use serde::{Deserialize, Serialize}; + +use nexosim::model::{Context, Model}; +use nexosim::ports::QuerySource; +use nexosim::simulation::{Mailbox, SimInit}; + +use super::server_utils::get_client; +use super::server_utils::grpc_client::{ + BuildReply, BuildRequest, Error, ErrorCode, InitReply, InitRequest, Path, ProcessQueryReply, + ProcessQueryRequest, StepUntilRequest, TerminateReply, TerminateRequest, build_reply, + init_reply, process_query_reply, step_until_request, terminate_reply, +}; +use crate::some_deadline_secs; + +macro_rules! assert_resp_ok { + ($resp:ident, $type:ident, $module:ident) => { + match $resp.into_inner() { + $type { + result: Some($module::Result::Empty(())), + } => (), + a => panic!("Expected Ok, got: {:?}", a), + }; + }; +} +macro_rules! assert_resp_err { + ($resp:ident, $type:ident, $module:ident, $code:expr) => { + match $resp.into_inner() { + $type { + result: Some($module::Result::Error(Error { code, .. })), + } if code == $code as i32 => (), + a => panic!("Expected Err: {:?}, got: {:?}", $code, a), + }; + }; +} + +#[derive(Serialize, Deserialize)] +struct SimpleModel; +#[Model] +impl SimpleModel { + async fn query(&mut self, value: i64, cx: &Context) -> i64 { + value * cx.time().as_secs() + } +} + +fn simple_bench(_: u8) -> Result> { + let simple = SimpleModel; + let simple_mbox = Mailbox::new(); + + let mut bench = SimInit::new(); + + QuerySource::new() + .connect(SimpleModel::query, &simple_mbox) + .bind_endpoint(&mut bench, "query")?; + + // Bench assembly. + bench = bench.add_model(simple, simple_mbox, "simple"); + Ok(bench) +} + +#[tokio::test] +async fn init_before_build_fail() { + let (mut client, signal, handle) = get_client(simple_bench).await; + + let resp = client + .init(InitRequest { + time: Some(Timestamp::default()), + }) + .await + .unwrap(); + + assert_resp_err!(resp, InitReply, init_reply, ErrorCode::BenchNotBuilt); + + // Shutdown. + signal.send(()).unwrap(); + handle.await.unwrap(); +} + +#[tokio::test] +async fn subsequent_build_fail() { + let (mut client, signal, handle) = get_client(simple_bench).await; + + let resp = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); + assert_resp_ok!(resp, BuildReply, build_reply); + + let resp = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); + assert_resp_err!(resp, BuildReply, build_reply, ErrorCode::BenchAlreadyBuilt); + + // Shutdown. + signal.send(()).unwrap(); + handle.await.unwrap(); +} + +#[tokio::test] +async fn build_terminate_build() { + let (mut client, signal, handle) = get_client(simple_bench).await; + + let resp = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); + assert_resp_ok!(resp, BuildReply, build_reply); + + let resp = client.terminate(TerminateRequest {}).await.unwrap(); + assert_resp_ok!(resp, TerminateReply, terminate_reply); + + let resp = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); + assert_resp_ok!(resp, BuildReply, build_reply); + + // Shutdown. + signal.send(()).unwrap(); + handle.await.unwrap(); +} + +#[tokio::test] +async fn build_after_init_fail() { + let (mut client, signal, handle) = get_client(simple_bench).await; + + let resp = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); + assert_resp_ok!(resp, BuildReply, build_reply); + + let resp = client + .init(InitRequest { + time: Some(Timestamp::default()), + }) + .await + .unwrap(); + assert_resp_ok!(resp, InitReply, init_reply); + + let _ = client + .step_until(StepUntilRequest { + deadline: some_deadline_secs!(3, step_until_request), + }) + .await + .unwrap(); + + let resp = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); + assert_resp_err!(resp, BuildReply, build_reply, ErrorCode::BenchAlreadyBuilt); + + // Verify that the simulation is sill alive and at a right ts. + let resp = client + .process_query(ProcessQueryRequest { + source: Some(Path { + segments: vec!["query".to_string()], + }), + request: vec![3], + }) + .await + .unwrap(); + match resp.into_inner() { + ProcessQueryReply { + result: Some(process_query_reply::Result::Empty(())), + replies, + } if replies == vec![vec![9]] => (), + a => panic!("Expected replies: [[9]], got: {:?}", a), + }; + + // Shutdown. + signal.send(()).unwrap(); + handle.await.unwrap(); +} + +#[tokio::test] +async fn subsequent_init_fail() { + let (mut client, signal, handle) = get_client(simple_bench).await; + + let resp = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); + assert_resp_ok!(resp, BuildReply, build_reply); + + // First init. + let resp = client + .init(InitRequest { + time: Some(Timestamp::default()), + }) + .await + .unwrap(); + assert_resp_ok!(resp, InitReply, init_reply); + + // Step the simulation. + let _ = client + .step_until(StepUntilRequest { + deadline: some_deadline_secs!(2, step_until_request), + }) + .await + .unwrap(); + + // Second init attempt. + let resp = client + .init(InitRequest { + time: Some(Timestamp::default()), + }) + .await + .unwrap(); + assert_resp_err!(resp, InitReply, init_reply, ErrorCode::BenchNotBuilt); + + // Verify that the simulation is sill alive and at a right ts. + let resp = client + .process_query(ProcessQueryRequest { + source: Some(Path { + segments: vec!["query".to_string()], + }), + request: vec![3], + }) + .await + .unwrap(); + match resp.into_inner() { + ProcessQueryReply { + result: Some(process_query_reply::Result::Empty(())), + replies, + } if replies == vec![vec![6]] => (), + a => panic!("Expected replies: [[6]], got: {:?}", a), + }; + + // Shutdown. + signal.send(()).unwrap(); + handle.await.unwrap(); +} + +#[tokio::test] +async fn build_after_complete_cycle() { + let (mut client, signal, handle) = get_client(simple_bench).await; + + let resp = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); + assert_resp_ok!(resp, BuildReply, build_reply); + + let resp = client + .init(InitRequest { + time: Some(Timestamp::default()), + }) + .await + .unwrap(); + assert_resp_ok!(resp, InitReply, init_reply); + + let resp = client.terminate(TerminateRequest {}).await.unwrap(); + assert_resp_ok!(resp, TerminateReply, terminate_reply); + + let resp = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); + assert_resp_ok!(resp, BuildReply, build_reply); + + // Shutdown. + signal.send(()).unwrap(); + handle.await.unwrap(); +} diff --git a/nexosim/tests/integration/server_scheduling.rs b/nexosim/tests/integration/server_scheduling.rs index e41cb83..6649c4c 100644 --- a/nexosim/tests/integration/server_scheduling.rs +++ b/nexosim/tests/integration/server_scheduling.rs @@ -10,24 +10,13 @@ use nexosim::model::{Context, Model}; use nexosim::ports::{EventSource, Output, QuerySource, SinkState, event_slot_endpoint}; use nexosim::simulation::{Mailbox, SimInit}; -mod grpc_client { - include!("../codegen/simulation.v1.rs"); -} -use grpc_client::{ - BuildRequest, InitRequest, ReadEventRequest, ScheduleEventRequest, ScheduleQueryRequest, +use super::server_utils::get_client; +use super::server_utils::grpc_client::{ + BuildRequest, InitRequest, Path, ReadEventRequest, ScheduleEventRequest, ScheduleQueryRequest, StepUntilRequest, read_event_reply, schedule_event_request, schedule_query_request, simulation_client::SimulationClient, step_until_request, }; - -/// Helper macro to generate namespaced deadline argument for gRPC requests. -macro_rules! some_deadline_secs { - ($seconds:expr, $request:ident) => { - Some($request::Deadline::Duration(prost_types::Duration { - seconds: $seconds, - ..Default::default() - })) - }; -} +use crate::some_deadline_secs; #[derive(Default, Serialize, Deserialize)] struct SimpleModel { @@ -65,12 +54,7 @@ pub fn simple_bench(_: u8) -> Result> { Ok(bench) } -fn get_free_port() -> u16 { - let socket = std::net::TcpListener::bind("0.0.0.0:0").unwrap(); - socket.local_addr().unwrap().port() -} - -async fn get_client( +async fn init_client( bench: F, ) -> ( SimulationClient, @@ -81,26 +65,7 @@ where F: FnMut(I) -> Result> + Send + 'static, I: serde::de::DeserializeOwned, { - let (tx, rx) = tokio::sync::oneshot::channel(); - let signal = async move { - rx.await.unwrap(); - }; - let port = get_free_port(); - let handle = tokio::task::spawn_blocking(move || { - nexosim::server::run_with_shutdown( - bench, - format!("0.0.0.0:{port}").parse().unwrap(), - signal, - ) - .unwrap(); - }); - - // Make sure the server is up. - tokio::time::sleep(Duration::from_secs(1)).await; - - let mut client = SimulationClient::connect(format!("http://127.0.0.1:{port}")) - .await - .unwrap(); + let (mut client, signal, handle) = get_client(bench).await; let _ = client.build(BuildRequest { cfg: vec![0] }).await.unwrap(); let _ = client @@ -109,16 +74,16 @@ where }) .await .unwrap(); - (client, tx, handle) + (client, signal, handle) } #[tokio::test] async fn event_schedule_simple() { - let (mut client, signal, handle) = get_client(simple_bench).await; + let (mut client, signal, handle) = init_client(simple_bench).await; let _ = client .schedule_event(ScheduleEventRequest { - source: Some(grpc_client::Path { + source: Some(Path { segments: vec!["input".to_string()], }), event: vec![7], @@ -131,7 +96,7 @@ async fn event_schedule_simple() { let _ = client .schedule_event(ScheduleEventRequest { - source: Some(grpc_client::Path { + source: Some(Path { segments: vec!["input".to_string()], }), event: vec![7], @@ -152,7 +117,7 @@ async fn event_schedule_simple() { let response = client .read_event(ReadEventRequest { - sink: Some(grpc_client::Path { + sink: Some(Path { segments: vec!["sink".to_string()], }), timeout: Some(prost_types::Duration { @@ -176,7 +141,7 @@ async fn event_schedule_simple() { let response = client .read_event(ReadEventRequest { - sink: Some(grpc_client::Path { + sink: Some(Path { segments: vec!["sink".to_string()], }), timeout: Some(prost_types::Duration { @@ -197,13 +162,13 @@ async fn event_schedule_simple() { #[tokio::test] async fn query_schedule_simple() { - let (mut client, signal, handle) = get_client(simple_bench).await; + let (mut client, signal, handle) = init_client(simple_bench).await; let mut query_client = client.clone(); let response_later = tokio::spawn(async move { query_client .schedule_query(ScheduleQueryRequest { - source: Some(grpc_client::Path { + source: Some(Path { segments: vec!["query".to_string()], }), request: vec![5], @@ -216,7 +181,7 @@ async fn query_schedule_simple() { let response_earlier = tokio::spawn(async move { query_client .schedule_query(ScheduleQueryRequest { - source: Some(grpc_client::Path { + source: Some(Path { segments: vec!["query".to_string()], }), request: vec![11], diff --git a/nexosim/tests/integration/server_utils.rs b/nexosim/tests/integration/server_utils.rs new file mode 100644 index 0000000..4d3f288 --- /dev/null +++ b/nexosim/tests/integration/server_utils.rs @@ -0,0 +1,65 @@ +use std::time::Duration; + +use tokio::sync::oneshot::Sender; +use tokio::task::JoinHandle; + +use nexosim::simulation::SimInit; + +pub(crate) mod grpc_client { + include!("../codegen/simulation.v1.rs"); +} +use grpc_client::simulation_client::SimulationClient; + +/// Helper macro to generate namespaced deadline argument for gRPC requests. +#[macro_export] +macro_rules! some_deadline_secs { + ($seconds:expr, $request:ident) => { + Some($request::Deadline::Duration(prost_types::Duration { + seconds: $seconds, + ..Default::default() + })) + }; +} + +fn get_free_port() -> u16 { + let socket = std::net::TcpListener::bind("0.0.0.0:0").unwrap(); + socket.local_addr().unwrap().port() +} + +/// Creates a client to a background-running gRPC server. +/// +/// Returns (gRPC client, shudtown signal, server thread handle) +pub(crate) async fn get_client( + bench: F, +) -> ( + SimulationClient, + Sender<()>, + JoinHandle<()>, +) +where + F: FnMut(I) -> Result> + Send + 'static, + I: serde::de::DeserializeOwned, +{ + let (tx, rx) = tokio::sync::oneshot::channel(); + let signal = async move { + rx.await.unwrap(); + }; + let port = get_free_port(); + let handle = tokio::task::spawn_blocking(move || { + nexosim::server::run_with_shutdown( + bench, + format!("0.0.0.0:{port}").parse().unwrap(), + signal, + ) + .unwrap(); + }); + + // Make sure the server is up. + tokio::time::sleep(Duration::from_secs(1)).await; + + let client = SimulationClient::connect(format!("http://127.0.0.1:{port}")) + .await + .unwrap(); + + (client, tx, handle) +}