-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_runner.py
More file actions
74 lines (60 loc) · 2.78 KB
/
async_runner.py
File metadata and controls
74 lines (60 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#!/usr/bin/env python3
########################################################################
# #
# Copyright 2016 (c) Araluc Ltd. All rights reserved. #
# For copying and use see: #
# https://github.com/dlinten/pytools/blob/master/LICENSE #
# #
# Company : Araluc Ltd. #
# Web site : araluc.com #
# Email : info@araluc.com #
# Author : David Linten #
# Date : 2016-12-05 16:20:27.475138 #
# #
# Name : async_runner.py #
# Description : Library to run commands in an asynchronous way. #
# #
########################################################################
import sys
import subprocess
import re
ARG_PATTERN = re.compile(r'(?:[^\s,"]|"(?:\\.|[^"])*")+')
class Execute:
"""Class for running commands on a system
"""
@staticmethod
def execute_async(cmd: str) -> str:
"""Executes a command asynchronously without returning command state on completion
:param cmd: Command to run
:returns str: Iterator to lines returned from stdout
:raises CalledProcessError
"""
args = list()
for val in ARG_PATTERN.finditer(cmd):
args.append(val.group().replace("\"", "").replace("\'", ""))
popen = subprocess.Popen(args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
lines = iter(popen.stdout.readline, "")
for line in lines:
yield line
popen.stdout.close()
return_code = popen.wait()
if return_code != 0:
raise subprocess.CalledProcessError(return_code, cmd.split(' '))
@staticmethod
def execute_with_info_async(cmd: str) -> str:
"""Executes a command asynchronously with returning command state on completion
:param cmd: Command to run
:returns str: Iterator to lines returned from stdout
:raises CalledProcessError
"""
try:
for line in Execute.execute_async(cmd):
yield line
except KeyboardInterrupt:
yield "Stopped!"
sys.exit(0)
except subprocess.CalledProcessError as err:
yield "Failed!"