Skip to content

Commit f8d4579

Browse files
authored
Add missing command methods to provide complete interface to all firmware commands (#47)
* Add missing command methods to provide complete interface to all firmware commands - I2C commands: i2c_reset, i2c_read_no_register_uint8, i2c_write_no_register_uint8, i2c_read_payload_uint16, i2c_begin_transaction, i2c_write, i2c_end_transaction, i2c_buffer_size - I2C_1 commands: i2c_1_reset, i2c_1_read_no_register_uint8, i2c_1_write_no_register_uint8, i2c_1_read_payload_uint16, i2c_1_begin_transaction, i2c_1_write, i2c_1_end_transaction, i2c_1_buffer_size - SPI commands: spi_buffer_size, spi_set_clock_divider - Register commands: register_read_uint8, register_write_uint8, register_read_uint32, register_write_uint32 - Startup/Demo commands: post_serial_startup_commands_available, read_post_serial_startup_command - Utility commands: sleep_seconds - Info commands: info, reboot, serialnumber, license * Add i2c_write_bulk and i2c_1_write_bulk methods for large data transfers - Implement chunked writing with automatic transaction management - Support writing up to 8k bytes by automatically splitting into buffer-sized chunks - Use try/finally to ensure proper transaction cleanup - Add comprehensive docstrings explaining the functionality * Update the HISTORY.md to reflect the actual date * Add nop command and comprehensive test suite - Add nop_func to commandconstants.hpp and main.cpp - Add nop method to TeensyToAny Python class - Create comprehensive test suite with pytest - Add test for oversized input handling (>2k bytes) - Test buffer size limits and normal command functionality - Update HISTORY.md with nop command addition * Simplify tests and add pytest hardware markers - Replace complex mock-based tests with simple hardware tests - Add @pytest.mark.hardware to all hardware-dependent tests - Add pytest.ini and pyproject.toml configuration to skip hardware tests by default - Tests are now skipped unless explicitly run with '-m hardware' * Fix pylint issues in test file - Fix import order: pytest before local imports - Add pylint disable comments for protected access to _ask method - Achieve 10.00/10 pylint score for test file
1 parent d5df74c commit f8d4579

6 files changed

Lines changed: 301 additions & 2 deletions

File tree

.gitattributes

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
teensytoany/_version.py export-subst
2+
# SCM syntax highlighting & preventing 3-way merges
3+
pixi.lock merge=binary linguist-language=YAML linguist-generated=true

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,8 @@ ENV/
100100

101101
# mypy
102102
.mypy_cache/
103+
# pixi environments
104+
.pixi/*
105+
!.pixi/config.toml
106+
pixi.toml
107+
pixi.lock

HISTORY.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,24 @@
11
# History
22

3+
## 0.12.0 (2025-08-21)
4+
5+
* Add missing command methods to provide complete interface to all firmware commands:
6+
- I2C commands: `i2c_reset`, `i2c_read_no_register_uint8`, `i2c_write_no_register_uint8`,
7+
`i2c_read_payload_uint16`, `i2c_begin_transaction`, `i2c_write`, `i2c_end_transaction`,
8+
`i2c_buffer_size`, `i2c_write_bulk`
9+
- I2C_1 commands: `i2c_1_reset`, `i2c_1_read_no_register_uint8`, `i2c_1_write_no_register_uint8`,
10+
`i2c_1_read_payload_uint16`, `i2c_1_begin_transaction`, `i2c_1_write`, `i2c_1_end_transaction`,
11+
`i2c_1_buffer_size`, `i2c_1_write_bulk`
12+
- SPI commands: `spi_buffer_size`, `spi_set_clock_divider`
13+
- Register commands: `register_read_uint8`, `register_write_uint8`, `register_read_uint32`,
14+
`register_write_uint32`
15+
- Startup/Demo commands: `post_serial_startup_commands_available`, `read_post_serial_startup_command`
16+
- Utility commands: `sleep_seconds`
17+
- Info commands: `info`, `reboot`, `serialnumber`, `license`, `nop`
18+
* Add `i2c_write_bulk` and `i2c_1_write_bulk` methods for writing up to 8k bytes with automatic
19+
chunking and transaction management
20+
* Add `nop` command for testing purposes and comprehensive test suite
21+
322
## 0.11.1 (2025-07-18)
423

524
* Provide an ability to define the device name at initialization time.

pytest.ini

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[pytest]
2+
markers =
3+
hardware: marks tests as requiring hardware (deselect with '-m "not hardware"')
4+
addopts = -m "not hardware"

teensytoany/teensytoany.py

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,77 @@ def i2c_ping(self, address: int):
604604
cmd = f"i2c_ping 0x{address:02x}"
605605
self._ask(cmd)
606606

607+
def i2c_reset(self):
608+
"""Reset the I2C PORT in case of lockup."""
609+
self._ask("i2c_reset")
610+
611+
def i2c_read_no_register_uint8(self, address: int):
612+
"""Read a uint8_t from the I2C bus without specifying a register address."""
613+
cmd = f"i2c_read_no_register_uint8 0x{address:02x}"
614+
returned = self._ask(cmd)
615+
return int(returned, base=0)
616+
617+
def i2c_write_no_register_uint8(self, address: int, data: int):
618+
"""Write a uint8_t to the I2C bus without specifying a register address."""
619+
data = data & 0xFF
620+
cmd = f"i2c_write_no_register_uint8 0x{address:02x} 0x{data:02x}"
621+
self._ask(cmd)
622+
623+
def i2c_read_payload_uint16(self, address: int, register_address: int,
624+
num_bytes: int) -> Sequence:
625+
"""Read up to 256 bytes from the I2C bus starting at a specified 16 bit register address."""
626+
cmd = f"i2c_read_payload_uint16 0x{address:02x} 0x{register_address:04x} {num_bytes}"
627+
returned = self._ask(cmd)
628+
register_data = [int(val, base=0) for val in returned.split()]
629+
return register_data
630+
631+
def i2c_begin_transaction(self, address: int):
632+
"""Begin a transaction with the I2C device."""
633+
cmd = f"i2c_begin_transaction 0x{address:02x}"
634+
self._ask(cmd)
635+
636+
def i2c_write(self, data: Sequence):
637+
"""Write data to the I2C device."""
638+
data_str = ' '.join([f"{val}" for val in data])
639+
cmd = f"i2c_write {data_str}"
640+
self._ask(cmd)
641+
642+
def i2c_end_transaction(self, stop: bool = True):
643+
"""End a transaction with the I2C device."""
644+
cmd = f"i2c_end_transaction {str(stop).lower()}"
645+
self._ask(cmd)
646+
647+
def i2c_write_bulk(self, address: int, data: Sequence):
648+
"""Write large amounts of data to the I2C device in chunks.
649+
650+
This method automatically handles chunking data to fit within the I2C buffer
651+
size and manages the transaction lifecycle.
652+
653+
Parameters
654+
----------
655+
address: int
656+
I2C device address
657+
data: Sequence
658+
Data to write (up to 8k bytes)
659+
"""
660+
if len(data) > 8192:
661+
raise ValueError("Data size exceeds maximum of 8192 bytes")
662+
663+
buffer_size = self.i2c_buffer_size()
664+
self.i2c_begin_transaction(address)
665+
666+
try:
667+
for i in range(0, len(data), buffer_size):
668+
chunk = data[i:i + buffer_size]
669+
self.i2c_write(chunk)
670+
finally:
671+
self.i2c_end_transaction()
672+
673+
def i2c_buffer_size(self):
674+
"""Get the maximum I2C buffer size for this board."""
675+
returned = self._ask("i2c_buffer_size")
676+
return int(returned, base=0)
677+
607678
def i2c_1_init(self, baud_rate: int=100_100, timeout=200_000, register_space=1):
608679
cmd = f"i2c_1_init {baud_rate:d} {timeout:d} {register_space:d}"
609680
self._ask(cmd)
@@ -701,6 +772,77 @@ def i2c_1_ping(self, address: int):
701772
cmd = f"i2c_1_ping 0x{address:02x}"
702773
self._ask(cmd)
703774

775+
def i2c_1_reset(self):
776+
"""Reset the I2C_1 PORT in case of lockup."""
777+
self._ask("i2c_1_reset")
778+
779+
def i2c_1_read_no_register_uint8(self, address: int):
780+
"""Read a uint8_t from the I2C_1 bus without specifying a register address."""
781+
cmd = f"i2c_1_read_no_register_uint8 0x{address:02x}"
782+
returned = self._ask(cmd)
783+
return int(returned, base=0)
784+
785+
def i2c_1_write_no_register_uint8(self, address: int, data: int):
786+
"""Write a uint8_t to the I2C_1 bus without specifying a register address."""
787+
data = data & 0xFF
788+
cmd = f"i2c_1_write_no_register_uint8 0x{address:02x} 0x{data:02x}"
789+
self._ask(cmd)
790+
791+
def i2c_1_read_payload_uint16(self, address: int, register_address: int,
792+
num_bytes: int) -> Sequence:
793+
"""Read up to 256 bytes from the I2C_1 bus at a specified 16 bit register address."""
794+
cmd = f"i2c_1_read_payload_uint16 0x{address:02x} 0x{register_address:04x} {num_bytes}"
795+
returned = self._ask(cmd)
796+
register_data = [int(val, base=0) for val in returned.split()]
797+
return register_data
798+
799+
def i2c_1_begin_transaction(self, address: int):
800+
"""Begin a transaction with the I2C_1 device."""
801+
cmd = f"i2c_1_begin_transaction 0x{address:02x}"
802+
self._ask(cmd)
803+
804+
def i2c_1_write(self, data: Sequence):
805+
"""Write data to the I2C_1 device."""
806+
data_str = ' '.join([f"{val}" for val in data])
807+
cmd = f"i2c_1_write {data_str}"
808+
self._ask(cmd)
809+
810+
def i2c_1_end_transaction(self, stop: bool = True):
811+
"""End a transaction with the I2C_1 device."""
812+
cmd = f"i2c_1_end_transaction {str(stop).lower()}"
813+
self._ask(cmd)
814+
815+
def i2c_1_write_bulk(self, address: int, data: Sequence):
816+
"""Write large amounts of data to the I2C_1 device in chunks.
817+
818+
This method automatically handles chunking data to fit within the I2C_1 buffer
819+
size and manages the transaction lifecycle.
820+
821+
Parameters
822+
----------
823+
address: int
824+
I2C_1 device address
825+
data: Sequence
826+
Data to write (up to 8k bytes)
827+
"""
828+
if len(data) > 8192:
829+
raise ValueError("Data size exceeds maximum of 8192 bytes")
830+
831+
buffer_size = self.i2c_1_buffer_size()
832+
self.i2c_1_begin_transaction(address)
833+
834+
try:
835+
for i in range(0, len(data), buffer_size):
836+
chunk = data[i:i + buffer_size]
837+
self.i2c_1_write(chunk)
838+
finally:
839+
self.i2c_1_end_transaction()
840+
841+
def i2c_1_buffer_size(self):
842+
"""Get the maximum I2C_1 buffer size for this board."""
843+
returned = self._ask("i2c_1_buffer_size")
844+
return int(returned, base=0)
845+
704846
def gpio_digital_write(self, pin, value):
705847
"""Call the ardunio DigitalWrite function.
706848
@@ -821,6 +963,64 @@ def register_read_uint16(self, register_address):
821963
returned = self._ask(f"register_read_uint16 {register_address}")
822964
return int(returned, base=0)
823965

966+
def register_read_uint8(self, register_address):
967+
"""Read value directly from a teensy register.
968+
969+
Parameters
970+
----------
971+
register_address: int
972+
Register address to read from.
973+
974+
Returns
975+
-------
976+
value: int
977+
Read value. Will be from 0 to 255.
978+
979+
"""
980+
returned = self._ask(f"register_read_uint8 {register_address}")
981+
return int(returned, base=0)
982+
983+
def register_write_uint8(self, register_address, value):
984+
"""Write value directly to teensy register.
985+
986+
Parameters
987+
----------
988+
register_address: int
989+
Register address to write to.
990+
value: int
991+
Value to write to address. Should be between 0 and 255.
992+
"""
993+
self._ask(f"register_write_uint8 {register_address} {value}")
994+
995+
def register_read_uint32(self, register_address):
996+
"""Read value directly from a teensy register.
997+
998+
Parameters
999+
----------
1000+
register_address: int
1001+
Register address to read from.
1002+
1003+
Returns
1004+
-------
1005+
value: int
1006+
Read value. Will be from 0 to 4294967295.
1007+
1008+
"""
1009+
returned = self._ask(f"register_read_uint32 {register_address}")
1010+
return int(returned, base=0)
1011+
1012+
def register_write_uint32(self, register_address, value):
1013+
"""Write value directly to teensy register.
1014+
1015+
Parameters
1016+
----------
1017+
register_address: int
1018+
Register address to write to.
1019+
value: int
1020+
Value to write to address. Should be between 0 and 4294967295.
1021+
"""
1022+
self._ask(f"register_write_uint32 {register_address} {value}")
1023+
8241024
@property
8251025
def version(self):
8261026
return self._version
@@ -915,6 +1115,15 @@ def spi_read_byte(self, data):
9151115
value = self._ask(f"spi_read_byte {data}")
9161116
return int(value, base=0)
9171117

1118+
def spi_buffer_size(self):
1119+
"""Get the maximum SPI buffer size for this board."""
1120+
returned = self._ask("spi_buffer_size")
1121+
return int(returned, base=0)
1122+
1123+
def spi_set_clock_divider(self, divider: int):
1124+
"""Set the SPI clock divider."""
1125+
self._ask(f"spi_set_clock_divider {divider}")
1126+
9181127
def analog_write_frequency(self, pin: int, frequency: int):
9191128
frequency = int(frequency)
9201129
self._ask(f"analog_write_frequency {pin} {frequency}")
@@ -1051,6 +1260,20 @@ def disable_demo_commands(self):
10511260
"""Disable the demo commands on startup."""
10521261
self._ask("disable_demo_commands")
10531262

1263+
def post_serial_startup_commands_available(self):
1264+
"""Return the number of post serial startup commands available."""
1265+
returned = self._ask("post_serial_startup_commands_available")
1266+
return int(returned, base=0)
1267+
1268+
def read_post_serial_startup_command(self, index):
1269+
"""Read the post serial startup command at the specified index."""
1270+
returned = self._ask(f"read_post_serial_startup_command {index}")
1271+
return returned
1272+
1273+
def sleep_seconds(self, duration: float):
1274+
"""Sleep (and block) for the desired duration in seconds."""
1275+
self._ask(f"sleep {duration}")
1276+
10541277
def fastled_add_leds(self, led_class, has_white, pin, n_leds):
10551278
has_white = int(bool(has_white))
10561279
self._ask(f"fastled_add_leds {led_class} {has_white} {pin} {n_leds}")
@@ -1076,3 +1299,23 @@ def fastled_set_hsv(self, led_index, h, s, v):
10761299

10771300
def fastled_set_hue(self, led_index, hue):
10781301
self._ask(f"fastled_set_hue {led_index} {hue}")
1302+
1303+
def info(self):
1304+
"""Displays information about this TeensyToAny device."""
1305+
return self._ask("info")
1306+
1307+
def reboot(self):
1308+
"""Runs setup routine again, for this device."""
1309+
self._ask("reboot")
1310+
1311+
def serialnumber(self):
1312+
"""Displays the serial number of the board."""
1313+
return self._ask("serialnumber")
1314+
1315+
def license(self):
1316+
"""Display the license information for the source code running on the teensy."""
1317+
return self._ask("license")
1318+
1319+
def nop(self):
1320+
"""No operation (does nothing)."""
1321+
self._ask("nop")
Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,34 @@
1-
import teensytoany
1+
import pytest
22

3-
# import pytest
3+
import teensytoany
4+
from teensytoany import TeensyToAny
45

56

67
def test_project_import():
78
assert teensytoany.__version__
89
assert teensytoany.TeensyToAny
10+
11+
12+
@pytest.mark.hardware
13+
def test_nop():
14+
with TeensyToAny() as t:
15+
t.nop()
16+
17+
18+
@pytest.mark.hardware
19+
@pytest.mark.parametrize("i", range(256, 2048, 256))
20+
def test_nop_buffer_size(i):
21+
# We shouldn't fail with up to 2048 bytes of input
22+
with TeensyToAny() as t:
23+
# pylint: disable=protected-access
24+
t._ask("nop" + " " * (2048 - len("nop\n") - i))
25+
26+
27+
@pytest.mark.hardware
28+
@pytest.mark.parametrize("i", range(2048, 8096 + 1, 2048))
29+
def test_nop_buffer_size_fail(i):
30+
with TeensyToAny() as t:
31+
# We should fail with more than 2048 bytes of input
32+
with pytest.raises(Exception):
33+
# pylint: disable=protected-access
34+
t._ask("nop" + " " * (i + 1 - len("nop\n")))

0 commit comments

Comments
 (0)