Skip to content
This repository was archived by the owner on Jul 17, 2025. It is now read-only.

Commit c02298d

Browse files
Erika Hunhoffhunhoffe
authored andcommitted
Add multicore client rackscale integration test
1 parent 9b7b5b2 commit c02298d

5 files changed

Lines changed: 214 additions & 13 deletions

File tree

kernel/src/arch/x86_64/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,8 @@ fn _start(argc: isize, _argv: *const *const u8) -> isize {
414414
.get()
415415
.map_or(false, |c| c.mode == crate::cmdline::Mode::Client)
416416
{
417-
let _ = spin::lazy::Lazy::force(&rackscale::client::RPC_CLIENT);
417+
// Force client instantiation
418+
let _ = rackscale::client::RPC_CLIENT.clone();
418419
}
419420

420421
// Bring up the rest of the system (needs topology, APIC, and global memory)

kernel/src/arch/x86_64/rackscale/client.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
// SPDX-License-Identifier: Apache-2.0 OR MIT
33

44
use alloc::boxed::Box;
5+
use alloc::sync::Arc;
56
use hashbrown::HashMap;
67
use lazy_static::lazy_static;
78
use spin::{Lazy, Mutex};
89

910
use rpc::api::{RPCClient, RPCHandler, RegistrationHandler};
11+
use rpc::client::Client;
1012
use rpc::rpc::{ClientId, RPCError, RPCHeader};
1113

1214
use crate::cmdline::Transport;
@@ -17,31 +19,31 @@ use crate::transport::shmem::SHMEM_REGION;
1719
/// A handle to an RPC client
1820
///
1921
/// This is used to send requests to a remote control-plane.
20-
#[thread_local]
21-
pub(crate) static RPC_CLIENT: Lazy<Mutex<Box<dyn RPCClient>>> = Lazy::new(|| {
22+
lazy_static! {
23+
pub(crate) static ref RPC_CLIENT: Arc<Mutex<Box<Client>>> =
2224
// Create network stack and instantiate RPC Client
23-
return if crate::CMDLINE
25+
if crate::CMDLINE
2426
.get()
2527
.map_or(false, |c| c.transport == Transport::Ethernet)
2628
{
2729
// To support alloc_phys, client needs shared memory to be mapped
2830
lazy_static::initialize(&SHMEM_REGION);
2931

30-
Mutex::new(
32+
Arc::new(Mutex::new(
3133
crate::transport::ethernet::init_ethernet_rpc(
3234
smoltcp::wire::IpAddress::v4(172, 31, 0, 11),
3335
6970,
3436
true,
3537
)
3638
.expect("Failed to initialize ethernet RPC"),
37-
)
39+
))
3840
} else {
3941
// Default is Shmem, even if transport unspecified
40-
Mutex::new(
42+
Arc::new(Mutex::new(
4143
crate::transport::shmem::init_shmem_rpc(true).expect("Failed to initialize shmem RPC"),
42-
)
44+
))
4345
};
44-
});
46+
}
4547

4648
// Mapping between local frame IDs and remote memory address space ID (node id, currently).
4749
lazy_static! {

kernel/src/arch/x86_64/rackscale/registration.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,10 @@ pub(crate) fn initialize_client(
6363
if send_client_data {
6464
let (affinity_shmem_offset, affinity_shmem_size) = get_affinity_shmem();
6565

66-
// TODO: calculate cores correctly
6766
let req = ClientRegistrationRequest {
6867
affinity_shmem_offset,
6968
affinity_shmem_size,
70-
num_cores: 4,
69+
num_cores: atopology::MACHINE_TOPOLOGY.num_threads() as u64,
7170
};
7271
let mut req_data = [0u8; core::mem::size_of::<ClientRegistrationRequest>()];
7372
unsafe { encode(&req, &mut (&mut req_data).as_mut()) }.unwrap();

kernel/tests/s06_rackscale_tests.rs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@
1010
//! * `s06_*`: Rackscale (distributed) tests
1111
1212
use rexpect::errors::*;
13+
use rexpect::process::signal::SIGTERM;
1314
use rexpect::process::wait::WaitStatus;
1415

15-
use testutils::builder::BuildArgs;
16+
use testutils::builder::{BuildArgs, Machine};
1617
use testutils::helpers::{
1718
setup_network, setup_shmem, spawn_dcm, spawn_nrk, SHMEM_PATH, SHMEM_SIZE,
1819
};
19-
use testutils::runner_args::{check_for_successful_exit, RunnerArgs};
20+
use testutils::runner_args::{check_for_successful_exit, wait_for_sigterm, RunnerArgs};
2021

2122
#[cfg(not(feature = "baremetal"))]
2223
#[test]
@@ -479,3 +480,117 @@ fn s06_rackscale_shmem_multiinstance() {
479480

480481
let _ignore = remove_file(SHMEM_PATH);
481482
}
483+
484+
#[cfg(not(feature = "baremetal"))]
485+
#[test]
486+
fn s06_rackscale_shmem_userspace_multicore_test() {
487+
rackscale_userspace_multicore_test(true);
488+
}
489+
490+
#[cfg(not(feature = "baremetal"))]
491+
#[test]
492+
fn s06_rackscale_ethernet_userspace_multicore_test() {
493+
rackscale_userspace_multicore_test(false);
494+
}
495+
496+
#[cfg(not(feature = "baremetal"))]
497+
fn rackscale_userspace_multicore_test(is_shmem: bool) {
498+
use std::fs::remove_file;
499+
use std::sync::Arc;
500+
use std::thread::sleep;
501+
use std::time::Duration;
502+
503+
// Setup ivshmem file
504+
setup_shmem(SHMEM_PATH, SHMEM_SIZE);
505+
506+
setup_network(2);
507+
let timeout = 30_000;
508+
509+
let machine = Machine::determine();
510+
let client_num_cores: usize = machine.max_cores() - 1;
511+
512+
// Create build for both controller and client
513+
let build = Arc::new(
514+
BuildArgs::default()
515+
.module("init")
516+
.user_feature("test-scheduler-smp")
517+
.kernel_feature("shmem")
518+
.kernel_feature("ethernet")
519+
.kernel_feature("rackscale")
520+
.release()
521+
.build(),
522+
);
523+
524+
// Run DCM and controller in separate thread
525+
let build1 = build.clone();
526+
let controller = std::thread::spawn(move || {
527+
let controller_cmd = if is_shmem {
528+
"mode=controller transport=shmem"
529+
} else {
530+
"mode=controller transport=ethernet"
531+
};
532+
let cmdline_controller = RunnerArgs::new_with_build("userspace-smp", &build1)
533+
.timeout(timeout)
534+
.cmd(controller_cmd)
535+
.shmem_size(SHMEM_SIZE as usize)
536+
.shmem_path(SHMEM_PATH)
537+
.tap("tap0")
538+
.no_network_setup()
539+
.workers(2)
540+
.use_vmxnet3();
541+
542+
let mut output = String::new();
543+
let mut qemu_run = || -> Result<WaitStatus> {
544+
let mut dcm = spawn_dcm(1, timeout)?;
545+
let mut p = spawn_nrk(&cmdline_controller)?;
546+
547+
//output += p.exp_string("Finished sending requests!")?.as_str();
548+
output += p.exp_eof()?.as_str();
549+
550+
dcm.send_control('c')?;
551+
p.process.exit()
552+
};
553+
554+
let _ignore = qemu_run();
555+
});
556+
557+
// Run client in separate thead. Wait a bit to make sure DCM and controller started
558+
let build2 = build.clone();
559+
let _client = std::thread::spawn(move || {
560+
sleep(Duration::from_millis(5_000));
561+
let client_cmd = if is_shmem {
562+
"mode=client transport=shmem"
563+
} else {
564+
"mode=client transport=ethernet"
565+
};
566+
let cmdline_client = RunnerArgs::new_with_build("userspace-smp", &build2)
567+
.timeout(timeout)
568+
.cmd(client_cmd)
569+
.shmem_size(SHMEM_SIZE as usize)
570+
.shmem_path(SHMEM_PATH)
571+
.tap("tap2")
572+
.no_network_setup()
573+
.workers(2)
574+
.cores(client_num_cores)
575+
.memory(4096)
576+
.use_vmxnet3();
577+
578+
let mut output = String::new();
579+
let mut qemu_run = || -> Result<WaitStatus> {
580+
let mut p = spawn_nrk(&cmdline_client)?;
581+
582+
for _i in 0..client_num_cores {
583+
let r = p.exp_regex(r#"init: Hello from core (\d+)"#)?;
584+
output += r.0.as_str();
585+
output += r.1.as_str();
586+
}
587+
p.process.kill(SIGTERM)
588+
};
589+
590+
wait_for_sigterm(&cmdline_client, qemu_run(), output);
591+
});
592+
593+
controller.join().unwrap();
594+
595+
let _ignore = remove_file(SHMEM_PATH);
596+
}

lib/rpc/tests/integration-test.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,87 @@ fn test_client_server_shmem_transport() {
7474
.unwrap();
7575
assert_eq!(send_data, recv_data);
7676
}
77+
78+
#[test]
79+
fn test_client_shmem_multithread() {
80+
use spin::Mutex;
81+
use std::alloc::{alloc, Layout};
82+
use std::sync::Arc;
83+
use std::thread;
84+
85+
use rpc::api::{RPCClient, RPCHandler, RPCServer, RegistrationHandler};
86+
use rpc::client::Client;
87+
use rpc::rpc::{ClientId, RPCError, RPCHeader};
88+
use rpc::server::Server;
89+
use rpc::transport::shmem::allocator::ShmemAllocator;
90+
use rpc::transport::shmem::{Queue, Receiver, Sender};
91+
use rpc::transport::ShmemTransport;
92+
93+
let alloc_size = 128 * 1024 * 1024;
94+
let alloc = (unsafe { alloc(Layout::from_size_align(alloc_size, 1).expect("Layout failed")) }
95+
as *mut u8) as u64;
96+
97+
let allocator = ShmemAllocator::new(alloc, alloc_size as u64);
98+
// Create transport
99+
let server_to_client_queue = Arc::new(Queue::with_capacity_in(true, 32, &allocator).unwrap());
100+
let client_to_server_queue = Arc::new(Queue::with_capacity_in(true, 32, &allocator).unwrap());
101+
102+
let server_sender = Sender::with_shared_queue(server_to_client_queue.clone());
103+
let server_receiver = Receiver::with_shared_queue(client_to_server_queue.clone());
104+
let server_transport = ShmemTransport::new(server_receiver, server_sender);
105+
106+
thread::spawn(move || {
107+
// Create a server
108+
let rpc_server_transport = Box::new(server_transport);
109+
let mut server = Server::new(rpc_server_transport);
110+
111+
// Register an echo RPC
112+
fn echo_rpc_handler(_hdr: &mut RPCHeader, _payload: &mut [u8]) -> Result<(), RPCError> {
113+
Ok(())
114+
}
115+
const ECHO_HANDLER: RPCHandler = echo_rpc_handler;
116+
server.register(1, &ECHO_HANDLER).unwrap();
117+
118+
// Accept a client
119+
fn register_client(
120+
_hdr: &mut RPCHeader,
121+
_payload: &mut [u8],
122+
) -> Result<ClientId, RPCError> {
123+
Ok(0)
124+
}
125+
pub const CLIENT_REGISTRAR: RegistrationHandler = register_client;
126+
server.add_client(&CLIENT_REGISTRAR).unwrap();
127+
128+
// Run the server
129+
server.run_server().unwrap();
130+
});
131+
132+
// Create a client
133+
let client_sender = Sender::with_shared_queue(client_to_server_queue.clone());
134+
let client_receiver = Receiver::with_shared_queue(server_to_client_queue.clone());
135+
let client_transport = ShmemTransport::new(client_receiver, client_sender);
136+
let rpc_client_transport = Box::new(client_transport);
137+
let client = Arc::new(Mutex::new(Client::new(rpc_client_transport)));
138+
139+
{
140+
// Connect to server
141+
let my_client = Arc::clone(&client);
142+
my_client.lock().connect(&[]).unwrap();
143+
}
144+
145+
for _ in 1..10 {
146+
let my_client = Arc::clone(&client);
147+
thread::spawn(move || {
148+
// Setup for RPCs
149+
let send_data = [1u8; 40];
150+
let mut recv_data = [0u8; 40];
151+
152+
// Test echo
153+
my_client
154+
.lock()
155+
.call(0, 1, &[&send_data], &mut [&mut recv_data])
156+
.unwrap();
157+
assert_eq!(send_data, recv_data);
158+
});
159+
}
160+
}

0 commit comments

Comments
 (0)