1+ import struct
12import unittest
23from typing import Optional
3- import numpy as np
44from msgq .visionipc import VisionIpcServer , VisionIpcClient , VisionStreamType
55
66
@@ -55,13 +55,17 @@ def test_send_single_buffer(self):
5555 assert self .server is not None
5656 assert self .client is not None
5757 assert self .client .buffer_len is not None
58- buf = np . zeros (self .client .buffer_len , dtype = np . uint8 )
59- buf . view ( '<u8' )[ 0 ] = 1234
58+ buf = bytearray (self .client .buffer_len )
59+ struct . pack_into ( "<Q" , buf , 0 , 1234 )
6060 self .server .send (VisionStreamType .VISION_STREAM_ROAD , buf , frame_id = 1337 )
6161
6262 recv_buf = self .client .recv ()
6363 assert recv_buf is not None
64- assert recv_buf .data .view ('<u8' )[0 ] == 1234
64+ data = recv_buf .data
65+ assert isinstance (data , memoryview )
66+ assert struct .unpack_from ("<Q" , data , 0 )[0 ] == 1234
67+ assert len (data ) == self .client .buffer_len
68+ assert data [8 :].nbytes == self .client .buffer_len - 8
6569 assert self .client .frame_id == 1337
6670 assert recv_buf .frame_id == 1337
6771
@@ -70,7 +74,7 @@ def test_no_conflate(self):
7074 assert self .server is not None
7175 assert self .client is not None
7276 assert self .client .buffer_len is not None
73- buf = np . zeros (self .client .buffer_len , dtype = np . uint8 )
77+ buf = bytearray (self .client .buffer_len )
7478 self .server .send (VisionStreamType .VISION_STREAM_ROAD , buf , frame_id = 1 )
7579 self .server .send (VisionStreamType .VISION_STREAM_ROAD , buf , frame_id = 2 )
7680
@@ -87,7 +91,7 @@ def test_conflate(self):
8791 assert self .server is not None
8892 assert self .client is not None
8993 assert self .client .buffer_len is not None
90- buf = np . zeros (self .client .buffer_len , dtype = np . uint8 )
94+ buf = bytearray (self .client .buffer_len )
9195 self .server .send (VisionStreamType .VISION_STREAM_ROAD , buf , frame_id = 1 )
9296 self .server .send (VisionStreamType .VISION_STREAM_ROAD , buf , frame_id = 2 )
9397
0 commit comments