Skip to content

Commit ddadbfa

Browse files
committed
NE03B-27 Fix internal audit log listener event loop handling
1 parent 1bd3b42 commit ddadbfa

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

mgmtworker/cloudify_system_workflows/snapshots/audit_listener.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from datetime import datetime
44
from queue import Queue
55
from threading import Event, Thread
6-
from time import sleep
76
from typing import Any
87

98
from cloudify.exceptions import NonRecoverableError
@@ -41,12 +40,15 @@ def start(self, tenant_clients: dict[str, CloudifyClient] | None = None):
4140
super().start()
4241

4342
def run(self):
44-
self._loop.run_until_complete(self._stream_logs())
43+
asyncio.set_event_loop(self._loop)
44+
try:
45+
self._loop.run_until_complete(self._stream_logs())
46+
finally:
47+
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
48+
self._loop.close()
4549

4650
def stop(self):
4751
self._stopped.set()
48-
self._loop.call_soon_threadsafe(self._loop.stop)
49-
self._loop.call_soon_threadsafe(self._loop.close)
5052

5153
def added_snapshot_entity(
5254
self,
@@ -87,7 +89,7 @@ async def _stream_logs(self):
8789
while not self._stopped.is_set():
8890
try:
8991
if not self.__snapshot_entities:
90-
sleep(WAIT_FOR_SNAPSHOT_ENTITIES_SECONDS)
92+
await asyncio.sleep(WAIT_FOR_SNAPSHOT_ENTITIES_SECONDS)
9193
continue
9294
response = await self._client.auditlog.stream(
9395
timeout=self._stream_timeout, since=since)
@@ -99,7 +101,7 @@ async def _stream_logs(self):
99101
since = audit_log.get('created_at')
100102
except BaseException:
101103
pass
102-
self._client.auditlog.close()
104+
self._client.auditlog.close()
103105

104106
def _ref_in_snapshot(self, audit_log: dict) -> bool:
105107
ref_identifier = audit_log.get('ref_identifier', {})

0 commit comments

Comments
 (0)