-
Notifications
You must be signed in to change notification settings - Fork 383
Expand file tree
/
Copy pathexternal_search_command.py
More file actions
251 lines (200 loc) · 7.95 KB
/
external_search_command.py
File metadata and controls
251 lines (200 loc) · 7.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# Copyright © 2011-2026 Splunk, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"): you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from logging import getLogger
import os
import sys
import traceback
from . import splunklib_logger as logger
if sys.platform == "win32":
from signal import signal, CTRL_BREAK_EVENT, SIGBREAK, SIGINT, SIGTERM
from subprocess import Popen
import atexit
# P1 [ ] TODO: Add ExternalSearchCommand class documentation
class ExternalSearchCommand:
    """Runs an external program as a search command.

    An instance holds the program ``path``, an optional argument vector and an
    optional environment mapping. :meth:`execute` hands them to the
    platform-specific ``_execute``: on Windows the program is started as a
    child process and waited for; on every other platform the current process
    is replaced through :func:`os.execvp` / :func:`os.execvpe`.
    """

    def __init__(self, path, argv=None, environ=None):
        """Store and validate the command description.

        :param path: Path (or bare name) of the program to run.
        :type path: str or bytes
        :param argv: Argument list for the program, or :const:`None` to let
            :meth:`execute` derive it from ``path``.
        :type argv: list, tuple or None
        :param environ: Environment for the program, or :const:`None`.
        :type environ: dict or None
        :raises ValueError: If any argument has an unsupported type.
        """
        if not isinstance(path, (bytes, str)):
            raise ValueError(f"Expected a string value for path, not {repr(path)}")

        self._logger = getLogger(self.__class__.__name__)
        # os.fsdecode returns str unchanged and decodes bytes; str(b"x") would
        # have produced the literal text "b'x'" instead of the path.
        self._path = os.fsdecode(path)
        self._argv = None
        self._environ = None

        # Assign through the properties so that the setters validate the values.
        self.argv = argv
        self.environ = environ

    # region Properties

    @property
    def argv(self):
        """Argument list passed to the program (list, tuple or None)."""
        return self._argv

    @argv.setter
    def argv(self, value):
        if not (value is None or isinstance(value, (list, tuple))):
            raise ValueError(
                f"Expected a list, tuple or value of None for argv, not {repr(value)}"
            )
        self._argv = value

    @property
    def environ(self):
        """Environment mapping passed to the program (dict or None)."""
        return self._environ

    @environ.setter
    def environ(self, value):
        if not (value is None or isinstance(value, dict)):
            raise ValueError(
                f"Expected a dictionary value for environ, not {repr(value)}"
            )
        self._environ = value

    @property
    def logger(self):
        """Logger named after the concrete class of this instance."""
        return self._logger

    @property
    def path(self):
        """Path of the program, as a ``str``."""
        return self._path

    # endregion

    # region Methods

    def execute(self):
        """Run the external command.

        When no ``argv`` was supplied, it defaults to a one-element list
        holding the base name of ``path`` without its extension. Any failure
        is logged with its traceback and the process exits with status 1.
        A ``SystemExit`` raised while running the command (for example to
        propagate the child's return code) is passed through untouched.
        """
        # noinspection PyBroadException
        try:
            if self._argv is None:
                # Must be a list: os.execvp/os.execvpe reject a plain string.
                self._argv = [os.path.splitext(os.path.basename(self._path))[0]]
            self._execute(self._path, self._argv, self._environ)
        except SystemExit:
            # Deliberate exits carry their own status; do not rewrite it to 1.
            raise
        except BaseException as error:  # deliberately broad: always log and exit 1
            message = f"Command execution failed: {str(error)}"
            self._logger.error(
                message
                + "\nTraceback:\n"
                + "".join(traceback.format_tb(error.__traceback__))
            )
            sys.exit(1)

    if sys.platform == "win32":

        @staticmethod
        def _execute(path, argv=None, environ=None):
            """Executes an external search command as a child process.

            :param path: Path to the external search command.
            :type path: str

            :param argv: Argument list.
            :type argv: list or tuple
            The arguments to the child process should start with the name of
            the command being run, but this is not enforced. The value is
            handed to :class:`subprocess.Popen` as given.

            :param environ: A mapping which is used to define the environment
                variables for the new process.
            :type environ: dict or None
            This mapping is used instead of the current process's environment.
            A value of :const:`None` specifies that the :data:`os.environ`
            mapping should be used.

            :raises ValueError: If the command cannot be located.
            :return: None. Exits the process with the child's return code when
                that code is non-zero.
            """
            search_path = os.getenv("PATH") if environ is None else environ.get("PATH")
            found = ExternalSearchCommand._search_path(path, search_path)

            if found is None:
                raise ValueError(f"Cannot find command on path: {path}")

            path = found
            logger.debug(f'starting command="{path}", arguments={argv}')

            def terminate(signal_number, frame=None):
                # Signal handlers are invoked with (signal number, stack frame).
                sys.exit(
                    f"External search command is terminating on receipt of signal={signal_number}."
                )

            def terminate_child():
                # Registered with atexit: stop the child if it is still running.
                if p.pid is not None and p.returncode is None:
                    logger.debug(
                        'terminating command="%s", arguments=%s, pid=%d',
                        path,
                        argv,
                        p.pid,
                    )
                    # NOTE(review): CTRL_BREAK_EVENT is documented as reaching only
                    # console processes sharing this console - confirm for the child.
                    os.kill(p.pid, CTRL_BREAK_EVENT)

            p = Popen(
                argv,
                executable=path,
                env=environ,
                stdin=sys.stdin,
                stdout=sys.stdout,
                stderr=sys.stderr,
            )

            atexit.register(terminate_child)
            signal(SIGBREAK, terminate)
            signal(SIGINT, terminate)
            signal(SIGTERM, terminate)

            logger.debug(
                'started command="%s", arguments=%s, pid=%d', path, argv, p.pid
            )
            p.wait()

            logger.debug(
                'finished command="%s", arguments=%s, pid=%d, returncode=%d',
                path,
                argv,
                p.pid,
                p.returncode,
            )

            if p.returncode != 0:
                sys.exit(p.returncode)

        @staticmethod
        def _search_path(executable, paths):
            """Locates an executable program file.

            :param executable: The name of the executable program to locate.
            :type executable: str

            :param paths: Directory paths to search, separated by ``;``.
            :type paths: str or None

            :return: Path to the executable program located or :const:`None`.
            :rtype: str or None
            """
            directory, filename = os.path.split(executable)
            extension = os.path.splitext(filename)[1].upper()
            known_extensions = ExternalSearchCommand._executable_extensions
            has_known_extension = bool(extension) and extension in known_extensions

            if directory:
                # An explicit location was given, so the search path is not used.
                if has_known_extension:
                    # Previously this case always returned None, which made a
                    # fully qualified "dir\\tool.exe" impossible to run.
                    return executable if os.path.isfile(executable) else None
                for suffix in known_extensions:
                    candidate = executable + suffix
                    if os.path.isfile(candidate):
                        return candidate
                return None

            if not paths:
                return None

            directories = [entry for entry in paths.split(";") if entry]

            if not directories:
                return None

            if has_known_extension:
                for entry in directories:
                    candidate = os.path.join(entry, executable)
                    if os.path.isfile(candidate):
                        return candidate
                return None

            for entry in directories:
                stem = os.path.join(entry, executable)
                for suffix in known_extensions:
                    candidate = stem + suffix
                    if os.path.isfile(candidate):
                        return candidate

            return None

        # Extensions (upper case) that mark a file as directly executable.
        _executable_extensions = (".COM", ".EXE")

    else:

        @staticmethod
        def _execute(path, argv, environ):
            """Replace the current process with the external search command.

            :param path: Program to run; looked up by :func:`os.execvp` /
                :func:`os.execvpe`.
            :param argv: Argument list for the new program (list or tuple).
            :param environ: Environment for the new program, or :const:`None`
                to use :func:`os.execvp`, which takes no environment argument.
            """
            if environ is None:
                os.execvp(path, argv)
            else:
                os.execvpe(path, argv, environ)

    # endregion
def execute(path, argv=None, environ=None, command_class=ExternalSearchCommand):
    """Build a ``command_class`` instance and run it.

    :param path: Path of the external search command; passed to the
        ``command_class`` constructor.
    :type path: str or bytes
    :param argv: Argument list for the command.
    :type argv: list, tuple, or None
    :param environ: Environment mapping for the command.
    :type environ: dict or None
    :param command_class: External search command class to instantiate and execute.
    :type command_class: type
    :return: None
    :rtype: None
    """
    assert issubclass(command_class, ExternalSearchCommand)
    command = command_class(path, argv, environ)
    command.execute()