Skip to content

Commit c1a7f28

Browse files
committed
Merge branch 'master' of https://github.com/openUC2/UC2-REST
2 parents 3d791c8 + b186099 commit c1a7f28

14 files changed

Lines changed: 501 additions & 113 deletions

conda/meta.yaml

Lines changed: 0 additions & 43 deletions
This file was deleted.

uc2rest/MockSerial.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import random
2+
from threading import Thread
3+
import time
4+
5+
class MockSerial:
6+
def __init__(self, port, baudrate, timeout=1):
7+
self.port = port
8+
self.baudrate = baudrate
9+
self.timeout = timeout
10+
self.is_open = False
11+
self.data_buffer = []
12+
self.thread = Thread(target=self._simulate_data)
13+
self.thread.daemon = True
14+
self.thread.start()
15+
self.is_open = True
16+
self.manufacturer = "UC2Mock"
17+
self.BAUDRATES = -1
18+
19+
def flush(self):
20+
pass
21+
22+
def isOpen(self):
23+
return self.is_open
24+
25+
def open(self):
26+
self.is_open = True
27+
28+
def close(self):
29+
self.is_open = False
30+
31+
def readline(self, timeout=1):
32+
if not self.is_open:
33+
raise Exception("Device not connected")
34+
if len(self.data_buffer) == 0:
35+
return b''
36+
data = self.data_buffer
37+
self.data_buffer = self.data_buffer
38+
time.sleep(.05)
39+
return bytes(data)
40+
41+
def read(self, num_bytes):
42+
if not self.is_open:
43+
raise Exception("Device not connected")
44+
if len(self.data_buffer) == 0:
45+
return b''
46+
data = self.data_buffer[:num_bytes]
47+
self.data_buffer = self.data_buffer[num_bytes:]
48+
return bytes(data)
49+
50+
def write(self, data):
51+
if not self.is_open:
52+
raise Exception("Device not connected")
53+
pass # Do nothing, as it's a mock
54+
55+
def _simulate_data(self):
56+
while self.is_open:
57+
if random.random() < 0.2: # Simulate occasional data availability
58+
self.data_buffer.extend([random.randint(0, 255) for _ in range(10)])
59+
time.sleep(0.1)
60+
61+
def __enter__(self):
62+
self.open()
63+
return self
64+
65+
def __exit__(self, exc_type, exc_val, exc_tb):
66+
self.close()
67+
68+
69+
if __name__ == '__main__':
70+
with MockSerial("COM1", 9600) as ser:
71+
while True:
72+
data = ser.readline()
73+
print(data)
74+
time.sleep(1)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#%%
2+
import uc2rest
3+
import numpy as np
4+
import time
5+
6+
port = "unknown"
7+
port = "/dev/cu.SLAB_USBtoUART"
8+
ESP32 = uc2rest.UC2Client(serialport=port, baudrate=500000, DEBUG=True)
9+
time.sleep(2)
10+
#ESP32.serial.sendMessage('{"task":"/home_act", "home": {"steppers": [{"stepperid":1, "timeout": 20000, "speed": 15000, "direction":1, "endposrelease":3000}]}}')
11+
# display random pattern
12+
for i in range(10):
13+
led_pattern = np.random.randint(0,55, (64,3))
14+
ESP32.led.send_LEDMatrix_array(led_pattern=led_pattern, getReturn=True, timeout=4)
15+
16+
#
17+
''' ################
18+
LED
19+
################'''
20+
# test LED
21+
ESP32.led.send_LEDMatrix_full(intensity=(255, 255, 255))
22+
ESP32.led.send_LEDMatrix_full(intensity=(0, 0, 0))
23+
24+
# single LED
25+
for iLED in range(5):
26+
# timeout = 0 means no timeout => mResult will be rubish!
27+
mResult = ESP32.led.send_LEDMatrix_single(indexled=iLED, intensity=(255, 255, 255))#, timeout=0.)
28+
mResult = ESP32.led.send_LEDMatrix_single(indexled=iLED, intensity=(0, 0, 0))#, timeout=0.)
29+
30+
# display random pattern
31+
for i in range(50):
32+
led_pattern = np.random.randint(0,55, (48,3))
33+
ESP32.led.send_LEDMatrix_array(led_pattern=led_pattern, getReturn=False)
34+
35+
ESP32.close()
36+

uc2rest/TEST/TEST_ESP32_Serial.py

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,30 @@
66
port = "unknown"
77
port = "/dev/cu.SLAB_USBtoUART"
88
#port = "COM3"
9-
ESP32 = uc2rest.UC2Client(serialport=port, baudrate=500000, DEBUG=True)
9+
print("start")
10+
ESP32 = uc2rest.UC2Client(serialport=port, baudrate=115200, DEBUG=True)
1011
#ESP32.serial.sendMessage('{"task":"/home_act", "home": {"steppers": [{"stepperid":1, "timeout": 20000, "speed": 15000, "direction":1, "endposrelease":3000}]}}')
1112

12-
#%% TEMPERATURE
13-
ESP32.temperature.start_temperature_polling()
14-
time.sleep(5)
15-
mTemperature = ESP32.temperature.get_temperature()
16-
print(mTemperature)
17-
18-
#
19-
20-
ESP32.home.home_x(speed =15000, direction = 1, endposrelease = 3000, timeout=20, isBlocking=True)
21-
ESP32.home.home_x(speed =15000, direction = -1, endposrelease = 3000, timeout=20, isBlocking=True)
2213

23-
24-
ESP32.motor.move_x(steps=10000, speed=10000, is_blocking=True)
14+
#%% TEMPERATURE
15+
if 0:
16+
ESP32.temperature.start_temperature_polling()
17+
time.sleep(5)
18+
mTemperature = ESP32.temperature.get_temperature()
19+
print(mTemperature)
2520

