-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprogram.py
More file actions
140 lines (119 loc) · 4.65 KB
/
program.py
File metadata and controls
140 lines (119 loc) · 4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from os.path import exists as pathexists, isfile, join as path_join, splitext
from os import strerror, unlink, listdir
import errno
import json
import threading
from config import MultifileConfig as Config
from coderbot import CoderBot
# Default values, overwritten by admin configuration.
# Fallback for the 'program_path' config key: directory holding program files.
PROGRAM_PATH = 'data'
# Fallback for the 'program_extension' config key: file extension, without the dot.
PROGRAM_EXTENSION = 'bot'
class Program(object):
    """A named user program made of a DOM description and Python code.

    Programs are stored as JSON files (keys ``name``, ``dom_code`` and
    ``py_code``) in the directory given by the ``program_path`` config key,
    using the extension given by ``program_extension``.  A loaded program
    can be executed in a background thread with :meth:`start` and asked to
    terminate with :meth:`stop`.
    """

    @staticmethod
    def listdir(path=None):
        """Return the names (without extension) of the stored programs.

        :param path: directory to scan; defaults to the configured
            ``program_path``.
        :returns: list of file stems whose extension matches the configured
            program extension.
        :raises OSError: if ``path`` cannot be listed.
        """
        if path is None:
            path = Config().get('program_path', PROGRAM_PATH)
        # Build the expected suffix once instead of once per directory entry.
        suffix = ".%s" % Config().get('program_extension', PROGRAM_EXTENSION)
        names = []
        # NOTE: inside this method ``listdir`` is the function imported from
        # ``os`` (module scope), not this static method.
        for entry in listdir(path):
            stem, ext = splitext(entry)
            if ext.lower() == suffix and isfile(path_join(path, entry)):
                names.append(stem)
        return names

    @staticmethod
    def _getpath(filename):
        """Return the on-disk path of the program called ``filename``."""
        conf = Config()
        basename = "%s.%s" % (filename,
                              conf.get('program_extension', PROGRAM_EXTENSION))
        return path_join(conf.get('program_path', PROGRAM_PATH), basename)

    @staticmethod
    def new(filename, overwrite=False, dom=None, code=None):
        """Create a new in-memory program; nothing is written to disk.

        :param filename: program name (without extension).
        :param overwrite: allow reusing the name of an existing regular file.
        :param dom: initial DOM code.
        :param code: initial Python code.
        :returns: a new :class:`Program` instance.
        :raises OSError: ``EEXIST`` if the target path exists and either
            ``overwrite`` is false or the path is not a regular file.
        """
        path = Program._getpath(filename)
        if pathexists(path) and (not overwrite or not isfile(path)):
            raise OSError(errno.EEXIST, strerror(errno.EEXIST), path)
        # The file itself is only created by save(); the name is not reserved.
        return Program(filename, dom, code)

    @staticmethod
    def load(filename):
        """Load the program called ``filename`` from its JSON file.

        :returns: a :class:`Program` built from the stored ``dom_code`` and
            ``py_code`` values (``None`` for a missing key).
        :raises OSError: ``ENOENT`` if the file does not exist.
        """
        path = Program._getpath(filename)
        if not isfile(path):
            raise OSError(errno.ENOENT, strerror(errno.ENOENT), path)
        with open(path, 'r') as f:
            data = json.load(f)
        return Program(filename, data.get('dom_code'), data.get('py_code'))

    @staticmethod
    def delete(filename):
        """Remove the file of the program called ``filename``.

        :returns: the result of ``os.unlink`` (``None``).
        :raises OSError: ``ENOENT`` if the file does not exist.
        """
        path = Program._getpath(filename)
        if not isfile(path):
            # A missing file is "no such file", not "file exists".
            raise OSError(errno.ENOENT, strerror(errno.ENOENT), path)
        return unlink(path)

    def __init__(self, filename, dom=None, code=None):
        """Store the program name and sources; the program is not started."""
        self._name = filename
        self._dom = dom
        self._code = code
        self._running = False   # True while the worker thread is executing
        self._shutdown = False  # set by stop(), read by infinite_loop_trap()
        self._thread = None     # worker thread created by start()

    def update(self, dom=None, code=None):
        """Replace the DOM and/or the Python code; ``None`` keeps the old value."""
        if dom is not None:
            self._dom = dom
        if code is not None:
            self._code = code

    def get(self):
        """Return the program as a dict with ``name``, ``dom_code``, ``py_code``."""
        return {
            'name': self._name,
            'dom_code': self._dom,
            'py_code': self._code
        }

    def save(self):
        """Write the program to its JSON file, replacing any previous content."""
        path = Program._getpath(self._name)
        with open(path, 'w') as f:
            json.dump(self.get(), f, indent=4, sort_keys=True)

    # There is no save_as(): use Program.new(name, overwrite, dom, code)
    # followed by save() instead.

    def start(self):
        """Run the program code in a background thread.

        :returns: ``True`` if the thread was started, ``False`` if the
            program has no code to run.
        :raises RuntimeError: if the program is already running.
        """
        if self._running:
            raise RuntimeError('already running')
        if not self._code:
            return False
        self._running = True
        self._thread = threading.Thread(target=self._run)
        self._thread.start()
        return True

    def stop(self):
        """Request the running program to stop and wait for it to finish.

        The request is cooperative: the program only terminates the next
        time it calls :meth:`infinite_loop_trap`.  Does nothing when the
        program is not running.
        """
        if not self._running:
            return
        self._shutdown = True
        worker = self._thread
        # The running program receives this object as ``program`` and may
        # call stop() on itself; joining the current thread would raise
        # RuntimeError, so in that case only the shutdown flag is set.
        if worker is not None and worker is not threading.current_thread():
            worker.join()

    def isRunning(self):
        """Return ``True`` while the program thread is executing."""
        return self._running

    def infinite_loop_trap(self, msg=""):
        """Abort the calling program if a stop has been requested.

        Meant to be called from the program's own code.

        :raises KeyboardInterrupt: carrying ``msg`` when stop() was called.
        """
        if self._shutdown:
            raise KeyboardInterrupt(msg)

    @staticmethod
    def _release_hardware(coderbot):
        """Stop the recording, the motors and every sensor of ``coderbot``."""
        try:
            coderbot.camera.stop_recording()
        finally:
            # The motors must be stopped even if the camera call failed.
            coderbot.motors.stop()
            # values() instead of itervalues(): works on Python 2 and 3.
            for sensor in coderbot.sensors.values():
                try:
                    sensor._stop()
                except AttributeError:
                    # Sensors without a _stop() method need no cleanup.
                    pass

    def _run(self):
        """Thread body: execute the program code, then release the robot."""
        try:
            coderbot = CoderBot()
            glbs = {'__name__': self._name,
                    'program': self,
                    'config': Config(),
                    'coderbot': coderbot
                    }
            if Config().get('program_video_rec', False):
                coderbot.camera.start_recording()
            try:
                # NOTE(security): the code is NOT sandboxed; it runs with the
                # full privileges of this process.  Only run trusted programs.
                exec(self._code, glbs)
            finally:
                self._release_hardware(coderbot)
        finally:
            # Always reset the state flags, otherwise a failure above would
            # leave the program marked as running forever.
            self._running = False
            self._shutdown = False