Skip to content

Commit cfc50cf

Browse files
author
Nathan Lee
committed
Revised signal implementation and added PULSE for determining if a duplicate job is running.
1 parent f4b9af9 commit cfc50cf

4 files changed

Lines changed: 78 additions & 26 deletions

File tree

pyrunner/core/config.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from subprocess import Popen, PIPE
2020
from collections import deque
2121

22-
2322
class Config:
2423
"""
2524
Captures framework-level configurations.
@@ -165,6 +164,8 @@ def __setitem__(self, key, value):
165164
self._attr[key]['value'] = True
166165
else:
167166
self._attr[key]['value'] = self._attr[key]['type'](value)
167+
elif key == 'tickrate' and value < 1:
168+
self._attr['tickrate']['value'] = 1
168169
else:
169170
self._attr[key]['value'] = self._attr[key]['type'](value)
170171

@@ -238,17 +239,6 @@ def ctx_file(self):
238239
else:
239240
return '{}/{}.ctx'.format(self['temp_dir'], self['app_name'])
240241

241-
@property
242-
def abort_sig_file(self):
243-
"""
244-
Path/filename of job's .sig file.
245-
This file should only appear in case any signal is to be sent to a running job.
246-
"""
247-
if not self['temp_dir'] or not self['app_name']:
248-
return None
249-
else:
250-
return '{}/.{}.sig.abort'.format(self['temp_dir'], self['app_name'])
251-
252242
def source_config_file(self, config_file):
253243
"""
254244
Sources config file to export environment variables.

pyrunner/core/engine.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import pyrunner.core.constants as constants
1818
from pyrunner.core.config import Config
1919
from pyrunner.core.context import Context
20+
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PAUSE
2021
from multiprocessing import Manager
2122

2223
import os, sys, glob
@@ -45,6 +46,7 @@ def __init__(self):
4546
def initiate(self, **kwargs):
4647
"""Begins the execution loop."""
4748

49+
signal_handler = SignalHandler(self.config)
4850
sys.path.append(self.config['worker_dir'])
4951
self.start_time = time.time()
5052
wait_interval = 1.0/self.config['tickrate'] if self.config['tickrate'] > 0 else 0
@@ -56,9 +58,10 @@ def initiate(self, **kwargs):
5658
# Execution loop
5759
try:
5860
while self.register.running_nodes or self.register.pending_nodes:
61+
sig_set = signal_handler.consume()
5962

6063
# Check for abort signals
61-
if os.path.exists(self.config.abort_sig_file):
64+
if SIG_ABORT in sig_set:
6265
print('ABORT signal received! Terminating all running Workers.')
6366
self._abort_all_workers()
6467
return -1

pyrunner/core/pyrunner.py

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from pyrunner.core.engine import ExecutionEngine
3030
from pyrunner.core.config import Config
3131
from pyrunner.core.register import NodeRegister
32+
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PAUSE, SIG_PULSE
3233
from pyrunner.version import __version__
3334

3435
from datetime import datetime as datetime
@@ -41,23 +42,25 @@ def __init__(self, **kwargs):
4142
self._environ = os.environ.copy()
4243
self.config = Config()
4344
self.notification = notification.EmailNotification()
45+
self.signal_handler = SignalHandler(self.config)
4446

4547
self.serde_obj = serde.ListSerDe()
4648
self.register = NodeRegister()
4749
self.engine = ExecutionEngine()
4850

4951
self._init_params = {
50-
'config_file' : kwargs.get('config_file'),
51-
'proc_file' : kwargs.get('proc_file'),
52-
'restart' : kwargs.get('restart', False),
53-
'cvar_list' : [],
54-
'exec_proc_name' : None,
55-
'exec_only_list' : [],
56-
'exec_disable_list' : [],
57-
'exec_from_id' : None,
58-
'exec_to_id' : None
52+
'config_file' : kwargs.get('config_file'),
53+
'proc_file' : kwargs.get('proc_file'),
54+
'restart' : kwargs.get('restart', False),
55+
'cvar_list' : [],
56+
'exec_proc_name' : None,
57+
'exec_only_list' : [],
58+
'exec_disable_list' : [],
59+
'exec_from_id' : None,
60+
'exec_to_id' : None
5961
}
6062

