Skip to content

Commit 43a4619

Browse files
committed
service helper changes test
1 parent 67c74b0 commit 43a4619

1 file changed

Lines changed: 19 additions & 16 deletions

File tree

dev_utils/dev_utils/service_helper.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,29 @@ def __init__(
1515
event_loop=None,
1616
executor=None,
1717
):
18-
self._event_loop = event_loop or asyncio.get_event_loop()
18+
# Keep the passed-in loop as a fallback, but prefer the currently running loop when
19+
# scheduling work so we never attach futures to a different loop than the caller.
20+
self._event_loop_override = event_loop
1921
self._executor = executor or concurrent.futures.ThreadPoolExecutor()
2022
self._inner_object = ServiceHelperSync(
2123
iothub_connection_string, eventhub_connection_string, eventhub_consumer_group
2224
)
2325

26+
def _get_event_loop(self):
27+
try:
28+
return asyncio.get_running_loop()
29+
except RuntimeError:
30+
return self._event_loop_override or asyncio.get_event_loop()
31+
32+
async def _run_in_executor(self, func, *args):
33+
loop = self._get_event_loop()
34+
return await loop.run_in_executor(self._executor, func, *args)
35+
2436
def set_identity(self, device_id, module_id):
2537
return self._inner_object.set_identity(device_id, module_id)
2638

2739
async def set_desired_properties(self, desired_props):
28-
return await self._event_loop.run_in_executor(
29-
self._executor,
30-
self._inner_object.set_desired_properties,
31-
desired_props,
32-
)
40+
return await self._run_in_executor(self._inner_object.set_desired_properties, desired_props)
3341

3442
async def invoke_method(
3543
self,
@@ -38,8 +46,7 @@ async def invoke_method(
3846
connect_timeout_in_seconds=None,
3947
response_timeout_in_seconds=None,
4048
):
41-
return await self._event_loop.run_in_executor(
42-
self._executor,
49+
return await self._run_in_executor(
4350
self._inner_object.invoke_method,
4451
method_name,
4552
payload,
@@ -52,25 +59,21 @@ async def send_c2d(
5259
payload,
5360
properties,
5461
):
55-
return await self._event_loop.run_in_executor(
56-
self._executor, self._inner_object.send_c2d, payload, properties
57-
)
62+
return await self._run_in_executor(self._inner_object.send_c2d, payload, properties)
5863

5964
async def wait_for_eventhub_arrival(self, message_id, timeout=60):
60-
return await self._event_loop.run_in_executor(
61-
self._executor,
65+
return await self._run_in_executor(
6266
self._inner_object.wait_for_eventhub_arrival,
6367
message_id,
6468
timeout,
6569
)
6670

6771
async def get_next_reported_patch_arrival(self, block=True, timeout=240):
68-
return await self._event_loop.run_in_executor(
69-
self._executor,
72+
return await self._run_in_executor(
7073
self._inner_object.get_next_reported_patch_arrival,
7174
block,
7275
timeout,
7376
)
7477

7578
async def shutdown(self):
76-
return await self._event_loop.run_in_executor(self._executor, self._inner_object.shutdown)
79+
return await self._run_in_executor(self._inner_object.shutdown)

0 commit comments

Comments
 (0)