Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 182 additions & 17 deletions dronecan/driver/slcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def __init__(self, device, baudrate, bitrate):
self.bitrate = bitrate
self.timeout = 0
self.lock = threading.RLock()
self.supports_ack_nack = None # Will be set during first init

def retry(self):
logger.info("Reopening %s at %u" % (self.device, self.baudrate))
Expand All @@ -102,7 +103,9 @@ def retry(self):
try:
self.conn = serial.Serial(self.device, self.baudrate)
self.conn.timeout = self.timeout
_init_adapter(self.conn, self.bitrate)
# Use stored supports_ack_nack value if available
supports_ack_nack = getattr(self, 'supports_ack_nack', None)
self.supports_ack_nack = _init_adapter(self.conn, self.bitrate, supports_ack_nack)
logger.info("Reopen OK")
except Exception as ex:
logger.info("Reopen failed", ex)
Expand Down Expand Up @@ -205,12 +208,13 @@ class RxWorker:
SELECT_TIMEOUT = 0.1
READ_BUFFER_SIZE = 1024 * 8 # Arbitrary large number

def __init__(self, conn, rx_queue, ts_estimator_mono, ts_estimator_real, termination_condition):
def __init__(self, conn, rx_queue, ts_estimator_mono, ts_estimator_real, termination_condition, supports_ack_nack=True):
self._conn = conn
self._output_queue = rx_queue
self._ts_estimator_mono = ts_estimator_mono
self._ts_estimator_real = ts_estimator_real
self._termination_condition = termination_condition
self._supports_ack_nack = supports_ack_nack

if RUNNING_ON_WINDOWS:
# select() doesn't work on serial ports under Windows, so we have to resort to workarounds. :(
Expand All @@ -233,12 +237,89 @@ def _read_port(self):
data = self._conn.read(self.READ_BUFFER_SIZE)
return data, ts_mono, ts_real

def _parse_slcan_messages_by_length(self, data):
"""
Parse SLCAN messages by length instead of relying on ACK terminators.
Returns a list of complete messages and any remaining partial data.
"""
messages = []
pos = 0

while pos < len(data):
# Need at least 5 characters for minimum message: t000r (type + id + dlc)
if pos + 5 > len(data):
break

# Check message type and get ID length
msg_type = data[pos:pos+1]
if msg_type == b'T':
id_len = 8
elif msg_type == b't':
id_len = 3
elif msg_type == b'D':
id_len = 8
else:
# Skip unknown byte
pos += 1
continue

# Need type + id + dlc
header_len = 1 + id_len + 1
if pos + header_len > len(data):
break

# Extract DLC and calculate data length
try:
dlc_char = data[pos + 1 + id_len:pos + 1 + id_len + 1]
if len(dlc_char) != 1:
pos += 1
continue
dlc = int(chr(dlc_char[0]), 16)
data_len = CANFrame.dlc_to_datalength(dlc)
except (ValueError, IndexError):
pos += 1
continue

# Calculate expected message length
# Format: <type><id><dlc><data>[timestamp]
# Timestamp is optional 4-char hex field
min_msg_len = header_len + (data_len * 2)
max_msg_len = min_msg_len + 4 # With timestamp

# Check if we have enough data for the minimum message
if pos + min_msg_len > len(data):
break

# Try to find the actual message end
# Look for the next message start or end of data
msg_end = pos + min_msg_len

# Check if there's a timestamp (4 hex chars)
if msg_end + 4 <= len(data):
# Check if next 4 characters look like hex timestamp
potential_ts = data[msg_end:msg_end + 4]
try:
int(potential_ts, 16)
msg_end += 4
except (ValueError, TypeError):
pass

# Extract the message
message = data[pos:msg_end]
messages.append(message)
pos = msg_end

return messages, data[pos:]

def _process_slcan_line(self, line, local_ts_mono, local_ts_real):
line = line.strip().strip(NACK).strip(CLI_END_OF_TEXT)
line_len = len(line)

if line_len < 1:
return

# Only log problematic lines for debugging
# logger.debug('SLCAN line: %r (len=%d)', line, line_len)

canfd = False
# Checking the header, ignore all irrelevant lines
Expand Down Expand Up @@ -280,6 +361,17 @@ def _process_slcan_line(self, line, local_ts_mono, local_ts_real):
ts_real = local_ts_real

frame = CANFrame(packet_id, packet_data, (id_len == 8), ts_monotonic=ts_mono, ts_real=ts_real, canfd=canfd)

# Optional detailed frame logging (uncomment for debugging)
# if packet_data:
# tail_byte = packet_data[-1] if packet_data else 0
# start_bit = bool(tail_byte & 0x80)
# end_bit = bool(tail_byte & 0x40)
# toggle_bit = bool(tail_byte & 0x20)
# transfer_id = tail_byte & 0x1F
# logger.debug('SLCAN RX: ID=%08X, len=%d, tail=%02X, start=%s, end=%s, toggle=%s, tid=%d',
# packet_id, len(packet_data), tail_byte, start_bit, end_bit, toggle_bit, transfer_id)

