Skip to content

Commit eb889a7

Browse files
Speed up tests (#686)
* speed up tests * fix fake socket test race * fix fake socket setup order * wait for vipc listener startup * Revert "wait for vipc listener startup" This reverts commit b96129a. * reverts * down to 0.2s * fix that * don't need that
1 parent 55d1547 commit eb889a7

6 files changed

Lines changed: 34 additions & 16 deletions

File tree

msgq/ipc_pyx.pyx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# cython: c_string_encoding=ascii, language_level=3
33

44
import sys
5+
import time
56
from libcpp.string cimport string
67
from libcpp.vector cimport vector
78
from libcpp cimport bool
@@ -249,3 +250,11 @@ cdef class PubSocket:
249250

250251
def all_readers_updated(self):
251252
return self.socket.all_readers_updated()
253+
254+
def wait_for_readers(self, double timeout=1.0, double interval=0.001):
255+
deadline = time.monotonic() + timeout
256+
while time.monotonic() < deadline:
257+
if self.all_readers_updated():
258+
return
259+
time.sleep(interval)
260+
raise TimeoutError("subscriber did not connect")

msgq/tests/test_messaging.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ def test_pub_sub(self):
1717
sock = random_sock()
1818
pub_sock = msgq.pub_sock(sock)
1919
sub_sock = msgq.sub_sock(sock, conflate=False, timeout=None)
20-
21-
for _ in range(1000):
20+
for _ in range(100):
2221
msg = random_bytes()
2322
pub_sock.send(msg)
2423
recvd = sub_sock.receive()
@@ -36,7 +35,6 @@ def test_conflate(self):
3635
msg = random_bytes()
3736
pub_sock.send(msg)
3837
sent_msgs.append(msg)
39-
time.sleep(0.1)
4038
recvd_msgs = msgq.drain_sock_raw(sub_sock)
4139
if conflate:
4240
assert len(recvd_msgs) == 1
@@ -48,7 +46,7 @@ def test_conflate(self):
4846

4947
def test_receive_timeout(self):
5048
sock = random_sock()
51-
timeout_ms = 50
49+
timeout_ms = 5
5250
sub_sock = msgq.sub_sock(sock, timeout=timeout_ms)
5351

5452
start_time = time.monotonic()

msgq/tests/test_poller.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import unittest
2-
import time
32
import msgq
43
import concurrent.futures
54

@@ -30,7 +29,7 @@ def test_poll_once(self):
3029
with concurrent.futures.ThreadPoolExecutor() as e:
3130
poll = e.submit(poller)
3231

33-
time.sleep(0.1) # Slow joiner syndrome
32+
pub.wait_for_readers()
3433

3534
# Send message
3635
pub.send(b"a")
@@ -52,13 +51,11 @@ def test_poll_and_create_many_subscribers(self):
5251
with concurrent.futures.ThreadPoolExecutor() as e:
5352
poll = e.submit(poller)
5453

55-
time.sleep(0.1) # Slow joiner syndrome
54+
pub.wait_for_readers()
5655
c = msgq.Context()
5756
for _ in range(10):
5857
msgq.SubSocket().connect(c, SERVICE_NAME)
5958

60-
time.sleep(0.1)
61-
6259
# Send message
6360
pub.send(b"a")
6461

@@ -95,7 +92,7 @@ def test_multiple_messages(self):
9592
sub = msgq.SubSocket()
9693
sub.connect(context, SERVICE_NAME)
9794

98-
time.sleep(0.1) # Slow joiner
95+
pub.wait_for_readers()
9996

10097
for i in range(1, 100):
10198
pub.send(b'a'*i)
@@ -127,7 +124,7 @@ def test_conflate(self):
127124
sub = msgq.SubSocket()
128125
sub.connect(context, SERVICE_NAME, conflate=True)
129126

130-
time.sleep(0.1) # Slow joiner
127+
pub.wait_for_readers()
131128
pub.send(b'a')
132129
pub.send(b'b')
133130

msgq/visionipc/tests/test_visionipc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,5 +99,5 @@ def test_conflate(self):
9999
assert recv_buf is not None
100100
assert self.client.frame_id == 2
101101

102-
recv_buf = self.client.recv()
102+
recv_buf = self.client.recv(timeout_ms=5)
103103
assert recv_buf is None

msgq/visionipc/visionipc_client.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,13 @@
1414
static int connect_to_vipc_server(const std::string &name, bool blocking) {
1515
const std::string ipc_path = get_ipc_path(name);
1616
int socket_fd = ipc_connect(ipc_path.c_str());
17+
bool logged_retry = false;
1718
while (socket_fd < 0 && blocking) {
18-
std::cout << "VisionIpcClient connecting" << std::endl;
19-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
19+
if (!logged_retry) {
20+
std::cout << "VisionIpcClient connecting" << std::endl;
21+
logged_retry = true;
22+
}
23+
std::this_thread::sleep_for(std::chrono::milliseconds(20));
2024
socket_fd = ipc_connect(ipc_path.c_str());
2125
}
2226
return socket_fd;

msgq/visionipc/visionipc_server.cc

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,11 @@ void VisionIpcServer::listener(){
105105

106106
VisionStreamType type = VisionStreamType::VISION_STREAM_MAX;
107107
int r = ipc_sendrecv_with_fds(false, fd, &type, sizeof(type), nullptr, 0, nullptr);
108-
assert(r == sizeof(type));
108+
if (r != sizeof(type)) {
109+
close(fd);
110+
if (should_exit) break;
111+
continue;
112+
}
109113

110114
// send available stream types
111115
if (type == VisionStreamType::VISION_STREAM_MAX) {
@@ -184,7 +188,13 @@ void VisionIpcServer::send(VisionBuf * buf, VisionIpcBufExtra * extra, bool sync
184188

185189
VisionIpcServer::~VisionIpcServer(){
186190
should_exit = true;
187-
listener_thread.join();
191+
if (listener_thread.joinable()) {
192+
int sock = ipc_connect(get_ipc_path(name).c_str());
193+
if (sock >= 0) {
194+
close(sock);
195+
}
196+
listener_thread.join();
197+
}
188198

189199
// VisionBuf cleanup
190200
for (auto const& [type, buf] : buffers) {

0 commit comments

Comments
 (0)