Skip to content

Commit 57b5175

Browse files
authored
Asyncio Client Fixes (1) (#777)
This PR fixes the following issues with the asyncio and asyncore clients: 1. Create tasks for conn.close_connection calls. 2. Cluster `get_member` must use the correct variable 3. Fix the deadlock in asyncio listener 4. Wrong retry methods in VectorCollection 5. Remaining retries in schema replicaiton 6. Blocking cloud discovery refresh
1 parent 132494d commit 57b5175

9 files changed

Lines changed: 67 additions & 56 deletions

File tree

hazelcast/cluster.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ def start(self, connection_manager, membership_listeners):
140140
self.add_listener(*listener)
141141

142142
def get_member(self, member_uuid):
143-
check_not_none(uuid, "UUID must not be null")
143+
check_not_none(member_uuid, "UUID must not be null")
144144
snapshot = self._member_list_snapshot
145145
return snapshot.members.get(member_uuid, None)
146146

hazelcast/internal/asyncio_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class HazelcastClient:
6464
6565
from hazelcast.asyncio import HazelcastClient
6666
67-
client = await HazelcastClient.crate_and_start(
67+
client = await HazelcastClient.create_and_start(
6868
cluster_name="a-cluster",
6969
)
7070

hazelcast/internal/asyncio_cluster.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,17 @@ def __init__(self, client, config):
134134
self._listeners = {}
135135
self._member_list_snapshot = _EMPTY_SNAPSHOT
136136
self._initial_list_fetched = asyncio.Event()
137+
# asyncio tasks are weakly referenced; keep strong refs until they finish.
138+
# see: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
139+
self._close_tasks: typing.Set[asyncio.Task] = set()
137140

138141
def start(self, connection_manager, membership_listeners):
139142
self._connection_manager = connection_manager
140143
for listener in membership_listeners:
141144
self.add_listener(*listener)
142145

143146
def get_member(self, member_uuid):
144-
check_not_none(uuid, "UUID must not be null")
147+
check_not_none(member_uuid, "UUID must not be null")
145148
snapshot = self._member_list_snapshot
146149
return snapshot.members.get(member_uuid, None)
147150

@@ -282,14 +285,18 @@ def _detect_membership_events(self, previous_members, current_members):
282285
for dead_member in dead_members:
283286
connection = self._connection_manager.get_connection(dead_member.uuid)
284287
if connection:
285-
connection.close_connection(
286-
None,
287-
TargetDisconnectedError(
288-
"The client has closed the connection to this member, "
289-
"after receiving a member left event from the cluster. "
290-
"%s" % connection
291-
),
288+
task = asyncio.create_task(
289+
connection.close_connection(
290+
None,
291+
TargetDisconnectedError(
292+
"The client has closed the connection to this member, "
293+
"after receiving a member left event from the cluster. "
294+
"%s" % connection
295+
),
296+
)
292297
)
298+
self._close_tasks.add(task)
299+
task.add_done_callback(self._close_tasks.discard)
293300

294301
if (len(new_members) + len(dead_members)) > 0:
295302
if len(current_members) > 0:

hazelcast/internal/asyncio_compact.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ async def _replicate_schema(
102102
# is not known to be replicated yet. We should retry
103103
# sending it in a random member.
104104
await asyncio.sleep(self._invocation_retry_pause)
105+
remaining_retries -= 1
105106

106107
# We tried to send it a couple of times, but the member list
107108
# in our local and the member list returned by the initiator

hazelcast/internal/asyncio_discovery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@ async def translate(self, address):
5353
async def refresh(self):
5454
"""Refreshes the internal lookup table if necessary."""
5555
try:
56-
self._private_to_public = self.cloud_discovery.discover_nodes()
56+
self._private_to_public = await asyncio.to_thread(self.cloud_discovery.discover_nodes)
5757
except Exception as e:
5858
_logger.warning("Failed to load addresses from Hazelcast Cloud: %s", e)

hazelcast/internal/asyncio_listener.py

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -82,51 +82,54 @@ async def register_listener(
8282
tg.create_task(task)
8383
return registration_id
8484
except Exception:
85-
await self.deregister_listener(registration_id)
85+
await self._deregister_listener_unsafe(registration_id)
8686
raise HazelcastError("Listener cannot be added")
8787

8888
async def deregister_listener(self, user_registration_id):
8989
check_not_none(user_registration_id, "None user_registration_id is not allowed!")
9090
async with self._registration_lock:
91-
listener_registration = self._active_registrations.pop(user_registration_id, None)
92-
if not listener_registration:
93-
return False
94-
95-
async def handle(inv: Invocation, conn: AsyncioConnection):
96-
try:
97-
await inv.future
98-
except Exception as e:
99-
if not isinstance(
100-
e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError)
101-
):
102-
_logger.warning(
103-
"Deregistration of listener with ID %s has failed for address %s: %s",
104-
user_registration_id,
105-
conn.remote_address,
106-
e,
107-
)
91+
return await self._deregister_listener_unsafe(user_registration_id)
10892

109-
async with asyncio.TaskGroup() as tg:
110-
items = listener_registration.connection_registrations.items()
111-
for connection, event_registration in items:
112-
# Remove local handler
113-
self.remove_event_handler(event_registration.correlation_id)
114-
# The rest is for deleting the remote registration
115-
server_registration_id = event_registration.server_registration_id
116-
deregister_request = listener_registration.encode_deregister_request(
117-
server_registration_id
118-
)
119-
if deregister_request is None:
120-
# None means no remote registration (e.g. for backup acks)
121-
continue
122-
invocation = Invocation(
123-
deregister_request, connection=connection, timeout=sys.maxsize, urgent=True
93+
async def _deregister_listener_unsafe(self, user_registration_id):
94+
listener_registration = self._active_registrations.pop(user_registration_id, None)
95+
if not listener_registration:
96+
return False
97+
98+
async def handle(inv: Invocation, conn: AsyncioConnection):
99+
try:
100+
await inv.future
101+
except Exception as e:
102+
if not isinstance(
103+
e, (HazelcastClientNotActiveError, IOError, TargetDisconnectedError)
104+
):
105+
_logger.warning(
106+
"Deregistration of listener with ID %s has failed for address %s: %s",
107+
user_registration_id,
108+
conn.remote_address,
109+
e,
124110
)
125-
self._invocation_service.invoke(invocation)
126-
tg.create_task(handle(invocation, connection))
127111

128-
listener_registration.connection_registrations.clear()
129-
return True
112+
async with asyncio.TaskGroup() as tg:
113+
items = listener_registration.connection_registrations.items()
114+
for connection, event_registration in items:
115+
# Remove local handler
116+
self.remove_event_handler(event_registration.correlation_id)
117+
# The rest is for deleting the remote registration
118+
server_registration_id = event_registration.server_registration_id
119+
deregister_request = listener_registration.encode_deregister_request(
120+
server_registration_id
121+
)
122+
if deregister_request is None:
123+
# None means no remote registration (e.g. for backup acks)
124+
continue
125+
invocation = Invocation(
126+
deregister_request, connection=connection, timeout=sys.maxsize, urgent=True
127+
)
128+
self._invocation_service.invoke(invocation)
129+
tg.create_task(handle(invocation, connection))
130+
131+
listener_registration.connection_registrations.clear()
132+
return True
130133

131134
def handle_client_message(self, message: InboundMessage, correlation_id: int):
132135
handler = self._event_handlers.get(correlation_id, None)

hazelcast/internal/asyncio_proxy/vector_collection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ def handler(message):
390390
key_data = self._to_data(key)
391391
value_data = self._to_data(document.value)
392392
except SchemaNotReplicatedError as e:
393-
return await self._send_schema_and_retry(e, self.set, key, document)
393+
return await self._send_schema_and_retry(e, self.put, key, document)
394394
document = copy.copy(document)
395395
document.value = value_data
396396
request = vector_collection_put_codec.encode_request(
@@ -409,7 +409,7 @@ def handler(message):
409409
key_data = self._to_data(key)
410410
value_data = self._to_data(document.value)
411411
except SchemaNotReplicatedError as e:
412-
return await self._send_schema_and_retry(e, self.set, key, document)
412+
return await self._send_schema_and_retry(e, self.put_if_absent, key, document)
413413
document.value = value_data
414414
request = vector_collection_put_if_absent_codec.encode_request(
415415
self.name,

hazelcast/internal/asyncio_statistics.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
182182
self._add_system_or_process_metric(
183183
attributes, compressor, gauge_name, value, value_type
184184
)
185-
except:
185+
except Exception:
186186
_logger.exception("Error while collecting '%s'.", gauge_name)
187187

188188
if not self._registered_process_gauges:
@@ -197,7 +197,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
197197
self._add_system_or_process_metric(
198198
attributes, compressor, gauge_name, value, value_type
199199
)
200-
except:
200+
except Exception:
201201
_logger.exception("Error while collecting '%s'.", gauge_name)
202202

203203
def _add_system_or_process_metric(self, attributes, compressor, gauge_name, value, value_type):
@@ -343,7 +343,7 @@ def _add_near_cache_metric(
343343
try:
344344
self._add_metric(compressor, descriptor, value, value_type)
345345
self._add_attribute(attributes, metric, value, nc_name_with_prefix)
346-
except:
346+
except Exception:
347347
_logger.exception(
348348
"Error while collecting %s metric for near cache '%s'.", metric, nc_name
349349
)
@@ -362,7 +362,7 @@ def _add_tcp_metric(
362362
)
363363
try:
364364
self._add_metric(compressor, descriptor, value, value_type)
365-
except:
365+
except Exception:
366366
_logger.exception("Error while collecting '%s.%s'.", _TCP_METRICS_PREFIX, metric)
367367

368368
def _add_metric(self, compressor, descriptor, value, value_type):

hazelcast/proxy/vector_collection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ def handler(message):
391391
key_data = self._to_data(key)
392392
value_data = self._to_data(document.value)
393393
except SchemaNotReplicatedError as e:
394-
return self._send_schema_and_retry(e, self.set, key, document)
394+
return self._send_schema_and_retry(e, self.put, key, document)
395395
document = copy.copy(document)
396396
document.value = value_data
397397
request = vector_collection_put_codec.encode_request(
@@ -410,7 +410,7 @@ def handler(message):
410410
key_data = self._to_data(key)
411411
value_data = self._to_data(document.value)
412412
except SchemaNotReplicatedError as e:
413-
return self._send_schema_and_retry(e, self.set, key, document)
413+
return self._send_schema_and_retry(e, self.put_if_absent, key, document)
414414
document.value = value_data
415415
request = vector_collection_put_if_absent_codec.encode_request(
416416
self.name,

0 commit comments

Comments
 (0)