2621

2722
#%%
2823
''' ################
2924
HOME
3025
################'''
31-
ESP32.home.home_y(speed =15000, direction = 1, endposrelease = 3000, timeout=2, isBlocking=True)
26+
if 0:
27+
ESP32.home.home_y(speed =15000, direction = 1, endposrelease = 3000, timeout=2, isBlocking=True)
28+
ESP32.home.home_x(speed =15000, direction = 1, endposrelease = 3000, timeout=20, isBlocking=True)
29+
ESP32.home.home_x(speed =15000, direction = -1, endposrelease = 3000, timeout=20, isBlocking=True)
30+
31+
heapSize = ESP32.state.getHeap()
32+
print("Heap size: ", heapSize)
3233

3334
# setting debug output of the serial to true - all message will be printed
3435
ESP32.serial.DEBUG=True
@@ -40,33 +41,20 @@
4041
mState = ESP32.state.get_state()
4142

4243

43-
''' ################
44-
MODULES
45-
################'''
46-
#load modules from pyhton
47-
mModules = ESP32.modules.get_default_modules()
48-
assert mModules["home"] == 0 or mModules["home"] == 1, "Failed loading the default modules"
49-
print(mModules) #{'led': True, 'motor': True, 'home': True, 'analogin': False, 'pid': False, 'laser': True, 'dac': False, 'analogout': False, 'digitalout': False, 'digitalin': True, 'scanner': False, 'joy': False}
50-
51-
# load modules from device
52-
mModulesDevice = ESP32.modules.get_modules()
53-
#assert mModulesDevice["home"] == 0 or mModulesDevice["home"] == 1, "Failed loading the modules from the device"
54-
print(mModulesDevice) #{'led': True, 'motor': True, 'home': True, 'analogin': False, 'pid': False, 'laser': True, 'dac': False, 'analogout': False, 'digitalout': False, 'digitalin': True, 'scanner': False, 'joy': False}
55-
mModules['home']=1 # activate home module
56-
#%%
5744

5845

5946
ESP32.motor.move_x(steps=10000, speed=10000, is_blocking=True)
6047

6148
# {"task":"/ledarr_act", "led":{"LEDArrMode":1, "led_array":[{"id":0, "r":255, "g":255, "b":255}]}}
6249
mResult = ESP32.led.send_LEDMatrix_full(intensity=(255, 255, 255))
50+
print("Heap size: ", ESP32.state.getHeap())
6351
mResult = ESP32.led.send_LEDMatrix_full(intensity=(0, 0, 0), getReturn=False)
64-
52+
print("Heap size: ", ESP32.state.getHeap())
6553

6654
# check if we are connected
6755
# see if it's the right device
6856
mState = ESP32.state.get_state()
69-
assert mState["identifier_name"] == "UC2_Feather", "Wrong device connected"
57+
#assert mState["state"]["identifier_name"] == "UC2_Feather", "Wrong device connected"
7058

7159
#%%
7260
# test Motor
@@ -112,11 +100,12 @@
112100
################'''
113101
# test LED
114102
mResult = ESP32.led.send_LEDMatrix_full(intensity=(255, 255, 255))
115-
assert mResult["success"] == 1, "Failed sending LED command"
103+
assert mResult["idsuccess"] == 1, "Failed sending LED command"
116104
time.sleep(0.5)
105+
print("Heap size: ", ESP32.state.getHeap())
117106
mResult = ESP32.led.send_LEDMatrix_full(intensity=(0, 0, 0))
118107
assert mResult["success"] == 1, "Failed sending LED command"
119-
108+
print("Heap size: ", ESP32.state.getHeap())
120109
# single LED
121110
ESP32.setDebugging(False)
122111
for iLED in range(5):

uc2rest/TEST/TEST_PURE_SERIAL.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import serial
2+
import threading
3+
import time
4+
import json
5+
6+
# Configuration parameters
7+
port = "/dev/cu.wchusbserial110" # Adjust this to your device's serial port
8+
baudrate = 500000
9+
10+
# Open the serial port
11+
ser = serial.Serial(port, baudrate, timeout=1)
12+
13+
lock = threading.Lock()
14+
# Function to handle incoming messages
15+
def read_from_port(ser):
16+
while True:
17+
try:
18+
with lock:
19+
reading = ser.readline().decode('utf-8').rstrip()
20+
if reading:
21+
print(f"Received: {reading}")
22+
except Exception as e:
23+
print("Error reading from port")
24+
break
25+
# Start the thread for reading
26+
thread = threading.Thread(target=read_from_port, args=(ser,))
27+
thread.start()
28+
29+
iiter = 0
30+
try:
31+
print("Serial Terminal Running. Type 'exit' to quit.")
32+
while True:
33+
iiter+=1
34+
# Get input from the user
35+
input_data = {'task': '/ledarr_act', 'led': {'LEDArrMode': 0, 'led_array': [{'id': 0, 'r': 25, 'g': 42, 'b': 4}, {'id': 1, 'r': 52, 'g': 26, 'b': 11}, {'id': 2, 'r': 51, 'g': 10, 'b': 17}]}, 'qid': 1}
36+
if input_data == 'exit':
37+
break
38+
# Send data if the user typed something
39+
if input_data:
40+
with lock:
41+
ser.write(f"{json.dumps(input_data)}\n".encode('utf-8'))
42+
print(f"Sent: {iiter}")
43+
finally:
44+
ser.close()
45+
print("Serial Terminal Closed.")

0 commit comments

Comments
 (0)