22import logging
33import platform
44import threading
5+ from dataclasses import dataclass
56from typing import Awaitable , Dict , Iterator , Optional , Tuple , Union
67
78import scrapy
@@ -104,6 +105,13 @@ async def _get_header_value(
104105 return None
105106
106107
@dataclass
class _QueueItem:
    """Unit of work placed on the threaded loop's coroutine queue.

    Annotations use ``typing.Union``/``typing.Optional`` rather than the
    ``X | Y`` operator: dataclass field annotations are evaluated when the
    class is created, and ``X | Y`` between classes raises ``TypeError`` on
    Python versions older than 3.10.
    """

    # Awaitable that the queue consumer awaits.
    coro: Awaitable
    # Object that receives the awaitable's result or exception: a Twisted
    # Deferred or an asyncio Future, depending on which helper enqueued it.
    promise: Union[Deferred, asyncio.Future]
    # Event loop on which a Future promise is resolved; left as None when the
    # promise is a Deferred.
    loop: Optional[asyncio.AbstractEventLoop] = None
113+
114+
107115class _ThreadedLoopAdapter :
108116 """Utility class to start an asyncio event loop in a new thread and redirect coroutines.
109117 This allows to run Playwright in a different loop than the Scrapy crawler, allowing to
@@ -116,32 +124,60 @@ class _ThreadedLoopAdapter:
116124 _stop_events : Dict [int , asyncio .Event ] = {}
117125
@classmethod
async def _handle_coro_deferred(cls, queue_item: _QueueItem) -> None:
    """Await the queued coroutine and fire its Deferred through the reactor.

    The outcome is delivered with ``reactor.callFromThread``: the result goes
    to the Deferred's ``callback``, an exception goes to its ``errback``
    wrapped in a ``failure.Failure``.
    """
    from twisted.internet import reactor

    deferred: Deferred = queue_item.promise
    try:
        outcome = await queue_item.coro
    except Exception as error:
        reactor.callFromThread(deferred.errback, failure.Failure(error))
        return
    reactor.callFromThread(deferred.callback, outcome)
128138
@classmethod
async def _handle_coro_future(cls, queue_item: _QueueItem) -> None:
    """Await the queued coroutine and resolve its Future on the item's loop.

    The result (or the raised exception) is handed to the Future through
    ``loop.call_soon_threadsafe`` on ``queue_item.loop``. The Future is only
    resolved if it is not already done, so a Future that was cancelled while
    the coroutine was still running does not raise ``InvalidStateError``
    inside the target loop.
    """
    future: asyncio.Future = queue_item.promise
    loop: asyncio.AbstractEventLoop = queue_item.loop  # type: ignore[assignment]

    def _resolve(setter, value) -> None:
        # Scheduled via call_soon_threadsafe, so the done() check and the
        # setter call run back-to-back on the target loop.
        if not future.done():
            setter(value)

    try:
        result = await queue_item.coro
    except Exception as exc:
        loop.call_soon_threadsafe(_resolve, future.set_exception, exc)
    else:
        loop.call_soon_threadsafe(_resolve, future.set_result, result)
149+
@classmethod
async def _process_queue(cls) -> None:
    """Consume queued items until every registered stop event is set.

    Each item is dispatched as a new task to the handler matching the type
    of its promise (asyncio Future or Twisted Deferred). Items whose promise
    is neither type are marked as done without being scheduled.
    """
    while not all(event.is_set() for event in cls._stop_events.values()):
        item = await cls._coro_queue.get()
        promise = item.promise
        handler = None
        if isinstance(promise, asyncio.Future):
            handler = cls._handle_coro_future
        elif isinstance(promise, Deferred):
            handler = cls._handle_coro_deferred
        if handler is not None:
            asyncio.create_task(handler(item))
        cls._coro_queue.task_done()
135159
@classmethod
def _deferred_from_coro(cls, coro: Awaitable) -> Deferred:
    """Queue ``coro`` on the adapter's loop and return a Deferred for its outcome.

    The queue ``put`` is submitted to ``cls._loop`` with
    ``asyncio.run_coroutine_threadsafe``; the returned Deferred is the promise
    stored in the queued item.
    """
    deferred: Deferred = Deferred()
    enqueue = cls._coro_queue.put(_QueueItem(coro=coro, promise=deferred))
    asyncio.run_coroutine_threadsafe(enqueue, cls._loop)
    return deferred
141166
@classmethod
def _future_from_coro(cls, coro: Awaitable) -> asyncio.Future:
    """Queue ``coro`` on the adapter's loop and return a Future for its outcome.

    Must be called while an event loop is running in the calling thread
    (``asyncio.get_running_loop`` raises ``RuntimeError`` otherwise). That
    loop is stored in the queued item so the Future can be resolved on it.
    """
    # Loop running in the caller's thread (the Scrapy thread loop).
    caller_loop = asyncio.get_running_loop()
    pending: asyncio.Future = asyncio.Future()
    enqueue = cls._coro_queue.put(
        _QueueItem(coro=coro, promise=pending, loop=caller_loop)
    )
    asyncio.run_coroutine_threadsafe(enqueue, cls._loop)
    return pending
174+
175+ @classmethod
176+ def start (cls , download_handler_id : int ) -> None :
177+ """Start the event loop in a new thread if not already started.
178+ Should be called from the Scrapy thread.
179+ """
180+ cls ._stop_events [download_handler_id ] = asyncio .Event ()
145181 if not getattr (cls , "_loop" , None ):
146182 policy = asyncio .DefaultEventLoopPolicy ()
147183 if platform .system () == "Windows" :
@@ -155,9 +191,11 @@ def start(cls, caller_id: int) -> None:
155191 asyncio .run_coroutine_threadsafe (cls ._process_queue (), cls ._loop )
156192
157193 @classmethod
158- def stop (cls , caller_id : int ) -> None :
159- """Wait until all handlers are closed to stop the event loop and join the thread."""
160- cls ._stop_events [caller_id ].set ()
194+ def stop (cls , download_handler_id : int ) -> None :
195+ """Wait until all handlers are closed to stop the event loop and join the thread.
196+ Should be called from the Scrapy thread.
197+ """
198+ cls ._stop_events [download_handler_id ].set ()
161199 if all (ev .is_set () for ev in cls ._stop_events .values ()):
162200 asyncio .run_coroutine_threadsafe (cls ._coro_queue .join (), cls ._loop )
163201 cls ._loop .call_soon_threadsafe (cls ._loop .stop )
0 commit comments