Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions qasync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
"""
Implementation of the PEP 3156 Event-Loop with Qt.

This package is originally at https://github.com/CabbageDevelopment/qasync,
licensed under the BSD license;
It is modified since the original was unmaintained for two years. References
to Event Loop Policies was removed, since it's scheduled to be deprecated in
Python 3.16.

Copyright (c) 2025 Xinyuan Zhou <johnzhou721@gmail.com>
Comment thread
johnzhou721 marked this conversation as resolved.
Outdated
Copyright (c) 2018 Gerard Marull-Paretas <gerard@teslabs.com>
Copyright (c) 2014 Mark Harviston <mark.harviston@gmail.com>
Copyright (c) 2014 Arve Knudsen <arve.knudsen@gmail.com>
Expand All @@ -13,6 +20,7 @@
"Gerard Marull-Paretas <gerard@teslabs.com>, "
"Mark Harviston <mark.harviston@gmail.com>, "
"Arve Knudsen <arve.knudsen@gmail.com>",
"Xinyuan Zhou <johnzhou721@gmail.com>",
Comment thread
johnzhou721 marked this conversation as resolved.
Outdated
)
__all__ = ["QEventLoop", "QThreadExecutor", "asyncSlot", "asyncClose"]

Expand Down Expand Up @@ -812,29 +820,23 @@ def wrapper(*args, **kwargs):

return outer_decorator


class QEventLoopPolicyMixin:
def new_event_loop(self):
return QEventLoop(QApplication.instance() or QApplication(sys.argv))


class DefaultQEventLoopPolicy(
QEventLoopPolicyMixin,
asyncio.DefaultEventLoopPolicy,
):
pass


@contextlib.contextmanager
def _set_event_loop_policy(policy):
old_policy = asyncio.get_event_loop_policy()
asyncio.set_event_loop_policy(policy)
def _use_qeventloop():
app = QApplication.instance() or QApplication([])
loop = QEventLoop(app)
old_loop = asyncio.get_event_loop()
asyncio.set_event_loop(loop)
try:
yield
yield loop
finally:
asyncio.set_event_loop_policy(old_policy)
loop.close()
asyncio.set_event_loop(old_loop)


def run(main, *args, **kwargs):
"""
Run the given coroutine using a QEventLoop without setting a global policy.
"""
with _use_qeventloop() as loop:
return loop.run_until_complete(main(*args, **kwargs))

def run(*args, **kwargs):
with _set_event_loop_policy(DefaultQEventLoopPolicy()):
return asyncio.run(*args, **kwargs)