Skip to content

Commit b6560ce

Browse files
committed
Merge pull request #17 from Overdrivr/graph-cleanup#3
plotting-improvements
2 parents 505dd95 + d2260a6 commit b6560ce

5 files changed

Lines changed: 144 additions & 26 deletions

File tree

pytelemetrycli/cli.py

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from pytelemetrycli.runner import Runner
99
from serial.tools import list_ports
1010
from pytelemetrycli.ui.superplot import Superplot, PlotType
11+
from threading import Lock
1112

1213
def docopt_cmd(func):
1314
def fn(self, arg):
@@ -44,7 +45,13 @@ def __init__(self):
4445
self.transport = transports.SerialTransport()
4546
self.telemetry = Pytelemetry(self.transport)
4647
self.topics = Topics()
47-
self.runner = Runner(self.transport,self.telemetry)
48+
self.plots = []
49+
self.plotsLock = Lock()
50+
self.runner = Runner(self.transport,
51+
self.telemetry,
52+
self.plots,
53+
self.plotsLock,
54+
self.topics)
4855
self.telemetry.subscribe(None,self.topics.process)
4956

5057
self.types_lookup = {'--s' : 'string',
@@ -127,18 +134,41 @@ def do_plot(self, arg):
127134
128135
Usage: plot <topic>
129136
"""
130-
if not self.topics.exists(arg['<topic>']):
131-
print("Topic ",arg['<topic>']," unknown.")
132137

133-
t = self.topics.xytype(arg['<topic>'])
134-
if t == 'indexed':
135-
self.myplot = Superplot(arg['<topic>'],PlotType.indexed)
136-
else:
137-
self.myplot = Superplot(arg['<topic>'])
138-
q = self.myplot.start()
139-
self.topics.transfer(arg['<topic>'],q)
138+
topic = arg['<topic>']
139+
140+
if not self.topics.exists(topic):
141+
print("Topic ",topic," unknown.")
142+
return
143+
144+
if self.topics.intransfer(topic):
145+
print("Topic already plotted.")
146+
return
147+
148+
plotTypeFlag = self.topics.xytype(arg['<topic>'])
149+
plotType = PlotType.linear
150+
151+
if plotTypeFlag == 'indexed':
152+
plotType = PlotType.indexed
153+
154+
p = Superplot(topic,plotType)
155+
q, ctrl = p.start()
156+
157+
# Protect self.plots from modifications from the runner thread
158+
self.plotsLock.acquire()
159+
160+
self.plots.append({
161+
'topic': topic,
162+
'plot': p, # Plot handler
163+
'queue': q, # Data queue
164+
'ctrl': ctrl # Plot control pipe
165+
})
166+
167+
self.plotsLock.release()
168+
169+
self.topics.transfer(topic,q)
140170

141-
print("Plotting:", arg['<topic>'],' in mode [',t,']')
171+
print("Plotting:", topic,' in mode [',plotTypeFlag,']')
142172

143173
@docopt_cmd
144174
def do_pub(self, arg):

pytelemetrycli/runner.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33

44
# Main class
55
class Runner:
6-
def __init__(self, transport, telemetry):
6+
def __init__(self, transport, telemetry, plots, plotsLock, topics):
77

88
self.transport = transport
99
self.telemetryWrapper = telemetry
10+
self.plots = plots
11+
self.plotsLock = plotsLock
12+
self.topics = topics
1013

1114
self.thread = None
1215
self.running = threading.Event()
@@ -40,6 +43,28 @@ def terminate(self):
4043
def run(self):
4144
while self.running.is_set():
4245
if self.connected.is_set():
46+
# Update protocol decoding
4347
self.telemetryWrapper.update()
48+
49+
# Protect the self.plots data structure from
50+
# being modified from the main thread
51+
self.plotsLock.acquire()
52+
53+
# Poll each poll pipe to see if user closed them
54+
plotToDelete = None
55+
for p, i in zip(self.plots,range(len(self.plots))):
56+
if p['ctrl'].poll():
57+
if p['ctrl'].recv() == "closing":
58+
plotToDelete = i
59+
break
60+
61+
# Delete a plot if needed
62+
if plotToDelete is not None:
63+
self.plots[plotToDelete]['ctrl'].close()
64+
topic = self.plots[plotToDelete]['topic']
65+
self.topics.untransfer(topic)
66+
self.plots.pop(plotToDelete)
67+
68+
self.plotsLock.release()
4469
else:
4570
time.sleep(0.5)

pytelemetrycli/test/test_topics.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ def test_transfert_queue_indexed_data():
6969

7070
topic.transfer(t1,q)
7171

72+
assert topic.intransfer(t1) == True
73+
7274
assert q.qsize() > 0
7375

7476
assert q.get() == [5, 123]
@@ -86,3 +88,14 @@ def test_transfert_queue_indexed_data():
8688
assert q.get() == [6, 222]
8789
assert q.get() == [7, 333]
8890
assert q.get() == [8, 333]
91+
92+
topic.untransfer(t1)
93+
94+
assert topic.intransfer(t1) == False
95+
96+
topic.process(t1,111, {'index': 5})
97+
topic.process(t1,222, {'index': 6})
98+
topic.process(t1,333, {'index': 7})
99+
topic.process(t1,333, {'index': 8})
100+
101+
assert q.qsize() == 0

pytelemetrycli/topics.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,14 @@ def transfer(self,topic,queue):
6868
queue.put([self.transfers[topic]['lastindex'], item])
6969
self.transfers[topic]['lastindex'] += 1
7070

71+
def untransfer(self,topic):
72+
# If the topic data is already transfered to some queue
73+
if topic in self.transfers:
74+
# Remove it from the transfer list
75+
del self.transfers[topic]
76+
77+
def intransfer(self,topic):
78+
return topic in self.transfers
79+
7180
def xytype(self,topic):
7281
return self.topics[topic]['type']

pytelemetrycli/ui/superplot.py

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
from pyqtgraph.Qt import QtGui, QtCore
33
import numpy as np
44
import pyqtgraph as pg
5-
from multiprocessing import Process, Manager, Queue
6-
import sched, time, threading
5+
from multiprocessing import Process, Queue, Pipe
6+
import time, threading
77
from sortedcontainers import SortedDict
88
from enum import Enum
99

@@ -12,12 +12,20 @@ class PlotType(Enum):
1212
indexed = 1
1313

1414
class Superplot():
15+
"""
16+
Self-contained plotting class that runs in its own process.
17+
Plotting functionality (reset the graph, .. ?) can be controlled
18+
by issuing message-based commands using a multiprocessing Pipe
19+
20+
"""
1521
def __init__(self,name,plottype=PlotType.indexed):
1622
self.name = name
1723
self.plottype = plottype
24+
self._clear()
1825

26+
def _clear(self):
1927
# Process-local buffers used to host the displayed data
20-
if plottype == PlotType.linear:
28+
if self.plottype == PlotType.linear:
2129
self.set = True
2230
self.x = []
2331
self.y = []
@@ -30,15 +38,20 @@ def __init__(self,name,plottype=PlotType.indexed):
3038
self.set = False
3139

3240
def start(self):
41+
# The queue that will be used to transfer data from the main process
42+
# to the plot
3343
self.q = Queue()
44+
main_pipe, self.in_process_pipe = Pipe()
3445
self.p = Process(target=self.run)
3546
self.p.start()
36-
return self.q
47+
# Return a handle to the data queue and the control pipe
48+
return self.q, main_pipe
3749

3850
def join(self):
3951
self.p.join()
4052

4153
def _update(self):
54+
# Empty data queue and process received data
4255
while not self.q.empty():
4356
item = self.q.get()
4457
if self.plottype == PlotType.linear:
@@ -50,16 +63,34 @@ def _update(self):
5063
# TODO : Eventually, need to find high performance alternative. Maybe numpy based
5164
self.xy[item[0]] = item[1]
5265

66+
# Initialize view on data dictionnary only once for increased performance
5367
if not self.set:
5468
self.set = True
5569
self.x = self.xy.keys()
5670
self.y = self.xy.values()
5771

72+
# Refresh plot data
5873
self.curve.setData(self.x,self.y)
5974

75+
try:
76+
if self.in_process_pipe.poll():
77+
msg = self.in_process_pipe.recv()
78+
self._process_msg(msg)
79+
except:
80+
# If the polling failed, then the application most likely shut down
81+
# So close the window and terminate as well
82+
self.app.quit()
83+
84+
def _process_msg(self, msg):
85+
if msg == "exit":
86+
# TODO : Remove this line ? Redundant with send after app.exec_() ?
87+
self.in_process_pipe.send("closing")
88+
self.app.quit()
89+
elif msg == "clear":
90+
self._clear()
6091

6192
def run(self):
62-
app = QtGui.QApplication([])
93+
self.app = QtGui.QApplication([])
6394
win = pg.GraphicsWindow(title="Basic plotting examples")
6495
win.resize(1000,600)
6596
win.setWindowTitle('pyqtgraph example: Plotting')
@@ -70,9 +101,12 @@ def run(self):
70101
timer.timeout.connect(self._update)
71102
timer.start(50)
72103

73-
app.exec_()
74-
75-
104+
self.app.exec_()
105+
try:
106+
self.in_process_pipe.send("closing")
107+
except:
108+
pass
109+
# Process was done, no need to process exception
76110

77111
if __name__ == '__main__':
78112
# This is function is responsible for faking some data (IO, serial port, etc)
@@ -105,19 +139,26 @@ def io_indexed(running,q):
105139
run.set()
106140

107141
# create the plot
108-
#s = Superplot("somePlot",PlotType.linear)
109-
s = Superplot("somePlot",PlotType.indexed)
142+
s = Superplot("somePlot",PlotType.linear)
143+
#s = Superplot("somePlot",PlotType.indexed)
110144

111145
# get the queue used to exchange data
112-
q = s.start()
146+
q, ctrlPipe = s.start()
113147

114148
# start IO thread
115-
#t = threading.Thread(target=io_linear, args=(run,q))
116-
t = threading.Thread(target=io_indexed, args=(run,q))
149+
t = threading.Thread(target=io_linear, args=(run,q))
150+
#t = threading.Thread(target=io_indexed, args=(run,q))
117151
t.start()
118152

153+
while True:
154+
action = input("Type 'q' to quit. Type 'clear' to reset the graph. Type 'exit' to close the graph but stay on main thread.")
155+
if action == 'clear':
156+
ctrlPipe.send('clear')
157+
elif action == 'exit':
158+
ctrlPipe.send('exit')
159+
elif action == 'q':
160+
break
119161

120-
input("Type Enter to quit.")
121162
run.clear()
122163
print("Waiting for IO thread to join...")
123164
t.join()

0 commit comments

Comments
 (0)