Skip to content

Commit 2ebc9e8

Browse files
committed
Move command completion to command queue
The base frontend had a somewhat complicated mechanism for tracking outstanding commands in order to process completions and allow for the callers to wait for command completions. This required managing multiple lists with their associated locks. In order to avoid any completions triggering prior to the frontend being able to insert the command into the appropriate list. However, it is much simpler to move all of that logic to the command queue. The queue already knows about all submitted commands since frontends submit all commands through it. It can also managed completions using a single since there is no possible way for the command to complete prior to it being de-queued. This makes the entire management and frontend simpler.
1 parent bff51db commit 2ebc9e8

6 files changed

Lines changed: 83 additions & 54 deletions

File tree

emulator/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ def __init__(self, config, frontend, sequential=False):
4646
self._frontend = frontends.create_frontend(frontend)
4747
if self._frontend is None:
4848
raise EmulatorError(f"Failed to create frontend '{frontend}'")
49+
command_queue = self._frontend.get_queue()
4950
mcu = load_mcu(machine.controller)
50-
self._controller = mcu(config, self._frontend.complete_command)
51+
self._controller = mcu(config, command_queue.complete_command)
5152
if self._controller is None:
5253
raise EmulatorError(f"Controller creation failure")
5354
self._frontend.set_sequential_mode(sequential)

frontends/__init__.py

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
import importlib
1818
import select
1919
import os
20-
import time
2120
import pickle
2221
import threading
2322
import vortex.core.lib.logging as logging
2423
from vortex.core import ObjectKlass
2524
from vortex.frontends.lib import create_pty
26-
from vortex.frontends.queues import CommandQueue, Command
25+
from vortex.frontends.queues import CommandQueue
2726
from vortex.frontends.proto import *
2827

2928
class BaseFrontend:
@@ -36,8 +35,6 @@ def __init__(self, queue_size=0):
3635
self._obj_id_2_name = {x: {} for x in ObjectKlass}
3736
self._command_completion = {}
3837
self._command_completion_lock = threading.Lock()
39-
self._command_results = {}
40-
self._command_results_lock = threading.Lock()
4138
self._stop = threading.Event()
4239
self._run_sequential = False
4340
self._queue = CommandQueue(queue_size)
@@ -65,8 +62,6 @@ def set_controller(self, controller):
6562
# return the base class method instead of the overriden
6663
# one.
6764
def reset(self, *args, **kwargs):
68-
with self._command_completion_lock:
69-
self._command_results.clear()
7065
self._queue.clear()
7166
return self._controller.reset(*args, **kwargs)
7267

@@ -150,9 +145,6 @@ def stop(self):
150145
if self._thread:
151146
self._stop.set()
152147
self._thread.join()
153-
cmds = list(self._command_completion.keys())
154-
for cmd in cmds:
155-
self.complete_command(cmd, -1)
156148

157149
def set_sequential_mode(self, mode):
158150
self._run_sequential = mode
@@ -205,36 +197,13 @@ def queue_command(self, klass, object, cmd, opts, callback=None):
205197
opts = {_o:_v for _o, _v in (s.split('=') for s in opts.split(','))} if opts else {}
206198
elif not isinstance(opts, dict):
207199
return False
208-
self.log.debug(f"Submitting command: {self.get_object_name(klass, obj_id)} {cmd_id} {opts}")
209-
with self._command_completion_lock:
210-
cmd_id, cmd = self._queue.queue_command(obj_id, cmd_id, opts)
211-
self.log.debug(f"Command ID:{cmd_id}")
212-
self._command_completion[cmd_id] = (cmd, callback)
200+
cmd_id, cmd = self._queue.queue_command(obj_id, cmd_id, opts, callback)
213201
if self._run_sequential:
214202
self.wait_for_command(cmd_id)
215203
return cmd_id
216204

217-
def complete_command(self, id, result, data=None):
218-
self.log.debug(f"Completing command: {id} {result}")
219-
with self._command_completion_lock:
220-
_, callback = self._command_completion.pop(id)
221-
if callback is not None:
222-
callback(id, result, data)
223-
else:
224-
with self._command_results_lock:
225-
self._command_results[id] = (result, data)
226-
227205
def wait_for_command(self, cmd_set):
228-
if isinstance(cmd_set, int):
229-
cmd_set = [cmd_set]
230-
if not isinstance(cmd_set, (list, tuple, set)):
231-
cmd_set = list(cmd_set)
232-
cmd_set = set(cmd_set)
233-
completed = set()
234-
while not self._stop.is_set() and not cmd_set & completed:
235-
with self._command_results_lock:
236-
completed = set(self._command_results.keys())
237-
return [self._command_results.pop(i) for i in cmd_set & completed]
206+
return self._queue.wait_for_command(cmd_set)
238207

