Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions ipcmagic/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class IPClusterMagics(Magics):
-v --version Show version.
-n --num_engines <int> Number of engines
--launcher <str> Job launcher (mpirun | srun | local)
--profile <str> The IPython profile to use
--dask Create a dask.distributed cluster
"""

Expand Down Expand Up @@ -51,6 +52,7 @@ def parse_args(self, line):
'start': None,
'stop': None,
'launcher': 'srun',
'profile': 'default',
'dask': False
}

Expand All @@ -76,7 +78,8 @@ def parse_args(self, line):

def _wait_for_cluster_start(self, timeout=60):
try:
self.client = ipp.Client(timeout=timeout)
self.client = ipp.Client(profile=self.args.profile,
timeout=timeout)
except ipp.TimeoutError:
self.stop_cluster()
print('The connection request to the cluster controller '
Expand All @@ -95,20 +98,24 @@ def _wait_for_cluster_start(self, timeout=60):
return

def _launch_engines_local(self):
self.controller = run_command_async('ipcontroller --log-to-file')
self.controller = run_command_async(
f'ipcontroller --log-to-file --profile={self.args.profile}'
)
# Let some seconds pass between launching the ipcontroller
# and launching the ipengines. Otherwise the controller
# might not notice that the engines have started.
time.sleep(3)
self.engines = [run_command_async('ipengine --log-to-file')
self.engines = [run_command_async('ipengine --log-to-file '
f'--profile={self.args.profile}')
for i in range(int(self.args.num_engines))]
time.sleep(1)
self.running = True
self._wait_for_cluster_start(timeout=60)

def _launch_engines_mpi(self):
self.controller = run_command_async('ipcontroller --ip="*" '
'--log-to-file')
'--log-to-file '
f'--profile={self.args.profile}')
# Let some seconds pass between launching the ipcontroller
# and launching the ipengines. Otherwise the controller
# might not notice that the engines have started.
Expand All @@ -122,7 +129,8 @@ def _launch_engines_mpi(self):
try:
self.engines = run_command_async(
f'{self.args.launcher} {np_opt} {self.args.num_engines} ipengine ' # noqa: E501
f'--location={hostname} --log-to-file')
f'--location={hostname} --log-to-file '
f'--profile={self.args.profile}')
except FileNotFoundError:
print(f'Launcher not supported in this system: '
f'{self.args.launcher}')
Expand Down