self._output_queue.put_nowait(frame)

def _process_many_slcan_lines(self, lines, ts_mono, ts_real):
Expand Down Expand Up @@ -323,8 +415,13 @@ def run(self):

# Processing in normal mode if there's no outstanding command; using much slower CLI mode otherwise
if outstanding_command is None:
slcan_lines = data.split(ACK)
slcan_lines, data = slcan_lines[:-1], slcan_lines[-1]
if self._supports_ack_nack:
# Traditional ACK-based splitting
slcan_lines = data.split(ACK)
slcan_lines, data = slcan_lines[:-1], slcan_lines[-1]
else:
# Length-based message parsing for adapters without ACK/NACK
slcan_lines, data = self._parse_slcan_messages_by_length(data)

self._process_many_slcan_lines(slcan_lines, ts_mono=ts_mono, ts_real=ts_real)

Expand All @@ -336,8 +433,14 @@ def run(self):

# Processing the mix of SLCAN and CLI lines
for ln in split_lines:
tmp = ln.split(ACK)
slcan_lines, cli_line = tmp[:-1], tmp[-1]
if self._supports_ack_nack:
tmp = ln.split(ACK)
slcan_lines, cli_line = tmp[:-1], tmp[-1]
else:
# For adapters without ACK/NACK, we need to separate SLCAN from CLI differently
# This is a more complex case that may need refinement
slcan_lines, remaining = self._parse_slcan_messages_by_length(ln)
cli_line = remaining

self._process_many_slcan_lines(slcan_lines, ts_mono=ts_mono, ts_real=ts_real)

Expand Down Expand Up @@ -375,8 +478,12 @@ def run(self):
# there is no reason not to process SLCAN ones immediately.
# The last byte could be beginning of an \r\n sequence, so it's excluded from parsing.
data, last_byte = data[:-1], data[-1:]
slcan_lines = data.split(ACK)
slcan_lines, data = slcan_lines[:-1], slcan_lines[-1] + last_byte
if self._supports_ack_nack:
slcan_lines = data.split(ACK)
slcan_lines, data = slcan_lines[:-1], slcan_lines[-1] + last_byte
else:
slcan_lines, data = self._parse_slcan_messages_by_length(data)
data = data + last_byte

self._process_many_slcan_lines(slcan_lines, ts_mono=ts_mono, ts_real=ts_real)

Expand All @@ -401,11 +508,12 @@ def run(self):
class TxWorker:
QUEUE_BLOCK_TIMEOUT = 0.1

def __init__(self, conn, rx_queue, tx_queue, termination_condition):
def __init__(self, conn, rx_queue, tx_queue, termination_condition, supports_ack_nack=True):
self._conn = conn
self._rx_queue = rx_queue
self._tx_queue = tx_queue
self._termination_condition = termination_condition
self._supports_ack_nack = supports_ack_nack

def _send_frame(self, frame):
marker = 'D' if frame.canfd else 'T'
Expand All @@ -414,6 +522,7 @@ def _send_frame(self, frame):
dlc_len,
binascii.b2a_hex(frame.data).decode('ascii'))

# Rely on natural backpressure from serial connection
self._conn.write(line.encode('ascii'))
self._conn.flush()

Expand Down Expand Up @@ -464,8 +573,51 @@ def _raise_self_process_priority():
os.nice(IO_PROCESS_NICENESS_INCREMENT)


def _init_adapter(conn, bitrate):
def _detect_ack_nack_support(conn):
"""
Detect if the adapter supports ACK/NACK responses by sending an invalid 'X' command.
If no NACK (\x07) is received, assume the adapter doesn't support ACK/NACK.
"""
logger.info('Init: Detecting ACK/NACK support...')

# Flush any existing input
conn.flushInput()

# Send invalid 'X' command
conn.write(b'X\r')
conn.flush()

# Wait for response with timeout
conn.timeout = ACK_TIMEOUT
start_time = time.monotonic()

while time.monotonic() - start_time < ACK_TIMEOUT:
b = conn.read(1)
if not b:
continue
if b == NACK:
logger.info('Init: Adapter supports ACK/NACK (received NACK for invalid command)')
return True
if b == ACK:
logger.info('Init: Adapter supports ACK/NACK (received ACK for invalid command)')
return True
# Ignore other bytes

logger.info('Init: Adapter does not support ACK/NACK (no response to invalid command)')
return False