239208
def respond(self, code, data):
240209
response = Response(code, data)

frontends/direct/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def _process_command(self, data):
4545
return
4646

4747
klass = ObjectKlass[klass.upper()]
48-
cmd_id = self.queue_command(klass, object, cmd, opts)
48+
cmd_id = self.queue_command(klass, object, cmd, opts, self.complete_command)
4949
if cmd_id is False:
5050
self.log.error("Failed to queue command")
5151
super().respond(CommandStatus.FAIL, False)

frontends/gcode/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def M204(self, cmd):
275275
self.log.error("Failed to queue command")
276276
return CommandStatus.FAIL
277277
cmds.append(cmd_id)
278-
self.wait_for_commands(cmds)
278+
self.wait_for_command(cmds)
279279
return CommandStatus.SUCCESS
280280
def M400(self, cmd):
281281
while self._command_completion:

frontends/klipper/helpers.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,9 @@ def __init__(self, frontend, queue, oid, obj_id, klass, name):
228228
self.name, "use_pins",
229229
{"enable": True})
230230
result = self.frontend.wait_for_command(cmd_id)
231-
if not result or result[0][0] != 0:
231+
if not result or result[0].id != cmd_id or result[0].result != 0:
232232
raise ValueError(f"Heater {self.name} pin enable error")
233-
self.pin_word = ctypes.cast(result[0][1]["pin_addr"], ctypes.POINTER(ctypes.c_uint8))
233+
self.pin_word = ctypes.cast(result[0].data["pin_addr"], ctypes.POINTER(ctypes.c_uint8))
234234
def _set_pin(self, value):
235235
self.pin_word.contents.value = int(not (not (Flags.ON & value)))
236236
def shutdown(self):
@@ -293,9 +293,9 @@ def __init__(self, frontend, queue, oid, obj_id, klass, name):
293293
self.name, "use_pins",
294294
{"enable": True})
295295
result = self.frontend.wait_for_command(cmd_id)
296-
if not result or result[0][0] != 0:
296+
if not result or result[0].id != cmd_id or result[0].result != 0:
297297
raise ValueError(f"Stepper {self.name} pin enable error")
298-
self.pin_word = atomics.Atomic(32, var=result[0][1]["pin_addr"])
298+
self.pin_word = atomics.Atomic(32, var=result[0].data["pin_addr"])
299299
def owns_pin(self, pin):
300300
return pin in self.pins.values()
301301
def configure_pin(self, oid, pin):
@@ -605,7 +605,7 @@ def reset(self, value):
605605
result = self.frontend.wait_for_command(cmd_id)
606606
if not result:
607607
return -1
608-
return result[0][0]
608+
return result[0].result
609609

610610
class ButtonsState(enum.IntEnum):
611611
NONE = 0
@@ -721,8 +721,8 @@ def finish_config(self):
721721
if cmd_id == -1:
722722
raise HelperException("Failed to set PWM object id.")
723723
result = self.frontend.wait_for_command(cmd_id)
724-
if not result or result[0][0] != 0:
725-
raise HelperException(f"Failed command to set PWM object id ({result[0]})")
724+
if not result or result[0].id != cmd_id or result[0].result != 0:
725+
raise HelperException(f"Failed command to set PWM object id ({result[0].id})")
726726
self.pin = DigitalPin(self.frontend, -1, pin_id, ObjectKlass.DIGITAL_PIN, obj_name)
727727
self.pin.set_max_duration(self.pin_max_duration)
728728
self.pin.set_initial_value(self.pin_value, self.pin_default)
@@ -756,7 +756,7 @@ def event(self, ticks):
756756
self.frontend.shutdown("Failed to set PWM duty cycle")
757757
return 0
758758
result = self.frontend.wait_for_command(cmd_id)
759-
if not result or result[0][0] != 0:
759+
if not result or result[0].id != cmd_id or result[0].result != 0:
760760
self.frontend.shutdown("Failed to set PWM duty cycle.")
761761
return 0
762762
if self.move_queue.empty(self.oid):
@@ -796,4 +796,4 @@ def send(self):
796796
{"index": int(i / self.unit_len),
797797
"color": self.data[i:i+self.unit_len]})
798798
completion = self.frontend.wait_for_command(cmd_id)
799-
return completion[0][0]
799+
return completion[0].result

