Skip to content

Commit 839490b

Browse files
committed
moved timout, db_path, verbose in kwargs in generators; added sql timeout; added repair
1 parent 6659903 commit 839490b

2 files changed

Lines changed: 100 additions & 29 deletions

File tree

pytrackunit/sqlcache.py

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -312,29 +312,34 @@ def sql_item_to_history_item(obj : Tuple) -> dict:
312312

313313
async def SqlInsertIter(
314314
sqliter : TuIter,
315-
db_path : str,
316315
start_ts_ms : int,
317316
end_ts_ms : int,
318-
verbose : bool,
319317
**kwargs) -> Tuple[list,dict]:
320318
"""Generator which inserts data from an upstream iterator into the database"""
319+
320+
db_path = kwargs['db_path']
321+
verbose = kwargs['verbose']
322+
timeout = kwargs['timeout']
323+
table = kwargs['table']
324+
veh_id = kwargs['id']
325+
321326
assert db_path is not None
322327
assert db_path != ""
323328

324-
async with aiosqlite.connect(db_path) as _db:
329+
async with aiosqlite.connect(db_path,timeout=timeout) as _db:
325330

326331
if verbose:
327332
print("SqlInsertIter __init__ has sqliter:",sqliter)
328-
if kwargs['table'] == "error":
333+
if table == "error":
329334
insert_sql = "INSERT INTO error VALUES (?,?,?,?,?,?,?)"
330335
fconv = error_item_to_sql_item
331-
elif kwargs['table'] == "history":
336+
elif table == "history":
332337
insert_sql = """
333338
INSERT INTO history VALUES
334339
(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
335340
"""
336341
fconv = history_item_to_sql_item
337-
elif kwargs['table'] == "candata":
342+
elif table == "candata":
338343
insert_sql = """
339344
INSERT INTO candata VALUES
340345
(?,?,?,?,?,?)
@@ -343,8 +348,8 @@ async def SqlInsertIter(
343348
else:
344349
raise Exception("Not yet implemented")
345350

346-
await _db.execute(f"INSERT INTO {kwargs['table']}meta VALUES (?,?,?,?)",\
347-
(kwargs['id'],start_ts_ms,end_ts_ms,False))
351+
await _db.execute(f"INSERT INTO {table}meta VALUES (?,?,?,?)",\
352+
(veh_id,start_ts_ms,end_ts_ms,False),)
348353

349354
await _db.commit()
350355

@@ -370,42 +375,47 @@ async def SqlInsertIter(
370375
yield data, meta
371376

372377
await _db.execute(f"""
373-
UPDATE {kwargs['table']}meta SET isloaded=? WHERE unit=? AND start=? AND end=?
374-
""",(True,kwargs['id'],start_ts_ms,end_ts_ms))
378+
UPDATE {table}meta SET isloaded=? WHERE unit=? AND start=? AND end=?
379+
""",(True,veh_id,start_ts_ms,end_ts_ms))
375380

376381
await _db.commit()
377382

378383
if verbose:
379384
print("Loaded",start_ts_ms,end_ts_ms,kwargs)
380385

381386
async def SqlReturnIter(
382-
db_path : str,
383387
start_ts_ms : int,
384388
end_ts_ms : int,
385-
verbose : bool,
386389
**kwargs) -> Tuple[list, dict]:
387390
"""Generator return given data from database"""
391+
392+
db_path = kwargs['db_path']
393+
verbose = kwargs['verbose']
394+
timeout = kwargs['timeout']
395+
table = kwargs['table']
396+
veh_id = kwargs['id']
397+
388398
command = f"""
389-
select * from {kwargs['table']} where unit = ? and time >= ? and time <= ? order by time
399+
select * from {table} where unit = ? and time >= ? and time <= ? order by time
390400
"""
391-
if kwargs['table'] == "error":
401+
if table == "error":
392402
fconv = sql_item_to_error_item
393-
elif kwargs['table'] == "history":
403+
elif table == "history":
394404
fconv = sql_item_to_history_item
395-
elif kwargs['table'] == "candata":
405+
elif table == "candata":
396406
fconv = sql_item_to_candata_item
397407
else:
398408
raise Exception("Table not implemented yet")
399-
async with aiosqlite.connect(db_path) as _db:
409+
async with aiosqlite.connect(db_path,timeout=timeout) as _db:
400410

