-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreplicator.py
More file actions
executable file
·103 lines (79 loc) · 3.58 KB
/
replicator.py
File metadata and controls
executable file
·103 lines (79 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import argparse
import sys, subprocess
import asyncio
import yaml
from notifier import TelegramNotifier
from meta import version
reportLinesLimit = 20
# TODO: create log file for each execution
async def main():
parser = argparse.ArgumentParser(
description='Util to replicate backups from primary storage to a mirrors',
)
parser.add_argument('--version', '-v', action='version', version='%(prog)s ' + version)
parser.add_argument('config', help="Path to config file")
args = parser.parse_args()
config = yaml.load(open(args.config), Loader=yaml.Loader)
# Configure notifier
token = None
users = None
notifierPrefix = None
if ('notifications' in config):
token = config['notifications']['telegram']['botToken']
users = config['notifications']['telegram']['userIds']
notifierPrefix = config['notifications'].get('prefix')
notifier = TelegramNotifier(token, users)
notifier.setPrefix(notifierPrefix)
# Run tasks
for taskId, task in enumerate(config['tasks']):
taskName = task['name'] if 'name' in task else task['run']
escapedTaskName = notifier.escapeText(taskName)
newLinePrefix = '\n' if taskId != 0 else ''
print(newLinePrefix + f'Task "{taskName}"', flush=True)
# Skip by condition
if 'if' in task:
ifCmd = subprocess.Popen(task['if'], shell=True)
ifCmd.wait()
if ifCmd.returncode != 0:
print(f'Skip task "{taskName}"', flush=True)
await notifier.notify(f'Skip task "{escapedTaskName}"')
continue
# Run command
commands = task['run'] if isinstance(task['run'], list) else [task['run']]
for cmdIndex, command in enumerate(commands):
print(f'Run command "{command}"', flush=True)
# TODO: add timeout to stop process
replicationProcess = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
outLines = []
for rawLine in iter(replicationProcess.stdout.readline, b''):
line = rawLine.decode('utf-8')
# Print output to console
sys.stdout.write(line)
# Remove old lines out of limit
if (reportLinesLimit > 0):
freeSlots = reportLinesLimit - len(outLines)
if (freeSlots <= 0):
slotsToRemove = -freeSlots + 1
outLines = outLines[slotsToRemove:]
# Add lines
outLines.append(line)
replicationProcess.stdout.close()
replicationProcess.wait()
isRunSuccessful = replicationProcess.returncode == 0
# Notify result
totalCommandsLen = len(commands)
messageTargetSuffix = f'- command {cmdIndex + 1}/{totalCommandsLen}' if totalCommandsLen > 1 else ''
messageTarget = f'Task "{escapedTaskName}"' + notifier.escapeText(messageTargetSuffix)
if isRunSuccessful:
await notifier.notify(f'{messageTarget} final successful')
else:
# add last few lines to explain context
lastLog = notifier.escapeText(''.join(outLines))
await notifier.notify(f'⚠️ {messageTarget} has failed\n\n```\n...\n{lastLog}\n```')
# Stop the program for fails
if not isRunSuccessful:
exit(replicationProcess.returncode)
# Final
await notifier.notify(f'🎉 All tasks has final successful')
def cli():
asyncio.run(main())