1+ import json
12import os
23import time
34from typing import Any , List
@@ -27,9 +28,30 @@ def __init__(self, db_path: str):
2728 if db_dir :
2829 os .makedirs (db_dir , exist_ok = True )
2930 self ._db_path = db_path
30- self ._db = TinyDB ( db_path )
31+ self ._db = self . _open_db_with_retry ( )
3132 self ._table = self ._db .table ("events" )
3233
34+ def _open_db_with_retry (self , max_retries : int = 3 ) -> TinyDB :
35+ """Open TinyDB with retry logic to handle transient JSON decode errors."""
36+ last_error : Exception | None = None
37+ for attempt in range (max_retries ):
38+ try :
39+ return TinyDB (self ._db_path )
40+ except json .JSONDecodeError as e :
41+ last_error = e
42+ logger .warning (f"TinyDB JSON decode error on attempt { attempt + 1 } : { e } " )
43+ # Wait a bit and retry - the file might be mid-write
44+ time .sleep (0.1 * (attempt + 1 ))
45+ # Try to recover by removing the corrupted file
46+ if attempt == max_retries - 1 and os .path .exists (self ._db_path ):
47+ try :
48+ logger .warning (f"Removing corrupted TinyDB file: { self ._db_path } " )
49+ os .remove (self ._db_path )
50+ return TinyDB (self ._db_path )
51+ except Exception :
52+ pass
53+ raise last_error if last_error else RuntimeError ("Failed to open TinyDB" )
54+
3355 def publish_event (self , event_type : str , data : Any , process_id : str ) -> None :
3456 """Publish an event to the database using atomic transaction."""
3557 try :
@@ -55,38 +77,48 @@ def publish_event(self, event_type: str, data: Any, process_id: str) -> None:
5577 logger .warning (f"Failed to publish event to database: { e } " )
5678
5779 def get_unprocessed_events (self , process_id : str ) -> List [dict ]:
58- """Get unprocessed events from other processes."""
59- try :
60- # Clear query cache to force fresh read from disk
61- # TinyDB caches query results, so we need to clear cache to see
62- # events written by other processes. The search() method will
63- # automatically call _read_table() on a cache miss.
64- self ._table .clear_cache ()
65-
66- Event = Query ()
67- results = self ._table .search ((Event .process_id != process_id ) & (Event .processed == False )) # noqa: E712
68-
69- logger .debug (
70- f"TinyDBEventBusDatabase: Found { len (results )} unprocessed events for process_id: { process_id } in database: { self ._db_path } "
71- )
72-
73- events = []
74- # Sort by timestamp
75- for event in sorted (results , key = lambda x : x .get ("timestamp" , 0 )):
76- events .append (
77- {
78- "event_id" : event ["event_id" ],
79- "event_type" : event ["event_type" ],
80- "data" : event ["data" ],
81- "timestamp" : event ["timestamp" ],
82- "process_id" : event ["process_id" ],
83- }
80+ """Get unprocessed events from other processes with retry logic."""
81+ max_retries = 3
82+ for attempt in range (max_retries ):
83+ try :
84+ # Clear query cache to force fresh read from disk
85+ # TinyDB caches query results, so we need to clear cache to see
86+ # events written by other processes. The search() method will
87+ # automatically call _read_table() on a cache miss.
88+ self ._table .clear_cache ()
89+
90+ Event = Query ()
91+ results = self ._table .search ((Event .process_id != process_id ) & (Event .processed == False )) # noqa: E712
92+
93+ logger .debug (
94+ f"TinyDBEventBusDatabase: Found { len (results )} unprocessed events for process_id: { process_id } in database: { self ._db_path } "
8495 )
8596
86- return events
87- except Exception as e :
88- logger .warning (f"Failed to get unprocessed events: { e } " )
89- return []
97+ events = []
98+ # Sort by timestamp
99+ for event in sorted (results , key = lambda x : x .get ("timestamp" , 0 )):
100+ events .append (
101+ {
102+ "event_id" : event ["event_id" ],
103+ "event_type" : event ["event_type" ],
104+ "data" : event ["data" ],
105+ "timestamp" : event ["timestamp" ],
106+ "process_id" : event ["process_id" ],
107+ }
108+ )
109+
110+ return events
111+ except json .JSONDecodeError as e :
112+ logger .warning (f"TinyDB JSON decode error on get_unprocessed_events attempt { attempt + 1 } : { e } " )
113+ if attempt < max_retries - 1 :
114+ time .sleep (0.1 * (attempt + 1 ))
115+ else :
116+ logger .warning ("Failed to read events after retries, returning empty list" )
117+ return []
118+ except Exception as e :
119+ logger .warning (f"Failed to get unprocessed events: { e } " )
120+ return []
121+ return []
90122
91123 def mark_event_processed (self , event_id : str ) -> None :
92124 """Mark an event as processed using atomic transaction."""
0 commit comments