Skip to content

Commit 8a3be74

Browse files
committed
QObject Thread safeguards for DB rules enumeration and matching against existing attached list subs
1 parent b30e186 commit 8a3be74

20 files changed

+2351
-295
lines changed

ui/opensnitch/plugins/list_subscriptions/list_subscriptions.py

Lines changed: 151 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,52 @@
7777
# -------------------- plugin core --------------------
7878

7979

80+
class _LogSignalWrapper(QtCore.QObject):
81+
"""Thin QObject container for the (message, level) log signal.
82+
Follows the same pattern as PluginSignal so the runtime can emit
83+
structured log entries directly to the UI status controller.
84+
"""
85+
signal = QtCore.pyqtSignal(str, str, str)
86+
87+
def emit(self, message: str, level: str = "INFO", origin: str = "backend") -> None:
88+
self.signal.emit(message, level, origin)
89+
90+
def connect(self, callback: Any) -> None:
91+
self.signal.connect(callback)
92+
93+
def disconnect(self, callback: Any) -> None: # pyright: ignore[reportIncompatibleMethodOverride]
94+
self.signal.disconnect(callback)
95+
96+
97+
class _UiLogBridgeHandler(logging.Handler):
98+
"""Relays backend logger records into the UI live log signal."""
99+
100+
def __init__(self, sink: _LogSignalWrapper):
101+
super().__init__(level=logging.DEBUG)
102+
self._sink = sink
103+
104+
@staticmethod
105+
def _level_name(record: logging.LogRecord) -> str:
106+
if record.levelno >= logging.ERROR:
107+
return "ERROR"
108+
if record.levelno >= logging.WARNING:
109+
return "WARN"
110+
if record.levelno >= logging.INFO:
111+
return "INFO"
112+
return "DEBUG"
113+
114+
def emit(self, record: logging.LogRecord) -> None:
115+
try:
116+
if bool(getattr(record, "_skip_ui_bridge", False)):
117+
return
118+
message = record.getMessage().strip()
119+
if message == "":
120+
return
121+
self._sink.emit(message, self._level_name(record), "backend")
122+
except Exception:
123+
pass
124+
125+
80126
class SingletonABCMeta(ABCMeta):
81127
_instances: dict[type, object] = {}
82128
_lock = threading.Lock()
@@ -112,6 +158,11 @@ class ListSubscriptions(PluginBase, metaclass=SingletonABCMeta):
112158
# default
113159
TYPE: ClassVar[list[Any]] = [PluginBase.TYPE_GLOBAL]
114160

161+
# UI log signal — connect to DialogStatusController.log to forward
162+
# runtime messages to the main window's live log facility.
163+
log_out: ClassVar[_LogSignalWrapper] = _LogSignalWrapper()
164+
_ui_log_bridge_handler_installed: ClassVar[bool] = False
165+
115166
# runtime state
116167
scheduled_tasks: dict[str, GenericTimer] = {}
117168
default_conf: ClassVar[str] = ACTION_FILE
@@ -132,6 +183,7 @@ def __init__(self, config: dict[str, Any] | None = None):
132183
return
133184

134185
self._initialized = True
186+
self._ensure_ui_log_bridge_handler()
135187
self.signal_in.connect(self.cb_signal)
136188
self._desktop_notifications = DesktopNotifications()
137189
self._db = Database.instance()
@@ -163,6 +215,67 @@ def __init__(self, config: dict[str, Any] | None = None):
163215
else:
164216
self._session.headers.update({"User-Agent": DEFAULT_UA})
165217

