Skip to content

Commit b41308e

Browse files
PawelPlesniakemmuhamm
authored andcommitted
Merge pull request #941 from DUNE-DAQ/emmuhamm/fix-lcs
Add multi host and multi user support to the process manager
2 parents a4dbd2c + 58fc2ce commit b41308e

11 files changed

Lines changed: 160 additions & 40 deletions

File tree

src/drunc/controller/controller.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import multiprocessing
2+
import os
3+
import socket
24
import threading
35
import time
46
from concurrent.futures import ThreadPoolExecutor
@@ -129,9 +131,17 @@ def __init__(self, configuration, name: str, session: str, token: Token):
129131
self.connectivity_service_thread = None
130132
self.uri = ""
131133
if self.configuration.session.connectivity_service:
132-
connection_server = self.configuration.session.connectivity_service.host
133-
connection_port = (
134-
self.configuration.session.connectivity_service.service.port
134+
# Remaps the localhost into the correct server
135+
# and also grabs the correct port from the right environment from the config
136+
137+
connection_server_host = (
138+
self.configuration.session.connectivity_service.host
139+
)
140+
connection_port = os.getenv("CONNECTION_PORT")
141+
connection_server = (
142+
socket.gethostname()
143+
if connection_server_host == "localhost"
144+
else connection_server_host
135145
)
136146
log_init.info(
137147
f"Connectivity server {connection_server}:{connection_port} is enabled"
@@ -326,6 +336,10 @@ def advertise_control_address(self, address):
326336
if not self.connectivity_service:
327337
return
328338

339+
if not self.connectivity_service.is_ready(timeout=10):
340+
raise ValueError(
341+
"Connectivity service unavailable for control address advertising."
342+
)
329343
self.log.info(
330344
f"Registering {self.name} ({address}) to the connectivity service at {self.connectivity_service.address}"
331345
)

src/drunc/process_manager/configuration.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from drunc.exceptions import DruncCommandException
1616
from drunc.process_manager.exceptions import UnknownProcessManagerType
1717
from drunc.utils.configuration import ConfHandler
18-
from drunc.utils.utils import get_logger
18+
from drunc.utils.utils import get_logger, touch_and_chmod
1919

2020
if TYPE_CHECKING:
2121
import conffwk
@@ -104,6 +104,9 @@ def _parse_dict(self, data):
104104
self.opmon_conf,
105105
)
106106
else:
107+
# ensures users can access the opmon files (permissions)
108+
# This is for the PM opmon file
109+
touch_and_chmod(self.opmon_conf.path)
107110
new_data.opmon_publisher = OpMonPublisher(
108111
conf=self.opmon_conf, rich_handler=True
109112
)

src/drunc/process_manager/interface/commands.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from drunc.process_manager.interface.context import ProcessManagerContext
1414
from drunc.process_manager.utils import tabulate_process_instance_list
1515
from drunc.utils.shell_utils import InterruptedCommand, log_pm_cmd
16-
from drunc.utils.utils import get_logger
16+
from drunc.utils.utils import get_logger, resolve_context_peer
1717

1818

1919
@click.command("boot")
@@ -45,11 +45,14 @@ def boot(
4545
) -> None:
4646
log = get_logger("process_manager.shell")
4747
log_pm_cmd(obj)
48-
processes = obj.get_driver("process_manager").ps(ProcessQuery(user=user))
48+
processes = obj.get_driver("process_manager").ps(
49+
ProcessQuery(user=user, session=session_name)
50+
)
4951

52+
# The run control will validate this in the session manager in the future
5053
if len(processes.values) > 0:
5154
click.confirm(
52-
f"You already have {len(processes.values)} processes running, are you sure you want to boot a session?",
55+
f"You already have {len(processes.values)} processes running for {session_name}, are you sure you want to boot a session?",
5356
abort=True,
5457
)
5558

@@ -78,6 +81,7 @@ def boot(
7881
raise e
7982

8083
controller_address = obj.get_driver("process_manager").controller_address
84+
controller_address = resolve_context_peer(controller_address)
8185
if controller_address:
8286
obj.print(
8387
Panel(
@@ -286,7 +290,7 @@ def logs_impl(
286290
log = get_logger("process_manager.shell")
287291
# TODO: MOVE BACK TO DEBUG BEFORE MERGE
288292
# THIS IS USEFUL FOR TESTING THOUGH
289-
log.error(f"Running logs with query {query}")
293+
log.debug(f"Running logs with query {query}")
290294
log_req = LogRequest(
291295
how_far=how_far,
292296
query=query,

src/drunc/process_manager/process_manager_driver.py

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from collections.abc import Iterator
88
from time import sleep
99
from typing import Dict, List
10+
from urllib.parse import urlparse
1011

1112
import conffwk
1213
import grpc
@@ -49,6 +50,7 @@
4950
resolve_localhost_and_127_ip_to_network_ip,
5051
resolve_localhost_to_hostname,
5152
strip_non_drunc_loggers,
53+
touch_and_chmod,
5254
)
5355

5456

@@ -141,6 +143,9 @@ def boot(
141143
# Step 3 - check for port conflicts and update configuration/DAL as needed
142144
db, session_dal = self.check_port_conflicts(db, session_dal)
143145

146+
# step 3.5 update localhost mapping
147+
session_dal = self.resolve_localhost(session_dal)
148+
144149
# Step 4 - connect to the connection service
145150
csc, connection_server, connection_port = self._connect_to_service(
146151
session_dal, session_name
@@ -190,6 +195,24 @@ def boot(
190195
previous_host = this_host
191196
last_boot_on_host_at[this_host] = time.time()
192197

198+
# ensures users can access the opmon files (permissions)
199+
# This is for the opmon files of the apps
200+
201+
if session_dal.opmon_uri.type == "file":
202+
# For future, this should probably be taken from the metadata
203+
opmon_file = (
204+
f"{request.process_description.process_execution_directory}/info."
205+
+ request.process_description.metadata.session
206+
+ "."
207+
+ request.process_description.metadata.name
208+
+ ".json"
209+
)
210+
211+
self.log.debug(
212+
f"Touching and changing permissions for {opmon_file} because opmon is of type {session_dal.opmon_uri.type}"
213+
)
214+
touch_and_chmod(opmon_file)
215+
193216
try:
194217
response = self.stub.boot(request, timeout=timeout)
195218
yield response
@@ -287,7 +310,11 @@ def _build_boot_request(
287310
override_logs: bool,
288311
pwd: str,
289312
) -> BootRequest:
290-
host = format_hostname(app["restriction"])
313+
# Run mapping to physical hostname to enable multi host usage
314+
host = resolve_localhost_to_hostname(format_hostname(app["restriction"]))
315+
self.log.info(f"boot resolve {host}") # keep this until big PR gets merged
316+
317+
# this is one of the two minimal changes needed to get this working in general?
291318
name = app["name"]
292319
exe = app["type"]
293320
args = app["args"]
@@ -403,19 +430,29 @@ def _consolidate_config(self, session_name, conf_file: str) -> str | None:
403430
)
404431
return
405432

406-
def update_connectivity_port_dal(
407-
self,
408-
env_variables: list["conffwk.dal.Variable | conffwk.dal.VariableSet"],
409-
new_port: int,
410-
) -> None:
411-
"""Process a dal::Variable object, placing key/value pairs in a dictionary"""
412-
for item in env_variables:
413-
if item.className() == "VariableSet":
414-
self.update_connectivity_port_dal(item.contains, new_port)
415-
else:
416-
if item.className() == "Variable":
417-
if item.name == "CONNECTION_PORT":
418-
item.value = new_port
433+
def resolve_localhost(self, session_dal):
434+
def dal_localhost_mapping(dal_host: str):
435+
if dal_host != "localhost":
436+
return dal_host
437+
438+
resolved_address = resolve_localhost_to_hostname(dal_host)
439+
if "://" not in resolved_address:
440+
resolved_address = "grpc://" + resolved_address
441+
442+
resolved_server = urlparse(resolved_address).hostname
443+
self.log.debug(
444+
f"Resolved connection server 'localhost' to '{resolved_server}' to avoid K8s hairpinning."
445+
)
446+
return resolved_server
447+
448+
session_dal.connectivity_service.host = dal_localhost_mapping(
449+
session_dal.connectivity_service.host
450+
)
451+
session_dal.segment.controller.runs_on.runs_on.id = dal_localhost_mapping(
452+
session_dal.segment.controller.runs_on.runs_on.id
453+
)
454+
455+
return session_dal
419456

420457
def check_port_conflicts(
421458
self, db: conffwk.Configuration, session_dal: "conffwk.dal.Session"
@@ -545,13 +582,6 @@ def _connect_to_service(
545582
connection_server = session_dal.connectivity_service.host
546583
connection_port = session_dal.connectivity_service.service.port
547584

548-
if connection_server == "localhost":
549-
resolved_server = resolve_localhost_to_hostname(connection_server)
550-
self.log.debug(
551-
f"Resolved connection server 'localhost' to '{resolved_server}' to avoid K8s hairpinning."
552-
)
553-
connection_server = resolved_server
554-
555585
client = ConnectivityServiceClient(
556586
session_name, f"{connection_server}:{connection_port}"
557587
)

src/drunc/process_manager/ssh_process_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ def __boot(self, boot_request: BootRequest, uuid: str) -> ProcessInstance:
390390
self.log.info(
391391
f"Booted '{boot_request.process_description.metadata.name}' "
392392
f"from session '{boot_request.process_description.metadata.session}' "
393-
f"with UUID {uuid}"
393+
f"with UUID {uuid} in host '{hostname}"
394394
)
395395

396396
# Query current process status

src/drunc/processes/ssh_process_lifetime_manager_shell.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,6 +1191,7 @@ def _execute_bootrequest_via_ssh(
11911191

11921192
remote_cmd += (
11931193
f"mkdir -p ${{XDG_RUNTIME_DIR:-/tmp}}/drunc ; "
1194+
f"rm {log_file}; " # delete log file so no issues on ovewriting in th next line
11941195
f"{command} &> {log_file} & PID=$! ; "
11951196
f"trap 'kill -HUP $PID 2>/dev/null || true; wait $PID 2>/dev/null || true' HUP TERM INT QUIT ; "
11961197
f"echo '{remote_metadata_json}' > {metadata_file} ; "
@@ -1200,6 +1201,39 @@ def _execute_bootrequest_via_ssh(
12001201
arguments = self._build_ssh_arguments(hostname, user_host)
12011202
arguments.append(remote_cmd)
12021203

1204+
# Test access to CMD
1205+
cd_path = f"{boot_request.process_description.process_execution_directory}"
1206+
touch_cmd = [
1207+
arguments[0], # assume first arg is username@host
1208+
f"touch {cd_path}/.write_test && rm {cd_path}/.write_test",
1209+
]
1210+
self.log.debug(f"running {touch_cmd} for CMD access test")
1211+
try:
1212+
access = self.ssh(
1213+
*touch_cmd,
1214+
_out=self.log.warning,
1215+
_err=self.log.error,
1216+
_bg=True,
1217+
_bg_exc=False,
1218+
_new_session=True,
1219+
_preexec_fn=on_parent_exit(signal.SIGTERM)
1220+
if not is_macos
1221+
else None,
1222+
)
1223+
1224+
access.wait()
1225+
if access.exit_code != 0:
1226+
raise RuntimeError("SSH error fails to finish successfully")
1227+
except Exception as e:
1228+
err_msg = (
1229+
f"No access to {cd_path}"
1230+
"for multiusers to work, the above path needs elevated permissions for"
1231+
" the PM superuser to cd and write into. "
1232+
"Please change the permissions to allow for this."
1233+
)
1234+
self.log.error(err_msg)
1235+
raise RuntimeError from e
1236+
12031237
process = self.ssh(
12041238
*arguments,
12051239
_out=self.log.debug,

src/drunc/unified_shell/commands.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,13 @@ def boot(
5858
override_logs_boot = obj.override_logs
5959
else:
6060
override_logs_boot = override_logs
61-
# if len(processes.values) > 0:
62-
# log.error(
63-
# f"Cannot boot: session {session_name} already has {len(processes.values)} processes running. "
64-
# "Please terminate the existing session first."
65-
# )
66-
# return
61+
# The run control will validate this in the session manager in the future
62+
if len(processes.values) > 0:
63+
log.error(
64+
f"Cannot boot: session {session_name} already has {len(processes.values)} processes running. "
65+
"Please terminate the existing session first."
66+
)
67+
return
6768

6869
try:
6970
results = obj.get_driver("process_manager").boot(

src/drunc/utils/utils.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,18 @@ def parent_death_pact(signal: int = signal.SIGHUP) -> None:
364364
raise Exception("prctl() returned nonzero retcode %d" % retcode)
365365

366366

367+
# 777 PERMISSIONS ARE COMPLETELY TEMPORARY
368+
# An established procedure for multi users will need to be discussed with sysadmins
369+
# will be removed when done
370+
def touch_and_chmod(filepath: str, mode=0o777):
371+
"""Makes and sets the permissions of a file.
372+
This is used to ensure multiuser support when accessing files etc."""
373+
374+
with open(filepath, "a"):
375+
os.utime(filepath, None)
376+
os.chmod(filepath, mode)
377+
378+
367379
class IncorrectAddress(DruncException):
368380
"""Exception raised when an address is invalid."""
369381

@@ -724,6 +736,18 @@ def resolve_context_peer(peer: str) -> str:
724736
str: The original peer string, or a resolved ``host:port`` representation.
725737
"""
726738

739+
if not peer:
740+
return peer
741+
742+
# Some callers pass a plain host:port string without a transport prefix.
743+
# Handle those directly instead of assuming the first token is always a transport.
744+
if peer.startswith("[") or peer.count(":") == 1:
745+
parsed = _parse_host_port(peer)
746+
if parsed is not None:
747+
host, port = parsed
748+
resolved_host = _resolve_host(host)
749+
return f"{resolved_host}:{port}"
750+
727751
match = re.match(r"^(?P<transport>[^:]+):(?P<address>.+)$", peer)
728752
if not match:
729753
return peer

tests/process_manager/conftest.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
to be back in line with druncschema definitions.
88
"""
99

10+
import socket
11+
1012
import google.protobuf.any_pb2
1113
import pytest
1214
from druncschema.description_pb2 import Description
@@ -36,7 +38,7 @@ def app_data():
3638
Provides a mock application dictionary with required keys.
3739
"""
3840
return {
39-
"restriction": "localhost",
41+
"restriction": socket.gethostname(),
4042
"name": "TestApp",
4143
"type": "binary",
4244
"args": ["--arg1"],
@@ -55,7 +57,7 @@ def bootrequest(app_data):
5557
user="test_user",
5658
session="session1",
5759
name=app_data["name"],
58-
hostname="localhost",
60+
hostname=socket.gethostname(),
5961
tree_id=app_data["tree_id"],
6062
),
6163
executable_and_arguments=[{"exec": "binary", "args": ["--arg1"]}],

tests/process_manager/interface/test_commands.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,9 @@ def test_boot_exiting_processes_abort(boot_arguments):
237237
# check that 'boot' was never called
238238
mock_driver.boot.assert_not_called()
239239

240+
# conf-id-123 from session name in this file
240241
assert (
241-
"You already have 2 processes running, are you sure you want to boot a session?"
242+
"You already have 2 processes running for conf-id-123, are you sure you want to boot a session?"
242243
in result.output
243244
)
244245

@@ -267,7 +268,7 @@ def test_boot_exiting_processes_user_confirm(boot_arguments):
267268
mock_driver.boot.assert_called()
268269

269270
assert (
270-
"You already have 2 processes running, are you sure you want to boot a session?"
271+
"You already have 2 processes running for conf-id-123, are you sure you want to boot a session?"
271272
in result.output
272273
)
273274

0 commit comments

Comments
 (0)