Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,24 @@ class ClientState:
#: Output of :func:`distributed.versions.get_versions` on the client
versions: dict[str, Any]

#: Remote address of the client connection as seen by the scheduler.
address: str | None

__slots__ = tuple(__annotations__)

def __init__(self, client: str, *, versions: dict[str, Any] | None = None):
def __init__(
self,
client: str,
*,
versions: dict[str, Any] | None = None,
address: str | None = None,
):
self.client_key = client
self._hash = hash(client)
self.wants_what = set()
self.last_seen = time()
self.versions = versions or {}
self.address = address

def __hash__(self) -> int:
return self._hash
Expand Down Expand Up @@ -5907,9 +5917,12 @@ async def add_client(
"""
assert client is not None
comm.name = "Scheduler->Client"
logger.info("Receive client connection: %s", client)
client_address = comm.peer_address
logger.info("Receive client connection: %s at %s", client, client_address)
self.log_event(["all", client], {"action": "add-client", "client": client})
self.clients[client] = ClientState(client, versions=versions)
self.clients[client] = ClientState(
client, versions=versions, address=client_address
)
self._client_connections_added_total += 1

for plugin in list(self.plugins.values()):
Expand Down Expand Up @@ -5944,15 +5957,24 @@ async def add_client(
await self.client_comms[client].close()
del self.client_comms[client]
if self.status == Status.running:
logger.info("Close client connection: %s", client)
logger.info(
"Close client connection: %s at %s",
client,
client_address,
)
except TypeError: # comm becomes None during GC
pass

def remove_client(self, client: str, stimulus_id: str | None = None) -> None:
"""Remove client from network"""
stimulus_id = stimulus_id or f"remove-client-{time()}"
client_state = self.clients.get(client)
client_address = client_state.address if client_state is not None else None
if self.status == Status.running:
logger.info("Remove client %s", client)
if client_address is not None:
logger.info("Remove client %s at %s", client, client_address)
else:
logger.info("Remove client %s", client)
self.log_event(["all", client], {"action": "remove-client", "client": client})
try:
cs: ClientState = self.clients[client]
Expand Down
12 changes: 12 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,18 @@ async def test_clear_events_client_removal(c, s, a, b):
assert time() < start + 2


@gen_cluster(nthreads=[])
async def test_client_connection_logs_include_address(s):
with captured_logger("distributed.scheduler", level=logging.INFO) as caplog:
async with Client(s.address, asynchronous=True) as c:
client_id = c.id

logs = caplog.getvalue()
assert f"Receive client connection: {client_id} at " in logs
assert f"Remove client {client_id} at " in logs
assert f"Close client connection: {client_id} at " in logs


@gen_cluster(client=True, nthreads=[])
async def test_add_worker(c, s):
x = c.submit(inc, 1, key="x")
Expand Down