def _init_adapter(conn, bitrate, supports_ack_nack=None):
# Auto-detect ACK/NACK support by sending invalid 'X' command if not already known
if supports_ack_nack is None:
supports_ack_nack = _detect_ack_nack_support(conn)
# Store the result if this is a RetrySerial instance
if hasattr(conn, 'supports_ack_nack'):
conn.supports_ack_nack = supports_ack_nack
def wait_for_ack():
if not supports_ack_nack:
# Adapter doesn't support ACK/NACK, skip wait and rely on natural backpressure
return
logger.info('Init: Waiting for ACK...')
conn.timeout = ACK_TIMEOUT
while True:
Expand All @@ -481,6 +633,7 @@ def wait_for_ack():
def send_command(cmd):
logger.info('Init: Sending command %r', cmd)
conn.write(cmd + b'\r')
conn.flush()

speed_code = {
1000000: 8,
Expand All @@ -502,7 +655,9 @@ def send_command(cmd):
try:
wait_for_ack()
except DriverError:
pass
if supports_ack_nack:
# Only log error if we expect ACK/NACK support
logger.warning('Init: Failed to get ACK for empty command')
time.sleep(0.1)
conn.flushInput()

Expand Down Expand Up @@ -542,6 +697,8 @@ def send_command(cmd):
# Discarding all input again
time.sleep(0.1)
conn.flushInput()

return supports_ack_nack


def _stop_adapter(conn):
Expand Down Expand Up @@ -571,6 +728,12 @@ def _io_process(device,
getLogger().setLevel('INFO')

logger.info('IO process started with PID %r', os.getpid())

# Enable debug logging if requested via environment variable
if os.environ.get('DRONECAN_DEBUG'):
import logging
logging.getLogger('dronecan.driver.slcan').setLevel(logging.DEBUG)
logger.info('SLCAN debug logging enabled via DRONECAN_DEBUG environment variable')

# We don't need stdin
try:
Expand Down Expand Up @@ -612,23 +775,22 @@ def is_parent_process_alive():
# Preparing the RX thread
#
should_exit = False
supports_ack_nack = True # Default value

def rx_thread_wrapper():
rx_worker = RxWorker(conn=conn,
rx_queue=rx_queue,
ts_estimator_mono=ts_estimator_mono,
ts_estimator_real=ts_estimator_real,
termination_condition=lambda: should_exit)
termination_condition=lambda: should_exit,
supports_ack_nack=supports_ack_nack)
try:
rx_worker.run()
except Exception as ex:
logger.error('RX thread failed, exiting', exc_info=True)
# Propagating the exception to the parent process
rx_queue.put(ex)

rxthd = threading.Thread(target=rx_thread_wrapper, name='slcan_rx')
rxthd.daemon = True

try:
if auto_reopen:
conn = RetrySerial(device, baudrate or DEFAULT_BAUDRATE, bitrate)
Expand All @@ -643,8 +805,10 @@ def rx_thread_wrapper():
# Actual work is here
#
try:
_init_adapter(conn, bitrate)
supports_ack_nack = _init_adapter(conn, bitrate)

rxthd = threading.Thread(target=rx_thread_wrapper, name='slcan_rx')
rxthd.daemon = True
rxthd.start()

logger.info('IO process initialization complete')
Expand All @@ -655,7 +819,8 @@ def rx_thread_wrapper():
tx_queue=tx_queue,
termination_condition=lambda: (should_exit or
not rxthd.is_alive() or
not is_parent_process_alive()))
not is_parent_process_alive()),
supports_ack_nack=supports_ack_nack)
tx_worker.run()
except Exception as ex:
logger.error('IO process failed', exc_info=True)
Expand Down
19 changes: 16 additions & 3 deletions dronecan/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,23 @@ def periodic(self, period_seconds, callback):
priority = 0

def caller(scheduled_deadline):
# Event MUST be re-registered first in order to ensure that it can be cancelled from the callback
# Always reschedule first to maintain periodic timing
scheduled_deadline += period_seconds
event_holder[0] = self._scheduler.enterabs(scheduled_deadline, priority, caller, (scheduled_deadline,))
callback()

# Prevent callback overlap that can cause runaway scheduling
if callback_running[0]:
# Skip this execution if previous callback is still running
return

callback_running[0] = True
try:
callback()
finally:
callback_running[0] = False

first_deadline = self._scheduler.timefunc() + period_seconds
callback_running = [False]
event_holder = [self._scheduler.enterabs(first_deadline, priority, caller, (first_deadline,))]
return self._make_sched_handle(lambda: event_holder[0])

Expand Down Expand Up @@ -438,7 +449,9 @@ def execute_once():
while time.monotonic() < deadline:
execute_once()
else:
while True:
# Process available frames with a reasonable limit to prevent GUI blocking
max_frames_per_spin = 100 # Limit frames processed per spin(0) call
while count < max_frames_per_spin:
frame = self._can_driver.receive(0)
if frame:
self._recv_frame(frame)
Expand Down
Loading