frontends/queues.py

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,39 +14,98 @@
1414
# You should have received a copy of the GNU General Public License
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
import queue
17+
from threading import Lock
18+
from collections import namedtuple
19+
import vortex.core.lib.logging as logging
20+
1721
class Command:
18-
def __init__(self, obj_id, cmd_id, opts):
19-
self.obj_id, self.cmd_id, self.opts = \
20-
obj_id, cmd_id, opts
22+
def __init__(self, obj_id, cmd_id, opts, callback):
23+
self.obj_id = obj_id
24+
self.cmd_id = cmd_id
25+
self.opts = opts
26+
self.callback = callback
2127
self.id = id(self)
28+
2229
def __str__(self):
2330
return f"Command({self.id}, {self.obj_id}:{self.cmd_id})"
24-
31+
32+
QueueCompletion = namedtuple("QueueCompletion", ["command", "result", "data"])
33+
Completion = namedtuple("Completion", ["id", "result", "data"])
34+
35+
log = logging.getLogger("vortex.queue")
36+
2537
class CommandQueue(queue.Queue):
2638
def __init__(self, maxsize=0):
2739
super().__init__(maxsize)
2840
self.__cmd_count = 0
2941
self.__max_size = maxsize
42+
self.__lock = Lock()
43+
self.__cmd_queue = {}
44+
self.__comp_queue = {}
45+
3046
def put(self, command):
3147
if not isinstance(command, Command):
3248
raise ValueError("'command' is not a Command instance")
33-
super().put(command)
49+
with self.__lock:
50+
super().put(command)
51+
self.__cmd_queue[command.id] = command
3452
self.__cmd_count += 1
3553
return command.id
54+
3655
def get(self, block=True, timeout=None):
3756
cmd = super().get(block, timeout)
3857
self.__cmd_count -= 1
3958
return cmd
40-
def queue_command(self, obj_id, cmd_id, opts):
41-
cmd = Command(obj_id, cmd_id, opts)
42-
return (self.put(cmd), cmd)
59+
60+
def queue_command(self, obj_id, cmd_id, opts, callback=None):
61+
log.debug(f"Submitting command: {obj_id}: {cmd_id} {opts}")
62+
cmd = Command(obj_id, cmd_id, opts, callback)
63+
self.put(cmd)
64+
log.debug(f"Command queued as {cmd.id}")
65+
return cmd.id, cmd
66+
67+
def complete_command(self, cmd_id, result, data=None):
68+
log.debug(f"Command {cmd_id} competed with {result}")
69+
with self.__lock:
70+
cmd = self.__cmd_queue.pop(cmd_id)
71+
72+
if cmd.callback:
73+
cmd.callback(cmd_id, result, data)
74+
else:
75+
with self.__lock:
76+
self.__comp_queue[cmd_id] = QueueCompletion(cmd, result, data)
77+
78+
def wait_for_command(self, cmd_ids):
79+
if isinstance(cmd_ids, int):
80+
cmd_set = [cmd_ids]
81+
if not isinstance(cmd_ids, (list, tuple, set)):
82+
cmd_set = list(cmd_set)
83+
cmd_set = set(cmd_set)
84+
comp_set = set()
85+
completed = []
86+
while not cmd_set & comp_set:
87+
with self.__lock:
88+
comp_set = set(self.__comp_queue.keys())
89+
for cmd_id in (cmd_set & comp_set):
90+
completion = self.__comp_queue.pop(cmd_id)
91+
completed.append(Completion(completion.command.id,
92+
completion.result,
93+
completion.data))
94+
return completed
95+
4396
def clear(self):
4497
self.shutdown(True)
98+
with self.__lock:
99+
for cmd in self.get():
100+
if cmd.callback:
101+
cmd.callback(cmd.id, -1, None)
45102
self.__cmd_count = 0
46103
self.is_shutdown = False
104+
47105
@property
48106
def size(self):
49107
return self.__cmd_count
108+
50109
@property
51110
def max_size(self):
52111
return self.__max_size

0 commit comments

Comments
 (0)