63+
# Lifecycle hooks
6164
self._on_create_func = None
6265
self._on_start_func = None
6366
self._on_restart_func = None
@@ -66,11 +69,23 @@ def __init__(self, **kwargs):
6669
self._on_destroy_func = None
6770

6871
self.parse_args()
72+
73+
if self.dup_proc_is_running():
74+
raise OSError('Another process for "{}" is already running!'.format(self.config['app_name']))
6975

7076
def reset_env(self):
7177
os.environ.clear()
7278
os.environ.update(self._environ)
7379

80+
def dup_proc_is_running(self):
81+
self.signal_handler.emit(SIG_PULSE)
82+
time.sleep(1.1)
83+
if SIG_PULSE not in self.signal_handler.peek():
84+
print(self.signal_handler.peek())
85+
return True
86+
else:
87+
return False
88+
7489
def load_proc_file(self, proc_file, restart=False):
7590
if not proc_file or not os.path.isfile(proc_file):
7691
return False
@@ -367,7 +382,7 @@ def parse_args(self):
367382
longopt_list = [
368383
'setup', 'help', 'nozip', 'interactive', 'abort',
369384
'restart', 'version', 'dryrun', 'debug',
370-
'preserve-context', 'dump-logs', 'disable-exclusive-jobs',
385+
'preserve-context', 'dump-logs', 'allow-duplicate-jobs',
371386
'email=', 'email-on-fail=', 'email-on-success=', 'ef=', 'es=',
372387
'env=', 'cvar=', 'context=',
373388
'to=', 'from=', 'descendants=', 'ancestors=',
@@ -424,8 +439,8 @@ def parse_args(self):
424439
self.config['tickrate'] = int(arg)
425440
elif opt in ['--preserve-context']:
426441
self.preserve_context = True
427-
elif opt in ['--disable-exclusive-jobs']:
428-
self.disable_exclusive_jobs = True
442+
elif opt in ['--allow-duplicate-jobs']:
443+
self._init_params['allow_duplicate_jobs'] = True
429444
elif opt in ['--exec-proc-name']:
430445
self._init_params['exec_proc_name'] = arg
431446
elif opt == '--abort':
@@ -452,7 +467,7 @@ def parse_args(self):
452467

453468
if abort:
454469
print('Submitting ABORT signal to running job for: {}'.format(self.config['app_name']))
455-
open(self.config.abort_sig_file, 'a').close()
470+
self.signal_handler.emit(SIG_ABORT)
456471
sys.exit(0)
457472

458473
# Check if restart is possible (ctllog/ctx files exist)

pyrunner/core/signal.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Copyright 2019 Comcast Cable Communications Management, LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
# SPDX-License-Identifier: Apache-2.0
16+
17+
import os
18+
19+
SIG_ABORT = 'sig.abort'
20+
SIG_PAUSE = 'sig.pause'
21+
SIG_PULSE = 'sig.pulse'
22+
23+
_valid_signals = (SIG_ABORT, SIG_PAUSE, SIG_PULSE)
24+
25+
class SignalHandler:
26+
27+
def __init__(self, config):
28+
self.config = config
29+
30+
def sig_file(self, sig):
31+
return '{}/.{}.{}'.format(self.config['temp_dir'], self.config['app_name'], sig)
32+
33+
def emit(self, sig):
34+
if sig not in _valid_signals: return ValueError('Unknown signal type: {}'.format(sig))
35+
open(self.sig_file(sig), 'a').close()
36+
37+
def consume(self):
38+
sig_set = self.peek()
39+
for sig in sig_set:
40+
os.remove(self.sig_file(sig))
41+
return sig_set
42+
43+
def peek(self):
44+
return set([ s for s in _valid_signals if os.path.exists(self.sig_file(s)) ])

0 commit comments

Comments
 (0)