-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathcli.py
More file actions
398 lines (347 loc) · 15.6 KB
/
cli.py
File metadata and controls
398 lines (347 loc) · 15.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
import json
import os
import re
import inquirer
from inquirer import errors
import paramiko
import colorama
from colorama import Fore, Style
import shutil
from datetime import datetime
from prettytable import PrettyTable
colorama.init()
CONFIG_DIR = "./configuration/sftp/"
BACKUP_DIR = "./configuration/backup/"
def print_colored(text, color=Fore.WHITE):
print(f"{color}{text}{Style.RESET_ALL}")
def load_config(file_name):
file_path = os.path.join(CONFIG_DIR, file_name)
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_config(config, file_name):
os.makedirs(CONFIG_DIR, exist_ok=True)
file_path = os.path.join(CONFIG_DIR, file_name)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=4)
print_colored(f"Configuration saved to {file_path}", Fore.GREEN)
def get_config_files():
if not os.path.exists(CONFIG_DIR):
return []
return [f for f in os.listdir(CONFIG_DIR) if f.endswith('.json')]
def validate_url(answers, current):
if not current:
raise errors.ValidationError('', reason='URL cannot be empty')
if not re.match(r'^([a-zA-Z0-9]([-a-zA-Z0-9]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}(?::\d+)?$', current):
raise errors.ValidationError('', reason='Invalid URL format. Please enter a valid FQDN[:port] without protocol.')
return True
def validate_name(answers, current):
if not current or not current.strip():
raise errors.ValidationError('', reason='Name cannot be empty')
return True
def safe_name(name):
return re.sub(r'[^a-z0-9-]', '', name.lower())
def safe_bucket_name(name):
return re.sub(r'^s3://', '', name.lower())
def safe_prefix(prefix):
return prefix.strip('/')
def safe_remote_folder(folder):
return '/' + folder.strip('/')
def validate_schedule(answers, current):
if not current:
return True
predefined_schedules = {
'@monthly': '0 0 1 * ? *',
'@daily': '0 0 * * ? *',
'@hourly': '0 * * * ? *',
'@minutely': '* * * * ? *',
'@sunday': '0 0 ? * 1 *',
'@monday': '0 0 ? * 2 *',
'@tuesday': '0 0 ? * 3 *',
'@wednesday': '0 0 ? * 4 *',
'@thursday': '0 0 ? * 5 *',
'@friday': '0 0 ? * 6 *',
'@saturday': '0 0 ? * 7 *',
'@every10min': '0/10 * * * ? *'
}
if current in predefined_schedules:
return True
cron_regex = r'^(\S+\s){5}\S+$'
if re.match(cron_regex, current):
parts = current.split()
if len(parts) == 6:
return True
raise errors.ValidationError('', reason='Invalid schedule. Use predefined tags or a valid AWS Cron expression.')
def validate_kms_key_arn(answers, current):
if not current:
return True
kms_arn_pattern = r'^arn:aws:kms:[a-z0-9-]+:\d{12}:key/[a-f0-9-]{36}$'
if not re.match(kms_arn_pattern, current):
raise errors.ValidationError('', reason='Invalid KMS Key ARN format. Please enter a valid ARN.')
return True
def edit_sync_settings(sync_settings):
while True:
choices = [f"{s['LocalRepository']['BucketName']} -> {s['RemoteFolders']['Folder']} (KMS: {s['LocalRepository'].get('KmsKeyArn', 'None')})" for s in sync_settings]
choices.append("Add new sync setting")
choices.append("Finish editing")
questions = [
inquirer.List('choice',
message="Select a sync setting to edit or choose an action",
choices=choices,
)
]
answers = inquirer.prompt(questions)
choice = answers['choice']
if choice == "Finish editing":
if not sync_settings:
print_colored("At least one SyncSetting is required. Please add a sync setting.", Fore.YELLOW)
continue
break
elif choice == "Add new sync setting":
new_setting = prompt_sync_setting()
sync_settings.append(new_setting)
else:
index = choices.index(choice)
updated_setting = prompt_sync_setting(sync_settings[index])
sync_settings[index] = updated_setting
def prompt_sync_setting(existing=None):
questions = [
inquirer.Text('bucket_name', message="Enter the target S3 Bucket name",
default=existing['LocalRepository']['BucketName'] if existing else None),
inquirer.Text('prefix', message="Enter the target Prefix",
default=existing['LocalRepository']['Prefix'] if existing else None),
inquirer.Text('kms_key_arn', message="Enter KMS Key ARN (optional)",
default=existing['LocalRepository'].get('KmsKeyArn', '') if existing else '',
validate=validate_kms_key_arn),
inquirer.Text('remote_folder', message="Enter Remote Folder (%year%, %month% and %day% tags are supported)",
default=existing['RemoteFolders']['Folder'] if existing else None),
inquirer.Confirm('recursive', message="Is it recursive?",
default=existing['RemoteFolders']['Recursive'] if existing else True),
]
answers = inquirer.prompt(questions)
sync_setting = {
"LocalRepository": {
"BucketName": safe_bucket_name(answers['bucket_name']),
"Prefix": safe_prefix(answers['prefix'])
},
"RemoteFolders": {
"Folder": safe_remote_folder(answers['remote_folder']),
"Recursive": answers['recursive']
}
}
if answers['kms_key_arn']:
sync_setting['LocalRepository']['KmsKeyArn'] = answers['kms_key_arn']
return sync_setting
def fetch_host_key(hostname, port=22):
try:
transport = paramiko.Transport((hostname, port))
transport.start_client()
key = transport.get_remote_server_key()
return f"{key.get_name()} {key.get_base64()}"
except Exception as e:
print_colored(f"Error fetching host key: {e}", Fore.RED)
return None
finally:
if transport:
transport.close()
def confirm_config(config):
table = PrettyTable()
table.field_names = ["Setting", "Value"]
table.align["Setting"] = "r"
table.align["Value"] = "l"
table.max_width["Value"] = 100
for key, value in config.items():
if key != 'SyncSettings':
table.add_row([key, str(value)])
print_colored("\nConfiguration Summary:", Fore.CYAN)
print(table)
print_colored("\nSyncSettings:", Fore.CYAN)
sync_table = PrettyTable()
sync_table.field_names = ["Local", "Remote", "KMS Key ARN"]
sync_table.align = "l"
for setting in config.get('SyncSettings', []):
local = f"{setting['LocalRepository']['BucketName']}/{setting['LocalRepository']['Prefix']}"
remote = setting['RemoteFolders']['Folder']
kms_key_arn = setting['LocalRepository'].get('KmsKeyArn', 'None')
sync_table.add_row([local, remote, kms_key_arn])
print(sync_table)
return inquirer.confirm("Do you want to save this configuration?", default=True)
def backup_config(file_name):
source_path = os.path.join(CONFIG_DIR, file_name)
if os.path.exists(source_path):
os.makedirs(BACKUP_DIR, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"{os.path.splitext(file_name)[0]}_{timestamp}.json"
backup_path = os.path.join(BACKUP_DIR, backup_name)
shutil.copy2(source_path, backup_path)
print_colored(f"Backup created: {backup_path}", Fore.YELLOW)
def select_config_file():
config_files = get_config_files()
if not config_files:
print_colored("No existing configuration files found.", Fore.YELLOW)
return None
return inquirer.list_input(
message="Select a configuration file",
choices=config_files
)
def display_config_summary():
table = PrettyTable()
table.field_names = ["File Name", "Name", "URL", "Schedule"]
for file_name in get_config_files():
config = load_config(file_name)
table.add_row([file_name, config.get('Name', ''), config.get('Url', ''), config.get('Schedule', '')])
print(table)
def main():
while True:
questions = [
inquirer.List('action',
message="Choose an action",
choices=['Create new configuration', 'Modify existing configuration', 'View configuration', 'Delete configuration', 'Display all configurations', 'Exit'],
)
]
answers = inquirer.prompt(questions)
if answers['action'] == 'Exit':
break
elif answers['action'] == 'Create new configuration':
config = {}
file_name = None
elif answers['action'] == 'View configuration':
file_name = select_config_file()
if file_name:
config = load_config(file_name)
table = PrettyTable()
table.field_names = ["Setting", "Value"]
table.align["Setting"] = "r"
table.align["Value"] = "l"
table.max_width["Value"] = 100
for key, value in config.items():
if key != 'SyncSettings':
table.add_row([key, str(value)])
print(table)
print_colored("\nSyncSettings:", Fore.CYAN)
sync_table = PrettyTable()
sync_table.field_names = ["Local", "Remote", "KMS Key ARN"]
sync_table.align = "l"
for setting in config.get('SyncSettings', []):
local = f"{setting['LocalRepository']['BucketName']}/{setting['LocalRepository']['Prefix']}"
remote = setting['RemoteFolders']['Folder']
kms_key_arn = setting['LocalRepository'].get('KmsKeyArn', 'None')
sync_table.add_row([local, remote, kms_key_arn])
print(sync_table)
continue
elif answers['action'] == 'Delete configuration':
file_name = select_config_file()
if file_name and inquirer.confirm(f"Are you sure you want to delete {file_name}?", default=False):
os.remove(os.path.join(CONFIG_DIR, file_name))
print_colored(f"Deleted {file_name}", Fore.YELLOW)
continue
elif answers['action'] == 'Display all configurations':
display_config_summary()
continue
else:
file_name = select_config_file()
if not file_name:
continue
config = load_config(file_name)
questions = [
inquirer.Text('Name', message="Enter Name", validate=validate_name, default=config.get('Name', '')),
inquirer.Text('Description', message="Enter Description", default=config.get('Description', '')),
inquirer.Text('Url', message="Enter Url and optionally port (FQDN[:port])", validate=validate_url, default=config.get('Url', '')),
inquirer.List('SecurityPolicyName',
message="Select SecurityPolicyName",
choices=['TransferSFTPConnectorSecurityPolicy-2024-03', 'TransferSFTPConnectorSecurityPolicy-2023-07'],
default=config.get('SecurityPolicyName', 'TransferSFTPConnectorSecurityPolicy-2024-03')),
]
answers = inquirer.prompt(questions)
answers['Name'] = safe_name(answers['Name'])
config.update(answers)
schedule_choices = [
'@monthly', '@daily', '@hourly', '@minutely',
'@sunday', '@monday', '@tuesday', '@wednesday', '@thursday', '@friday', '@saturday',
'@every10min', 'Custom AWS Cron expression'
]
schedule = inquirer.list_input(
message="Select Schedule",
choices=schedule_choices,
default=config.get('Schedule', '@daily')
)
if schedule == 'Custom AWS Cron expression':
while True:
custom_schedule = inquirer.text(
message="Enter custom AWS Cron expression",
)
try:
if validate_schedule(None, custom_schedule):
schedule = custom_schedule
break
except errors.ValidationError as e:
print_colored(e.reason, Fore.RED)
if not inquirer.confirm(message="Do you want to try again?"):
print_colored("Using default schedule: @daily", Fore.YELLOW)
schedule = '@daily'
break
config['Schedule'] = schedule
public_keys = config.get('PublicKey', [])
if public_keys:
print_colored("Existing public keys:", Fore.CYAN)
for key in public_keys:
print(key)
add_new_key = inquirer.confirm(message="Do you want to add a new public key?", default=False)
else:
add_new_key = True
if add_new_key:
if ':' in config['Url']:
host, port = config['Url'].split(':')
host_key = fetch_host_key(host, int(port))
else:
host_key = fetch_host_key(config['Url'])
if host_key:
if host_key in public_keys:
print_colored("The fetched host key already exists in the configuration:", Fore.YELLOW)
print(host_key)
else:
add_fetched_key = inquirer.confirm(
message=f"Do you want to add the following new host key? (Y/n) \n{host_key}",
default=True
)
if add_fetched_key:
public_keys.append(host_key)
else:
manual_key = inquirer.text(message="Enter the public key manually")
if manual_key and manual_key not in public_keys:
public_keys
public_keys.append(manual_key)
elif manual_key in public_keys:
print_colored("This key already exists in the configuration.", Fore.YELLOW)
else:
manual_key = inquirer.text(message="Enter the public key manually")
if manual_key and manual_key not in public_keys:
public_keys.append(manual_key)
elif manual_key in public_keys:
print_colored("This key already exists in the configuration.", Fore.YELLOW)
if public_keys:
config['PublicKey'] = public_keys
else:
config.pop('PublicKey', None) # Remove PublicKey if no keys are specified
if 'SyncSettings' not in config:
config['SyncSettings'] = []
edit_sync_settings(config['SyncSettings'])
if confirm_config(config):
if file_name:
backup_config(file_name)
while not file_name:
file_name = inquirer.text(message="Enter file name to save (e.g., config.json)")
if not file_name:
print_colored("Please enter a valid file name.", Fore.YELLOW)
elif not file_name.endswith('.json'):
file_name += '.json'
save_config(config, file_name)
else:
print_colored("Configuration not saved.", Fore.YELLOW)
if __name__ == "__main__":
try:
main()
except Exception as e:
print_colored(f"An error occurred: {str(e)}", Fore.RED)
print_colored("Please check the log file for details.", Fore.RED)