1- import random
21import unittest
32from typing import Optional
43import numpy as np
54from msgq .visionipc import VisionIpcServer , VisionIpcClient , VisionStreamType
65
76
87class TestVisionIpc (unittest .TestCase ):
9- server : VisionIpcServer
8+ server : Optional [ VisionIpcServer ]
109 client : Optional [VisionIpcClient ]
1110
11+ def setUp (self ):
12+ self .server = None
13+ self .client = None
14+
15+ def tearDown (self ):
16+ self .client = None
17+ self .server = None
18+
1219 def setup_vipc (self , name , * stream_types , num_buffers = 1 , width = 100 , height = 100 , conflate = False ):
1320 self .server = VisionIpcServer (name )
1421 for stream_type in stream_types :
@@ -26,18 +33,13 @@ def setup_vipc(self, name, *stream_types, num_buffers=1, width=100, height=100,
2633 def test_connect (self ):
2734 self .setup_vipc ("camerad" , VisionStreamType .VISION_STREAM_ROAD )
2835 assert self .client is not None
29- assert self .client .is_connected
30- del self .client
31- del self .server
36+ assert self .client .is_connected ()
3237
3338 def test_available_streams (self ):
34- for k in range (4 ):
35- stream_types = set (random .choices ([x .value for x in VisionStreamType ], k = k ))
36- self .setup_vipc ("camerad" , * stream_types )
37- available_streams = VisionIpcClient .available_streams ("camerad" , True )
38- assert available_streams == stream_types
39- del self .client
40- del self .server
39+ stream_types = (VisionStreamType .VISION_STREAM_ROAD , VisionStreamType .VISION_STREAM_WIDE_ROAD )
40+ self .setup_vipc ("camerad" , * stream_types )
41+ available_streams = VisionIpcClient .available_streams ("camerad" , True )
42+ assert available_streams == {stream .value for stream in stream_types }
4143
4244 def test_buffers (self ):
4345 width , height , num_buffers = 100 , 200 , 5
@@ -47,26 +49,25 @@ def test_buffers(self):
4749 assert self .client .height == height
4850 assert self .client .buffer_len is not None and self .client .buffer_len > 0
4951 assert self .client .num_buffers == num_buffers
50- del self .client
51- del self .server
5252
5353 def test_send_single_buffer (self ):
5454 self .setup_vipc ("camerad" , VisionStreamType .VISION_STREAM_ROAD )
55+ assert self .server is not None
5556 assert self .client is not None
5657 assert self .client .buffer_len is not None
5758 buf = np .zeros (self .client .buffer_len , dtype = np .uint8 )
58- buf .view ('<i4 ' )[0 ] = 1234
59+ buf .view ('<u8 ' )[0 ] = 1234
5960 self .server .send (VisionStreamType .VISION_STREAM_ROAD , buf , frame_id = 1337 )
6061
6162 recv_buf = self .client .recv ()
6263 assert recv_buf is not None
63- assert recv_buf .data .view ('<i4 ' )[0 ] == 1234
64+ assert recv_buf .data .view ('<u8 ' )[0 ] == 1234
6465 assert self .client .frame_id == 1337
65- del self .client
66- del self .server
66+ assert recv_buf .frame_id == 1337
6767
6868 def test_no_conflate (self ):
6969 self .setup_vipc ("camerad" , VisionStreamType .VISION_STREAM_ROAD )
70+ assert self .server is not None
7071 assert self .client is not None
7172 assert self .client .buffer_len is not None
7273 buf = np .zeros (self .client .buffer_len , dtype = np .uint8 )
@@ -80,11 +81,10 @@ def test_no_conflate(self):
8081 recv_buf = self .client .recv ()
8182 assert recv_buf is not None
8283 assert self .client .frame_id == 2
83- del self .client
84- del self .server
8584
8685 def test_conflate (self ):
8786 self .setup_vipc ("camerad" , VisionStreamType .VISION_STREAM_ROAD , conflate = True )
87+ assert self .server is not None
8888 assert self .client is not None
8989 assert self .client .buffer_len is not None
9090 buf = np .zeros (self .client .buffer_len , dtype = np .uint8 )
@@ -97,5 +97,3 @@ def test_conflate(self):
9797
9898 recv_buf = self .client .recv ()
9999 assert recv_buf is None
100- del self .client
101- del self .server
0 commit comments