-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathaudit_log.py
More file actions
160 lines (147 loc) · 4.45 KB
/
audit_log.py
File metadata and controls
160 lines (147 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import re
from datetime import datetime, timedelta
import click
from cloudify_cli import env
from cloudify_cli.cli import cfy, helptexts
from cloudify_cli.exceptions import CloudifyCliError
from cloudify_cli.table import print_data
# Columns handed to print_data() when audit log entries are listed.
AUDITLOG_COLUMNS = [
    'ref_table',
    'ref_id',
    'operation',
    'creator_name',
    'execution_id',
    'created_at',
]
def _parse_before(ctx, _, spec):
    """Parse the --before/--since parameter.

    Used as a click option callback, so it is called with the click
    context and the parameter object ahead of the raw option value;
    neither of the first two arguments is used.

    Recognised forms of ``spec``:

    * empty or ``None`` -- returned unchanged;
    * ``now`` -- the current UTC time;
    * ``<number><unit>`` with unit ``h``, ``d`` or ``w`` (case
      insensitive), e.g. ``10.5h``, ``15d``, ``7w`` -- that many
      hours/days/weeks before the current UTC time;
    * ``@<seconds>`` -- a UNIX timestamp given as an integer;
    * anything else -- returned unchanged.

    :param ctx: click context (unused).
    :param _: click parameter (unused).
    :param spec: raw option value.
    :return: a naive ``datetime`` in UTC, or ``spec`` itself when it is
        empty or matches none of the recognised forms.
    :raises CloudifyCliError: if a relative or ``@`` specification is
        malformed or falls outside the representable date range.
    """
    if not spec:
        return spec
    if spec == "now":
        return datetime.utcnow()

    relative = re.match(r'^([.\d]+)([hdw])$', spec, re.IGNORECASE)
    if relative:
        # timestamp specification e.g. 10.5h, 15d, 7w
        amount, unit = relative.groups()
        unit_names = {'h': 'hours', 'd': 'days', 'w': 'weeks'}
        try:
            # The pattern also lets through strings float() rejects
            # (e.g. "1.2.3h", "..h"), and very large amounts overflow
            # timedelta or the subtraction; report all of these as a
            # CLI error rather than letting a raw exception escape.
            delta = timedelta(**{unit_names[unit.lower()]: float(amount)})
            return datetime.utcnow() - delta
        except (ValueError, OverflowError) as err:
            raise CloudifyCliError('Failed to parse timestamp: {0}'
                                   .format(spec)) from err

    if spec.startswith('@'):
        try:
            return datetime.utcfromtimestamp(int(spec[1:]))
        except (ValueError, OverflowError, OSError) as err:
            # utcfromtimestamp signals out-of-range values with
            # OverflowError or OSError, not only ValueError.
            raise CloudifyCliError('Failed to parse timestamp: {0}'
                                   .format(spec)) from err

    return spec
# Command group under which the audit log subcommands below are registered.
# The docstring is kept verbatim: click displays it as the group's help text.
@cfy.group(name='auditlog')
@cfy.assert_manager_active()
def auditlog():
    """Manage the audit log"""
@auditlog.command(name='list', short_help='List audit log entries')
@click.option('-c', '--creator-name', help=helptexts.AUDIT_CREATOR_NAME)
@click.option('-e', '--execution-id', help=helptexts.AUDIT_EXECUTION_ID)
@click.option('-i', '--since',
              help=helptexts.AUDIT_SINCE,
              callback=_parse_before)
@click.option('-f', '--follow', help=helptexts.AUDIT_FOLLOW, is_flag=True)
@cfy.options.timeout(default=300)
@cfy.options.sort_by()
@cfy.options.descending
@cfy.options.pagination_offset
@cfy.options.pagination_size
@cfy.options.common_options
@cfy.pass_logger
@cfy.pass_client()
def list_logs(creator_name, execution_id, since, follow, timeout,
              sort_by, descending, pagination_offset, pagination_size,
              logger, client):
    # Documented with comments rather than a docstring on purpose: click
    # shows a command's docstring as its help text.
    #
    # With --follow the entries are streamed via stream_logs(); otherwise
    # a single page is fetched and printed by _list_logs().

    # Both paths use an async REST client, so the client injected by
    # cfy.pass_client() is replaced here.
    client = env.get_rest_client(async_client=True)

    if follow:
        from cloudify_cli.async_commands.audit_log import stream_logs
        stream_logs(creator_name, execution_id, since, timeout, logger,
                    client)
        return

    import asyncio
    event_loop = asyncio.get_event_loop()
    pending = _list_logs(
        creator_name=creator_name,
        execution_id=execution_id,
        since=since,
        sort_by=sort_by,
        descending=descending,
        pagination_offset=pagination_offset,
        pagination_size=pagination_size,
        logger=logger,
        client=client,
    )
    event_loop.run_until_complete(pending)
async def _list_logs(creator_name, execution_id, since, sort_by, descending,
                     pagination_offset, pagination_size, logger, client):
    """Fetch audit log entries from the manager and print them.

    :param creator_name: passed to the client as ``creator_name``.
    :param execution_id: passed to the client as ``execution_id``.
    :param since: passed to the client as ``since``.
    :param sort_by: passed to the client as ``order_by``.
    :param descending: passed to the client as ``desc``.
    :param pagination_offset: passed to the client as ``offset``.
    :param pagination_size: passed to the client as ``size``.
    :param logger: logger used for the progress and summary messages.
    :param client: REST client whose ``auditlog.list`` is awaited.
    """
    logger.info('Listing audit log entries...')

    query = {
        'creator_name': creator_name,
        'execution_id': execution_id,
        'since': since,
        'order_by': sort_by,
        'desc': descending,
        'offset': pagination_offset,
        'size': pagination_size,
    }
    entries = await client.auditlog.list(**query)

    print_data(AUDITLOG_COLUMNS, entries, 'AuditLogs:')

    shown = len(entries)
    total = entries.metadata.pagination.total
    logger.info('Showing %d of %d audit log entries', shown, total)
@auditlog.command(name='truncate', short_help='Truncate audit log')
@click.option('-b', '--before',
              required=True,
              help=helptexts.AUDIT_TRUNCATE_BEFORE,
              callback=_parse_before)
@click.option('-c', '--creator-name', help=helptexts.AUDIT_CREATOR_NAME)
@click.option('-e', '--execution-id', help=helptexts.AUDIT_EXECUTION_ID)
@cfy.pass_logger
@cfy.pass_client()
def truncate_logs(before, creator_name, execution_id, logger, client):
    """Truncate audit_log entries"""
    logger.info("Truncating audit log entries...")

    # 'before' is always sent; the other filters only when they were given.
    filters = {'before': before}
    optional = (
        ('creator_name', creator_name),
        ('execution_id', execution_id),
    )
    for key, value in optional:
        if value:
            filters[key] = value

    outcome = client.auditlog.delete(**filters)
    logger.info('%d audit log entries have been truncated', outcome.deleted)