This repository was archived by the owner on Jan 22, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserve.py
More file actions
306 lines (245 loc) · 11.8 KB
/
serve.py
File metadata and controls
306 lines (245 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import html
import json
import logging
import os
import re
import sys
import threading
from contextlib import closing
from http.server import HTTPServer, BaseHTTPRequestHandler
from sqlite3 import connect as sqlConnect

import telebot
# Logger--------------------------------------------
class MyFormatter(logging.Formatter):
    """Log formatter that renders every record as ``[<time>] - <message>``.

    The layout is fixed: it is re-applied on every ``format`` call, so any
    ``fmt`` handed to the constructor is overridden. Only ``datefmt`` is
    effectively configurable.
    """

    # Layout forced onto the underlying style object before each render.
    _LAYOUT = "[%(asctime)s] - %(message)s"

    def __init__(self, fmt=None, datefmt="%I:%M:%S %p %d-%m-%Y"):
        super().__init__(fmt, datefmt)

    def format(self, record):
        """Return ``record`` rendered with the fixed layout."""
        self._style._fmt = self._LAYOUT
        return super().format(record)
def makeLogger(logFile, stdout=True):
    """Build the module logger writing to ``logFile`` and optionally to stdout.

    Args:
        logFile: Path of the file that receives the log records.
        stdout: When true, records are additionally echoed to ``sys.stdout``.

    Returns:
        The logger named after this module, set to DEBUG level, with the
        handlers attached. Each call attaches a fresh set of handlers.
    """
    log = logging.getLogger(__name__)
    log.setLevel(logging.DEBUG)

    sharedFormatter = MyFormatter()
    handlers = [logging.FileHandler(logFile)]
    if stdout:
        handlers.append(logging.StreamHandler(sys.stdout))

    # All handlers share the same formatter instance.
    for handler in handlers:
        handler.setFormatter(sharedFormatter)
        log.addHandler(handler)
    return log
# Custom http server for tracing incoming connection from client----
class MyServer(BaseHTTPRequestHandler):
    """HTTP handler through which job clients report job status.

    A POST body is a JSON object with the keys ``id`` (user id), ``status``,
    ``job`` and ``host``; closing reports additionally carry ``jobID``.
    Status letters handled here: ``S`` (newly submitted), ``C`` (complete)
    and ``F`` (failed). Submitted jobs are stored as ``R`` (running).

    The HTTP response is always written exactly once, and only after the
    database work is done. The Telegram notification is sent afterwards and
    its failure is logged without affecting the response.
    """

    def _set_headers(self, code=200):
        """Write the status line and the JSON content-type header."""
        self.send_response(code)
        self.send_header("Content-type", "application/json")
        self.end_headers()

    def _notify(self, userId, text):
        """Send a Telegram message; failures are logged, never raised."""
        try:
            bot.send_message(userId, text)
        except Exception:
            logger.exception(f"Failed to send Telegram notification to user {userId}")

    def _processReport(self, userId, data):
        """Apply one status report to the database.

        Returns:
            A ``(http_code, body_bytes, notification_or_None)`` tuple. The
            503 codes for unknown status / unregistered user are kept as they
            were so that existing clients see the same responses.
        """
        status = data.get("status")
        job = data.get("job")
        host = data.get("host")

        if not db.checkIfRegisteredID(userId):
            logger.info(f"Incoming request for unregistered user: {userId}")
            return 503, b"", None

        # The bot sends with parse_mode='HTML', so client-supplied text must
        # have <, > and & escaped or Telegram rejects the message.
        safeJob = html.escape(str(job), quote=False)
        safeHost = html.escape(str(host), quote=False)

        if status == 'S':  # newly submitted job
            jobID = db.addJob(userId, host, job)
            logger.info(f'New job added for user {userId} at {host} : {job}')
            note = f'A new job <i>{safeJob}</i> is submitted on <b>{safeHost}</b>'
            # The new job ID is returned to the client; it needs it to close the job.
            return 200, str(jobID).encode(), note

        if status in ("C", "F"):
            # When not starting, the request must contain a job ID; it is the
            # primary key, so no other information is needed to close the job.
            jobID = data.get("jobID")
            db.closeJob(jobID, status)
            txt = 'is now complete.' if status == 'C' else 'has failed.'
            logger.info(f'Job closed for user {userId} at {host} : {job}, job={job}, jobID={jobID}')
            return 200, b"", f'Your job <i>{safeJob}</i> on <b>{safeHost}</b> {txt}'

        logger.info(f"Warning: Incoming unknows status: {status}. User ID={userId}, Host={host} Job={job}")
        return 503, b"", None

    def do_HEAD(self):
        self._set_headers()

    def do_POST(self):
        """Handle one job status report."""
        code, body, note = 500, b"", None
        userId = None
        try:
            content_length = int(self.headers['Content-Length'])
            data = json.loads(self.rfile.read(content_length))
            userId = int(data.get("id"))
            code, body, note = self._processReport(userId, data)
        except Exception:
            # Anything that goes wrong before a result is produced is a 500.
            logger.exception("Failed to parse request.")

        self._set_headers(code)
        if body:
            self.wfile.write(body)
        # Notify last: the client already has its answer at this point.
        if note is not None:
            self._notify(userId, note)
def runServer(addr='0.0.0.0', port=8123):
    """Serve job status reports over HTTP until interrupted.

    Args:
        addr: Interface address to bind to.
        port: TCP port to listen on.

    Raises:
        OSError: If the address cannot be bound, e.g. the port is occupied.
    """
    # The context manager closes the listening socket when serving stops
    # (for instance on KeyboardInterrupt).
    with HTTPServer((addr, port), MyServer) as httpd:
        print(f"Starting httpd server on {addr}:{port}")
        httpd.serve_forever()
# Database to keep track of all jobs for all users-----
class DataBase:
    """SQLite store of registered users and the jobs they submitted.

    Two tables are used: ``JOBINFO`` (one row per job, keyed by the
    auto-incrementing ``jobID``) and ``USERIDS`` (registered user ids).
    Every method opens its own connection and closes it before returning.
    Serial numbers shown to users are positions in the user's job list
    ordered by ``jobID``; ``removeJob`` uses the same ordering.
    """

    def __init__(self, dbFile):
        """Remember ``dbFile`` and create the schema if the file is missing."""
        self.dbFile = dbFile
        if not os.path.exists(dbFile):  # create the database, if doesn't exist
            with self._connection() as con, con:
                cur = con.cursor()
                cur.executescript("CREATE TABLE JOBINFO("
                                  "jobID integer NOT NULL PRIMARY KEY AUTOINCREMENT,"
                                  "userId INTEGER NOT NULL,"
                                  "host TEXT NOT NULL,"
                                  "status TEXT NOT NULL,"
                                  "job TEXT NOT NULL);"
                                  "CREATE TABLE USERIDS ( userid NOT NULL UNIQUE);")
                # add the Admin ID to the database
                cur.execute('INSERT into USERIDS (userid) values (?)', (ADMIN,))

    def _connection(self):
        """Return a context manager yielding a connection closed on exit.

        ``with connection:`` alone only commits or rolls back; it does not
        close the connection, hence the extra ``closing`` wrapper. Use as
        ``with self._connection() as con, con:``.
        """
        return closing(sqlConnect(self.dbFile))

    @staticmethod
    def _registeredIds(cur):
        """Return the registered user ids as a set of ints.

        Rows whose value cannot be converted to an int are ignored so that a
        single malformed row cannot break every registration check.
        """
        cur.execute('SELECT userid from USERIDS')
        ids = set()
        for (raw,) in cur.fetchall():
            try:
                ids.add(int(raw))
            except (TypeError, ValueError):
                continue
        return ids

    @staticmethod
    def _displayName(user):
        """Return the user's name, skipping missing parts, escaped for HTML mode."""
        name = " ".join(filter(None, (user.first_name, user.last_name)))
        return html.escape(name, quote=False)

    @staticmethod
    def _formatTable(title, headings, rows):
        """Render ``rows`` as a numbered, column-aligned ``<pre>`` table."""
        numbered = [[f'{n}. {row[0]}', *row[1:]] for n, row in enumerate(rows, start=1)]
        cells = [[trimMe(cell) for cell in row] for row in numbered]
        widths = [max(len(cell) + 1 for cell in column) for column in zip(*cells)]
        header = ' '.join(h.center(w) for h, w in zip(headings, widths))
        body = '\n'.join(' '.join(c.ljust(w) for c, w in zip(row, widths)) for row in cells)
        # Escape after padding so column widths are computed on the visible
        # characters; messages are sent with parse_mode='HTML'.
        table = html.escape(header + '\n' + '-' * 30 + '\n' + body, quote=False)
        return title + "\n\n <pre>" + table + '</pre>'

    def listRunningJobs(self, userID):
        """Return ``(text, count)`` describing the user's running jobs."""
        with self._connection() as con, con:
            cur = con.cursor()
            cur.execute('Select host,job from JOBINFO where userId=? and status=? order by jobID',
                        (userID, 'R'))
            data = cur.fetchall()
        count = len(data)
        if count:
            txt = self._formatTable("The follwing jobs are running:", ['Host', 'Job'], data)
        else:
            txt = "No running jobs found"
        return txt, count

    def listAllJobs(self, userID):
        """Return ``(text, count)`` describing all of the user's jobs."""
        with self._connection() as con, con:
            cur = con.cursor()
            cur.execute('Select host,status,job from JOBINFO where userId=? order by jobID',
                        (userID,))
            data = cur.fetchall()
        count = len(data)
        if count:
            txt = self._formatTable("List of Jobs:", ['Host', 'S', 'Job'], data)
        else:
            txt = "Job List empty"
        return txt, count

    def addJob(self, userId, host, job):
        """Insert a new running job and return its job ID."""
        with self._connection() as con, con:
            cur = con.cursor()
            cur.execute('Insert into JOBINFO (userId, host, status, job) values (?,?,?,?)',
                        (userId, host, 'R', job))
            # rowid of the row just inserted through this cursor == jobID
            return cur.lastrowid

    def closeJob(self, jobID, status):
        """Set the status of job ``jobID`` to ``status``."""
        with self._connection() as con, con:
            cur = con.cursor()
            cur.execute("UPDATE JOBINFO SET status=? where jobID=?", (status, jobID))

    def removeJob(self, userId, index):
        """Delete the user's jobs at the given 1-based serial numbers.

        Args:
            userId: Owner of the jobs.
            index: Iterable of 1-based positions in the user's job list
                (ordered by ``jobID``, as shown by ``listAllJobs``).

        Raises:
            IndexError: If any serial number is outside ``1..number of jobs``.
                Nothing is deleted in that case.
        """
        with self._connection() as con, con:
            cur = con.cursor()
            cur.execute('Select jobID from JOBINFO where userId=? order by jobID', (userId,))
            jobIds = cur.fetchall()
            # Reject 0 and negatives explicitly: Python indexing would wrap
            # around and silently delete a job from the end of the list.
            invalid = [i for i in index if not 1 <= i <= len(jobIds)]
            if invalid:
                raise IndexError(f'Invalid job serial number(s): {invalid}')
            jobIdsToRemove = [jobIds[i - 1] for i in index]
            cur.executemany("Delete from JOBINFO where jobID=? ", jobIdsToRemove)
        logger.info(f'Job(s) removed for user {userId} jobIDs : {" ".join([str(i) for (i,) in jobIdsToRemove])}')

    def checkIfRegisteredID(self, userID):
        """Return True if ``userID`` is among the registered user ids."""
        with self._connection() as con, con:
            return userID in self._registeredIds(con.cursor())

    def checkIfRegisteredUser(self, user):
        """Return True if ``user`` is registered; otherwise notify the admin."""
        if self.checkIfRegisteredID(user.id):
            return True
        logger.info(f"Incoming request for unregistered user: {user.first_name} {user.last_name} ({user.id})")
        bot.send_message(ADMIN, f'Registration requested for {self._displayName(user)} ({user.id})')
        return False

    def registerUser(self, userID):
        """Register ``userID`` and notify the admin and the new user.

        A non-numeric ``userID`` is rejected (the admin is told) instead of
        being stored, because the registration check converts every stored id
        to an int. Messages are sent only after the database change has been
        committed, so a failed notification does not undo the registration.
        """
        try:
            newId = int(userID)
        except (TypeError, ValueError):
            logger.info(f'Rejected registration of non-numeric user ID {userID!r}.')
            bot.send_message(ADMIN, 'Registration rejected: the user ID must be a number.')
            return

        with self._connection() as con, con:
            cur = con.cursor()
            alreadyKnown = newId in self._registeredIds(cur)
            if not alreadyKnown:
                cur.execute('INSERT into USERIDS (userid) values (?)', (newId,))

        if alreadyKnown:
            bot.send_message(ADMIN, f'User ID {newId} is already in database.')
            logger.info(f'User ID {newId} is already in database.')
        else:
            bot.send_message(ADMIN, f"User {newId} added to database.")
            bot.send_message(newId, 'You are succesfully added to the bot to submit jobs.')
def trimMe(myStr):
    """Shorten ``myStr`` for table display.

    Values longer than 13 characters are cut to their first 10 characters
    followed by ``...``; anything shorter is returned unchanged.
    """
    if len(myStr) <= 13:
        return myStr
    head = myStr[:10]
    return head + '...'
# The '.key' file holds two whitespace-separated fields:
#   <bot_API_key> <admin_user_id>
# ADMIN is kept as the string read from the file.
with open('.key') as keyFile:
    myKey, ADMIN = keyFile.read().split()

# Module-level singletons shared by the HTTP handler and the bot handlers.
bot = telebot.TeleBot(myKey, parse_mode='HTML')
logger = makeLogger('stat.log')
db = DataBase('sqlite3.db')
# Core Telegram bot message handlers
# `commands` must be a list of command names; a bare string is matched by
# substring in older pyTelegramBotAPI releases and triggers a warning in newer ones.
@bot.message_handler(commands=['start'])
def send_welcome(message):
    """Greet the user, tell them their id and request registration from the admin."""
    user = message.from_user
    # A Telegram user may have no last name; skip missing parts instead of
    # printing "None". Escape because messages are sent with parse_mode='HTML'.
    name = html.escape(" ".join(filter(None, (user.first_name, user.last_name))), quote=False)
    bot.send_message(user.id, f"Hi there {name}. "
                     "Welcome to this automated bot. This bot keeps track of your running jobs "
                     "and send you notification when your job is complete or failed. "
                     f"Your id is <b>{user.id}</b>. Use this when submitting jobs.")
    # checkIfRegisteredUser also forwards a registration request to the admin.
    if not db.checkIfRegisteredUser(user):
        bot.send_message(user.id, "Note: You are not authorised to submit job with the bot "
                         "Please wait for the admin to accept your request.")
# `commands` must be a list of command names; a bare string is matched by
# substring in older pyTelegramBotAPI releases and triggers a warning in newer ones.
@bot.message_handler(commands=['listjobs'])
def send_listRunningJobs(message):
    """Send the requesting user the list of their running jobs."""
    user = message.from_user
    logger.info(f'List of running jobs requested for user={user.id}')
    if not db.checkIfRegisteredUser(user):
        bot.send_message(user.id, 'You are not authorised to use this option.')
        return
    jobs, _ = db.listRunningJobs(user.id)
    bot.send_message(user.id, jobs)
# `commands` must be a list of command names; a bare string is matched by
# substring in older pyTelegramBotAPI releases and triggers a warning in newer ones.
@bot.message_handler(commands=['listalljobs'])
def send_listAllJobs(message):
    """Send the requesting user the list of all their jobs."""
    user = message.from_user
    logger.info(f'List of all jobs requested for user={user.id}')
    if not db.checkIfRegisteredUser(user):
        bot.send_message(user.id, 'You are not authorised to use this option.')
        return
    jobs, _ = db.listAllJobs(user.id)
    bot.send_message(user.id, jobs)
# `commands` must be a list of command names; a bare string is matched by
# substring in older pyTelegramBotAPI releases and triggers a warning in newer ones.
@bot.message_handler(commands=['myinfo'])
def send_userinfo(message):
    """Tell the user the id they must use when submitting jobs."""
    user = message.from_user
    logger.info(f'Information requested for {user.first_name} {user.last_name} ({user.id})')
    # A Telegram user may have no last name; skip missing parts instead of
    # printing "None". Escape because messages are sent with parse_mode='HTML'.
    name = html.escape(" ".join(filter(None, (user.first_name, user.last_name))), quote=False)
    bot.send_message(user.id, f"Hi there {name}. "
                     f"Your id is <b>{user.id}</b>. Use this when submitting jobs")
# `commands` must be a list of command names; a bare string is matched by
# substring in older pyTelegramBotAPI releases and triggers a warning in newer ones.
@bot.message_handler(commands=['remove'])
def start(message):
    """Show the user's jobs and ask which serial numbers to remove."""
    user = message.from_user
    logger.info(f'Requested to remove jobs for user={user.id}')
    if not db.checkIfRegisteredUser(user):
        bot.send_message(user.id, 'You are not authorised to use this option.')
        return
    txt, count = db.listAllJobs(user.id)
    sent = bot.send_message(user.id, 'Provide serial number of jobs to remove.\n' + txt)
    # Only wait for a reply when there is something that can be removed.
    if count:
        bot.register_next_step_handler(sent, removewithIDs)
def removewithIDs(message):
    """Remove the jobs whose serial numbers the user replied with.

    The reply is a list of 1-based serial numbers separated by commas and/or
    whitespace. Invalid input is answered with an explanatory message and
    nothing is removed, instead of the handler raising without any feedback.
    """
    userId = message.from_user.id
    # message.text is None for non-text replies; drop empty tokens that
    # leading/trailing separators produce.
    tokens = [t for t in re.split(r'[,\s]+', message.text or '') if t]
    try:
        toRemoveIds = [int(t) for t in tokens]
    except ValueError:
        toRemoveIds = []

    # Serial numbers start at 1; 0 or negatives would index from the end of the list.
    if not toRemoveIds or any(i < 1 for i in toRemoveIds):
        bot.send_message(userId, 'Please provide valid serial numbers separated by '
                                 'comma or space. Nothing was removed.')
        return

    try:
        db.removeJob(userId, toRemoveIds)
    except IndexError:
        # removeJob raises IndexError when a serial number has no matching job.
        bot.send_message(userId, 'Serial number out of range. Nothing was removed.')
        return
    bot.send_message(userId, f'These jobs are removed {",".join([str(i) for i in toRemoveIds])}')
@bot.message_handler(func=lambda message: True)
def echo_all(message):
    """Handle ``register <user id>`` sent by the admin; ignore everything else.

    The user id must be numeric: the registration check converts stored ids
    to ints, so a non-numeric id must never reach the database.
    """
    text = message.text or ''
    if message.from_user.id != int(ADMIN) or not text.lower().startswith('register'):
        return

    words = text.split()
    if len(words) < 2:
        # Plain text only: angle brackets would be parsed as HTML tags.
        bot.send_message(message.from_user.id, 'Usage: register USER_ID')
        return
    try:
        # Normalised, but still passed on as a string as before.
        newUserID = str(int(words[1]))
    except ValueError:
        bot.send_message(message.from_user.id, 'Registration rejected: the user ID must be a number.')
        return

    logger.info(f'New user registration requested for {newUserID}')
    db.registerUser(newUserID)
# Poll Telegram on a background daemon thread; the HTTP server then occupies
# the main thread, so the process lives exactly as long as the server runs.
pollingThread = threading.Thread(target=bot.infinity_polling, daemon=True)
pollingThread.start()
runServer()