-
Notifications
You must be signed in to change notification settings - Fork 240
Expand file tree
/
Copy pathmock_handler.py
More file actions
111 lines (88 loc) · 3.55 KB
/
mock_handler.py
File metadata and controls
111 lines (88 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
MockHandler for PSLab.
This module provides a software simulation of the PSLab hardware interface.
It allows developers to test signal processing and communication logic
without physical hardware by injecting custom response data.
"""
import logging
import struct
from typing import Dict, Optional, Union
class MockHandler:
"""
A mock communication handler that simulates the PSLab hardware.
"""
def __init__(self, port: Optional[str] = None):
self.connected = False
self.port = port or "MOCK_PORT"
self.response_map: Dict[bytes, bytes] = {}
self.read_buffer = bytearray()
self.logger = logging.getLogger(__name__)
def open(self, port: Optional[str] = None) -> None:
self.connected = True
self.logger.info(f"MockHandler connected on {self.port}")
def close(self) -> None:
self.connected = False
self.read_buffer.clear()
self.logger.info("MockHandler disconnected.")
def write(self, data: bytes) -> int:
if not self.connected:
raise RuntimeError("Attempted to write to closed MockHandler.")
# Check for registered responses
match_found = False
for command, response in self.response_map.items():
if data.startswith(command):
self.read_buffer.extend(response)
match_found = True
break
if not match_found:
self.logger.debug(f"Write: {data.hex()} (No specific response mapped)")
return len(data)
def read(self, size: int = 1) -> bytes:
if not self.connected:
raise RuntimeError("Attempted to read from closed MockHandler.")
if len(self.read_buffer) < size:
self.logger.warning(f"MockHandler: Requested {size} bytes, but only {len(self.read_buffer)} available.")
# Return what we have, then empty bytes
data = self.read_buffer[:]
self.read_buffer.clear()
return bytes(data)
data = self.read_buffer[:size]
self.read_buffer = self.read_buffer[size:]
return bytes(data)
def clear_buffer(self) -> None:
self.read_buffer.clear()
# --- Interface Compatibility Methods ---
def send_byte(self, val: Union[int, bytes]) -> None:
"""
Sends a single byte to the device.
Handles both integer (0-255) and single-byte bytes objects.
"""
if isinstance(val, int):
self.write(bytes([val]))
elif isinstance(val, bytes):
self.write(val)
else:
raise TypeError(f"send_byte expects int or bytes, got {type(val)}")
def send_int(self, val: int) -> None:
"""Sends a 16-bit integer to the device."""
self.write(struct.pack('<H', val)) # Little-endian unsigned short
def read_byte(self) -> int:
"""Reads a single byte and returns it as an integer."""
data = self.read(1)
if not data:
return 0
return data[0]
def read_int(self) -> int:
"""Reads two bytes and interprets them as a 16-bit integer."""
data = self.read(2)
if len(data) < 2:
return 0
return struct.unpack('<H', data)[0]
def get_ack(self) -> bool:
"""Simulates receiving an acknowledgement from the device."""
return True
# --- Developer Control Methods ---
def register_response(self, command: bytes, response: bytes) -> None:
self.response_map[command] = response
def inject_data(self, data: bytes) -> None:
self.read_buffer.extend(data)