401411
# Wait until the block is fully loaded
402412

403413
start_time = time.time()
404414
isloaded = False
405415
while not isloaded:
406416
async with _db.execute(f"""
407-
select * from {kwargs['table']}meta where unit = ? and start <= ? and end >= ?
408-
""",(kwargs['id'],start_ts_ms,end_ts_ms)) as cur:
417+
select * from {table}meta where unit = ? and start <= ? and end >= ?
418+
""",(veh_id,start_ts_ms,end_ts_ms)) as cur:
409419
first = await cur.fetchone()
410420
if verbose:
411421
print(first)
@@ -415,7 +425,7 @@ async def SqlReturnIter(
415425
raise Exception(f"Timout waiting for block {start_ts_ms} {end_ts_ms} {kwargs}")
416426
await asyncio.sleep(0)
417427

418-
async with _db.execute(command,(kwargs['id'],start_ts_ms, end_ts_ms)) as _cur:
428+
async with _db.execute(command,(veh_id,start_ts_ms, end_ts_ms)) as _cur:
419429
async for x in _cur:
420430
yield [fconv(x)], kwargs
421431

@@ -430,14 +440,18 @@ def __init__(self,**kwargs):
430440
self.cache1 = kwargs.get('sql_cache1',TuCache(**cache1_kwargs))
431441
self.cache2 = kwargs.get('sql_cache2',TuCache(**kwargs))
432442
self.db_connection = None
443+
self.timeout = kwargs.get('sql_timeout',30)
433444
self.connect()
445+
if kwargs.get('sql_repair_on_start',True):
446+
for table in ['error','history','candata']:
447+
self.repair(table)
434448

435449
def connect(self) -> None:
436450
"""Connects the SqlCache and sets up db_connection"""
437451
if self.db_connection is not None:
438452
raise Exception("Cant connect twice")
439453
no_db_present = not os.path.isfile(self.db_path)
440-
self.db_connection = sqlite3.connect(self.db_path)
454+
self.db_connection = sqlite3.connect(self.db_path,timeout=self.timeout)
441455
if no_db_present:
442456
create_tables(self.db_path)
443457

@@ -461,6 +475,26 @@ def reset(self,remove_data : bool=False) -> None:
461475
os.remove(self.db_path)
462476
self.connect()
463477

478+
def repair(self,table : str) -> None:
479+
"""
480+
repairs the database after crash.
481+
deletes blocks with unloaded data
482+
"""
483+
cur = self.db_connection.execute(f"""
484+
select * from {table}meta where
485+
isloaded = 0
486+
""")
487+
for veh_id, start_ts_ms, end_ts_ms, _ in cur:
488+
self.db_connection.execute(f"""
489+
DELETE FROM {table} WHERE
490+
unit=? AND time>=? AND time<=?
491+
""",(veh_id,start_ts_ms,end_ts_ms))
492+
self.db_connection.execute(f"""
493+
DELETE FROM {table}meta WHERE
494+
unit=? AND start=? AND end=?
495+
""",(veh_id,start_ts_ms,end_ts_ms))
496+
self.db_connection.commit()
497+
464498
async def get_unitlist(self):
465499
"""returns a list of vehicles"""
466500
return await self.cache2.get_unitlist()
@@ -491,10 +525,11 @@ def get_general_upstream(
491525

492526
wrap_iter = SqlInsertIter(
493527
int_data_iter,
494-
self.db_path,
495528
start_ts_ms,
496529
end_ts_ms,
497-
self.verbose,
530+
db_path=self.db_path,
531+
verbose=self.verbose,
532+
timeout=self.timeout,
498533
**kwargs)
499534

500535
if previter is None:
@@ -527,7 +562,13 @@ def get_general_sql(self,
527562
previter = TuIter()
528563

529564
if cnt > 0:
530-
previter.add(SqlReturnIter(self.db_path,start_ts_ms,end_ts_ms,self.verbose,**kwargs))
565+
previter.add(SqlReturnIter(
566+
start_ts_ms,
567+
end_ts_ms,
568+
db_path=self.db_path,
569+
verbose=self.verbose,
570+
timeout=self.timeout,
571+
**kwargs))
531572
elif self.verbose:
532573
print("could not find any item in block ",start_ts_ms,end_ts_ms,"for unit",kwargs['id'])
533574

tests/test_sqlcache.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,13 @@ async def test_sqlinsertiter():
281281

282282
cache = CacheForTests()
283283
create_tables(db_path)
284-
it = SqlInsertIter(db_path=db_path,sqliter=cache.ret_val_generator(0,MID),verbose=True,table="error",**cache.testmeta)
284+
it = SqlInsertIter(
285+
db_path=db_path,
286+
sqliter=cache.ret_val_generator(0,MID),
287+
verbose=True,
288+
table="error",
289+
timeout=1,
290+
**cache.testmeta)
285291
async for x in it:
286292
print(x)
287293

@@ -297,7 +303,13 @@ async def test_sqlinsertiter_isloaded():
297303

298304
cache = CacheForTests()
299305
create_tables(db_path)
300-
it = SqlInsertIter(db_path=db_path,sqliter=cache.ret_val_generator(0,MID),verbose=True,table="error",**cache.testmeta)
306+
it = SqlInsertIter(
307+
db_path=db_path,
308+
sqliter=cache.ret_val_generator(0,MID),
309+
verbose=True,
310+
table="error",
311+
timeout=1,
312+
**cache.testmeta)
301313
async for x in it:
302314
print(x)
303315

@@ -319,12 +331,24 @@ async def test_sqlinsertiter_integrity_error():
319331

320332
cache = CacheForTests()
321333
create_tables(db_path)
322-
it = SqlInsertIter(db_path=db_path,sqliter=cache.ret_val_generator(0,MID),verbose=True,table="error",**cache.testmeta)
334+
it = SqlInsertIter(
335+
db_path=db_path,
336+
sqliter=cache.ret_val_generator(0,MID),
337+
verbose=True,
338+
table="error",
339+
timeout=1,
340+
**cache.testmeta)
323341
async for x in it:
324342
print(x)
325343

326344
with pytest.raises(sqlite3.IntegrityError) as exc:
327-
it = SqlInsertIter(db_path=db_path,sqliter=cache.ret_val_generator(0,MID),verbose=True,table="error",**cache.testmeta)
345+
it = SqlInsertIter(
346+
db_path=db_path,
347+
sqliter=cache.ret_val_generator(0,MID),
348+
verbose=True,
349+
table="error",
350+
timeout=1,
351+
**cache.testmeta)
328352
async for x in it:
329353
print(x)
330354

@@ -335,7 +359,13 @@ async def test_sqlinsertiter_integrity_error():
335359
cur = db.execute("select count(*) from error")
336360
assert 0 == cur.fetchone()[0]
337361

338-
it = SqlInsertIter(db_path=db_path,sqliter=cache.ret_val_generator(0,MID),verbose=True,table="error",**cache.testmeta)
362+
it = SqlInsertIter(
363+
db_path=db_path,
364+
sqliter=cache.ret_val_generator(0,MID),
365+
verbose=True,
366+
table="error",
367+
timeout=1,
368+
**cache.testmeta)
339369
async for x in it:
340370
print(x)
341371

0 commit comments

Comments
 (0)