218+
@classmethod
219+
def _ensure_ui_log_bridge_handler(cls) -> None:
220+
if cls._ui_log_bridge_handler_installed:
221+
return
222+
logger.addHandler(_UiLogBridgeHandler(cls.log_out))
223+
cls._ui_log_bridge_handler_installed = True
224+
225+
@staticmethod
226+
def _backend_level(level: str) -> int:
227+
normalized = (level or "INFO").strip().upper()
228+
if normalized in ("TRACE", "DEBUG"):
229+
return logging.DEBUG
230+
if normalized in ("WARN", "WARNING"):
231+
return logging.WARNING
232+
if normalized == "ERROR":
233+
return logging.ERROR
234+
if normalized == "CRITICAL":
235+
return logging.CRITICAL
236+
return logging.INFO
237+
238+
def _log_backend(
239+
self,
240+
message: str,
241+
level: str = "INFO",
242+
*,
243+
suppress_ui_bridge: bool = False,
244+
) -> None:
245+
full_text = (message or "").strip()
246+
if full_text == "":
247+
return
248+
logger.log(
249+
self._backend_level(level),
250+
full_text,
251+
extra={"_skip_ui_bridge": bool(suppress_ui_bridge)},
252+
)
253+
254+
def ingest_ui_log(
255+
self,
256+
message: str,
257+
level: str = "INFO",
258+
origin: str = "ui",
259+
) -> None:
260+
"""UI -> backend bridge: write UI logs to the backend true logger."""
261+
self._log_backend(
262+
f"[{origin}] {message}",
263+
level,
264+
suppress_ui_bridge=True,
265+
)
266+
267+
def log_debug(self, message: str) -> None:
268+
self._log_backend(message, "DEBUG")
269+
270+
def log_info(self, message: str) -> None:
271+
self._log_backend(message, "INFO")
272+
273+
def log_warn(self, message: str) -> None:
274+
self._log_backend(message, "WARN")
275+
276+
def log_error(self, message: str) -> None:
277+
self._log_backend(message, "ERROR")
278+
166279
def _emit_runtime_event(
167280
self,
168281
event: RuntimeEventType,
@@ -275,7 +388,7 @@ def _start_runtime(self, *, recheck: bool):
275388
else:
276389
with self._startup_recheck_lock:
277390
self._startup_recheck_pending = True
278-
logger.warning(
391+
logger.info(
279392
"deferring startup refresh until a local node is connected"
280393
)
281394

@@ -317,7 +430,7 @@ def _on_nodes_updated(self, total: int):
317430
with self._startup_recheck_lock:
318431
pending = self._startup_recheck_pending
319432
if pending and self._has_ready_local_node():
320-
logger.warning("local node connected, running deferred startup refresh")
433+
logger.info("local node connected, running deferred startup refresh")
321434
self._schedule_startup_recheck(delay=0.5)
322435

323436
def _reload_from_action_file(self, action_path: str | None = None):
@@ -614,14 +727,14 @@ def _reload_rules_for_updated_subscription(self, sub: SubscriptionSpec):
614727
)
615728
self._nodes.send_notification(addr, notification, None)
616729
found_match = True
617-
logger.warning(
730+
logger.info(
618731
"signaling affected rule '%s' for updated subscription '%s'",
619732
rule.name,
620733
sub.name,
621734
)
622735
break
623736
if found_match is False:
624-
logger.warning(
737+
logger.info(
625738
"no matching rules found for updated subscription '%s'",
626739
sub.name,
627740
)
@@ -866,15 +979,15 @@ def _startup_recheck_all(self):
866979
if not self._has_ready_local_node():
867980
with self._startup_recheck_lock:
868981
self._startup_recheck_pending = True
869-
logger.warning("startup refresh skipped, no local node is ready yet")
982+
logger.info("startup refresh skipped, no local node is ready yet")
870983
return
871984
for sub in self._config.subscriptions:
872985
if not sub.enabled:
873986
continue
874987
try:
875988
self.refresh_subscriptions(sub, source="startup_recheck")
876989
except Exception as e:
877-
logger.warning(
990+
logger.error(
878991
"startup recheck error name='%s' err=%s",
879992
sub.name,
880993
repr(e),
@@ -909,10 +1022,10 @@ def cb_run_tasks(self, args: tuple[str, SubscriptionSpec]):
9091022
meta = self._load_meta(meta_path)
9101023

9111024
if self._in_backoff(meta):
912-
logger.warning("skip '%s' (in backoff)", sub.name)
1025+
logger.info("skip '%s' (in backoff)", sub.name)
9131026
return
9141027
if not self._is_due(meta, sub):
915-
logger.warning("skip '%s' (not due yet)", sub.name)
1028+
logger.info("skip '%s' (not due yet)", sub.name)
9161029
return
9171030

9181031
th = threading.Thread(target=self.download, args=(sub,))
@@ -931,7 +1044,7 @@ def cb_run_tasks(self, args: tuple[str, SubscriptionSpec]):
9311044
self._resultsQueue.put(item)
9321045

9331046
if not matched:
934-
logger.debug("cb_run_tasks() no result for key=%s sub=%s", key, sub.name)
1047+
logger.debug("cb_run_tasks: no result for key=%s sub=%s", key, sub.name)
9351048
return
9361049

9371050
updated: bool = False
@@ -974,7 +1087,7 @@ def cb_signal(self, signal: dict[str, Any]):
9741087
action_path = signal.get("action_path")
9751088

9761089
if sig == PluginSignal.ENABLE:
977-
logger.warning(
1090+
logger.debug(
9781091
"cb_signal: ENABLE action_path=%r",
9791092
action_path,
9801093
)
@@ -987,13 +1100,16 @@ def cb_signal(self, signal: dict[str, Any]):
9871100
"Plugin runtime enabled.",
9881101
action_path=action_path,
9891102
)
1103+
logger.info("plugin runtime enabled")
9901104
else:
9911105
self._emit_runtime_event(
9921106
RuntimeEventType.RUNTIME_ERROR,
9931107
"Failed to enable plugin runtime.",
9941108
error=err,
9951109
action_path=action_path,
9961110
)
1111+
logger.error("Failed to enable plugin runtime: %s", repr(err))
1112+
9971113
return
9981114

9991115
if sig == self.REFRESH_SUBSCRIPTIONS_SIGNAL:
@@ -1029,7 +1145,7 @@ def cb_signal(self, signal: dict[str, Any]):
10291145
return
10301146

10311147
if sig == PluginSignal.CONFIG_UPDATE:
1032-
logger.warning(
1148+
logger.debug(
10331149
"cb_signal: CONFIG_UPDATE action_path=%r",
10341150
action_path,
10351151
)
@@ -1049,17 +1165,19 @@ def cb_signal(self, signal: dict[str, Any]):
10491165
"Plugin runtime configuration reloaded.",
10501166
action_path=action_path,
10511167
)
1168+
logger.info("plugin runtime configuration reloaded")
10521169
else:
10531170
self._emit_runtime_event(
10541171
RuntimeEventType.RUNTIME_ERROR,
10551172
"Failed to reload plugin runtime configuration.",
10561173
error=err,
10571174
action_path=action_path,
10581175
)
1176+
logger.error("Failed to reload plugin runtime configuration: %s", repr(err))
10591177
return
10601178

10611179
if sig == PluginSignal.DISABLE or sig == PluginSignal.STOP:
1062-
logger.warning(
1180+
logger.debug(
10631181
"cb_signal: %s action_path=%r",
10641182
"DISABLE" if sig == PluginSignal.DISABLE else "STOP",
10651183
action_path,
@@ -1079,6 +1197,10 @@ def cb_signal(self, signal: dict[str, Any]):
10791197
),
10801198
action_path=action_path,
10811199
)
1200+
logger.info(
1201+
"plugin runtime %s",
1202+
"disabled" if sig == PluginSignal.DISABLE else "stopped",
1203+
)
10821204
return
10831205

10841206
if sig == PluginSignal.ERROR:
@@ -1093,7 +1215,7 @@ def cb_signal(self, signal: dict[str, Any]):
10931215

10941216
raise ValueError(f"unrecognized signal: {sig}")
10951217
except Exception as e:
1096-
logger.warning("cb_signal() exception: %s", repr(e))
1218+
logger.error("cb_signal: exception: %s", repr(e))
10971219

10981220
def _in_backoff(self, meta: ListMetadata):
10991221
if not meta.backoff_until:
@@ -1260,7 +1382,7 @@ def _download_one(
12601382
],
12611383
)
12621384
self._resultsQueue.put((key, True, "not_modified"))
1263-
logger.warning("subscription not-modified name='%s'", sub.name)
1385+
logger.info("subscription not-modified name='%s'", sub.name)
12641386
return True
12651387

12661388
if r.status_code != 200:
@@ -1288,7 +1410,7 @@ def _download_one(
12881410
],
12891411
)
12901412
self._resultsQueue.put((key, False, f"http_{r.status_code}"))
1291-
logger.warning(
1413+
logger.error(
12921414
"subscription download http-error name='%s' code=%s",
12931415
sub.name,
12941416
r.status_code,
@@ -1323,7 +1445,7 @@ def _download_one(
13231445
],
13241446
)
13251447
self._resultsQueue.put((key, False, "too_large"))
1326-
logger.warning(
1448+
logger.error(
13271449
"subscription download too-large name='%s' len=%s",
13281450
sub.name,
13291451
cl,
@@ -1391,7 +1513,7 @@ def _download_one(
13911513
],
13921514
)
13931515
self._resultsQueue.put((key, False, "bad_format"))
1394-
logger.warning(
1516+
logger.error(
13951517
"subscription file bad-format name='%s'",
13961518
sub.name,
13971519
)
@@ -1458,7 +1580,7 @@ def _download_one(
14581580
],
14591581
)
14601582
self._resultsQueue.put((key, False, "write_error"))
1461-
logger.warning(
1583+
logger.error(
14621584
"subscription file write-error name='%s' err=%s",
14631585
sub.name,
14641586
repr(e),
@@ -1499,7 +1621,7 @@ def _download_one(
14991621
)
15001622
],
15011623
)
1502-
logger.warning(
1624+
logger.info(
15031625
"subscription updated name='%s' bytes=%s",
15041626
sub.name,
15051627
downloaded,
@@ -1537,7 +1659,7 @@ def _download_one(
15371659
],
15381660
)
15391661
self._resultsQueue.put((key, False, "unexpected_error"))
1540-
logger.warning(
1662+
logger.error(
15411663
"subscription download unexpected-error name='%s' err=%s",
15421664
sub.name,
15431665
repr(e),
@@ -1602,7 +1724,7 @@ def download(
16021724
had_errors = True
16031725
except Exception as exc:
16041726
had_errors = True
1605-
logger.warning(
1727+
logger.error(
16061728
"batch download failed for '%s': %r",
16071729
sub.name,
16081730
exc,
@@ -1639,4 +1761,12 @@ def download(
16391761
state="batch_failed" if had_errors else "batch_finished",
16401762
items=items,
16411763
)
1764+
logger.error(
1765+
"batch subscription refresh failed for %d/%d items",
1766+
sum(1 for i in items if i.get('state') != "updated"),
1767+
len(items),
1768+
) if had_errors else logger.info(
1769+
"batch subscription refresh finished for %d items",
1770+
len(items),
1771+
)
16421772
return not had_errors

0 commit comments

Comments
 (0)