@@ -65,7 +65,6 @@ def _run(self):
6565 buf = blob_file .read (BUF_SIZE )
6666 if not buf :
6767 break
68-
6968 sent = self .sock .send (buf )
7069 while sent < len (buf ):
7170 sent += self .sock .send (buf [sent :])
@@ -110,7 +109,21 @@ def start_guest_echo_server(vm):
110109 return os .path .join (vm .jailer .chroot_path (), VSOCK_UDS_PATH )
111110
112111
113- def check_host_connections (uds_path , blob_path , blob_hash ):
112+ def start_seqpacket_echo_server (vm ):
113+ """Start a vsock seqpacket echo server in the microVM.
114+
115+ Returns a UDS path to connect to the server.
116+ """
117+ cmd = f"nohup /tmp/vsock_seq_server serve { ECHO_SERVER_PORT } af_vsock >/dev/null 2>&1 &"
118+ vm .ssh .check_output (cmd )
119+
120+ # Give the server time to initialise
121+ time .sleep (1 )
122+
123+ return os .path .join (vm .jailer .chroot_path (), VSOCK_UDS_PATH )
124+
125+
126+ def check_host_connections (uds_path , blob_path , blob_hash , vsock_type = SOCK_STREAM ):
114127 """Test host-initiated connections.
115128
116129 This will spawn `TEST_CONNECTION_COUNT` `HostEchoWorker` threads.
@@ -121,7 +134,7 @@ def check_host_connections(uds_path, blob_path, blob_hash):
121134
122135 workers = []
123136 for _ in range (TEST_CONNECTION_COUNT ):
124- worker = HostEchoWorker (uds_path , blob_path )
137+ worker = HostEchoWorker (uds_path , blob_path , vsock_type )
125138 workers .append (worker )
126139 worker .start ()
127140
@@ -132,6 +145,76 @@ def check_host_connections(uds_path, blob_path, blob_hash):
132145 assert wrk .hash == blob_hash
133146
134147
148+ def check_guest_connections_seqpacket (
149+ vm , server_port_path , server_bin_path , blob_path , blob_hash
150+ ):
151+ """Test guest-initiated connections.
152+
153+ This will start an echo server on the host (in its own thread), then
154+ start `TEST_CONNECTION_COUNT` workers inside the guest VM, all
155+ communicating with the echo server.
156+ """
157+ port = server_port_path .split ("_" )[- 1 ]
158+ if Path (server_port_path ).exists ():
159+ Path (
160+ server_port_path
161+ ).unlink () # the vsock server program doesn't have reuseaddr
162+
163+ echo_server = Popen ([server_bin_path , "serve" , port , "af_unix" , server_port_path ])
164+
165+ try :
166+ # Give the server program bit of time to create the socket
167+ for attempt in Retrying (
168+ wait = wait_fixed (0.2 ),
169+ stop = stop_after_attempt (3 ),
170+ reraise = True ,
171+ ):
172+ with attempt :
173+ assert Path (server_port_path ).exists ()
174+
175+ # Link the listening Unix socket into the VM's jail, so that
176+ # Firecracker can connect to it.
177+ vm .create_jailed_resource (server_port_path )
178+
179+ # Increase maximum process count for the ssh service.
180+ # Avoids: "bash: fork: retry: Resource temporarily unavailable"
181+ # Needed to execute the bash script that tests for concurrent
182+ # vsock guest initiated connections.
183+ vm .ssh .check_output (
184+ "echo 1024 > /sys/fs/cgroup/system.slice/ssh.service/pids.max"
185+ )
186+
187+ # Build the guest worker sub-command.
188+ # `vsock_helper` will read the blob file from STDIN and send the echo
189+ # server response to STDOUT. This response is then hashed, and the
190+ # hash is compared against `blob_hash` (computed on the host). This
191+ # comparison sets the exit status of the worker command.
192+ worker_cmd = "hash=$("
193+ worker_cmd += "cat {}" .format (blob_path )
194+ worker_cmd += " | /tmp/vsock_helper echo 2 {} seqpacket" .format (
195+ ECHO_SERVER_PORT
196+ )
197+ worker_cmd += " | md5sum | cut -f1 -d\\ "
198+ worker_cmd += ")"
199+ worker_cmd += ' && [[ "$hash" = "{}" ]]' .format (blob_hash )
200+
201+ # Run `TEST_CONNECTION_COUNT` concurrent workers, using the above
202+ # worker sub-command.
203+ # If any worker fails, this command will fail. If all worker sub-commands
204+ # succeed, this will also succeed.
205+ cmd = 'workers="";'
206+ cmd += "for i in $(seq 1 {}); do" .format (TEST_CONNECTION_COUNT )
207+ cmd += " ({})& " .format (worker_cmd )
208+ cmd += ' workers="$workers $!";'
209+ cmd += "done;"
210+ cmd += "for w in $workers; do wait $w || (wait; exit 1); done"
211+
212+ vm .ssh .check_output (cmd )
213+ finally :
214+ echo_server .terminate ()
215+ echo_server .wait ()
216+
217+
135218def check_guest_connections (vm , server_port_path , blob_path , blob_hash ):
136219 """Test guest-initiated connections.
137220
@@ -173,7 +256,7 @@ def check_guest_connections(vm, server_port_path, blob_path, blob_hash):
173256 # comparison sets the exit status of the worker command.
174257 worker_cmd = "hash=$("
175258 worker_cmd += "cat {}" .format (blob_path )
176- worker_cmd += " | /tmp/vsock_helper echo 2 {}" .format (ECHO_SERVER_PORT )
259+ worker_cmd += " | /tmp/vsock_helper echo 2 {} stream " .format (ECHO_SERVER_PORT )
177260 worker_cmd += " | md5sum | cut -f1 -d\\ "
178261 worker_cmd += ")"
179262 worker_cmd += ' && [[ "$hash" = "{}" ]]' .format (blob_hash )
@@ -216,14 +299,19 @@ def _vsock_connect_to_guest(uds_path, port):
216299 return sock
217300
218301
219- def _copy_vsock_data_to_guest (ssh_connection , blob_path , vm_blob_path , vsock_helper ):
302+ def _copy_vsock_data_to_guest (
303+ ssh_connection , blob_path , vm_blob_path , vsock_helper = None , vsock_seq_server = None
304+ ):
220305 # Copy the data file and a vsock helper to the guest.
221306
222307 cmd = "mkdir -p /tmp/vsock"
223308 ecode , _ , _ = ssh_connection .run (cmd )
224309 assert ecode == 0 , "Failed to set up tmpfs drive on the guest."
310+ if vsock_helper :
311+ ssh_connection .scp_put (vsock_helper , "/tmp/vsock_helper" )
312+ if vsock_seq_server :
313+ ssh_connection .scp_put (vsock_seq_server , "/tmp/vsock_seq_server" )
225314
226- ssh_connection .scp_put (vsock_helper , "/tmp/vsock_helper" )
227315 ssh_connection .scp_put (blob_path , vm_blob_path )
228316
229317
0 commit comments