-
Notifications
You must be signed in to change notification settings - Fork 5.8k
Expand file tree
/
Copy pathpolicy_wrapper.py
More file actions
311 lines (258 loc) · 10.4 KB
/
policy_wrapper.py
File metadata and controls
311 lines (258 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Purpose
Shows how to use AWS Identity and Access Management (IAM) policies.
"""
# snippet-start:[python.example_code.iam.policy_wrapper.imports]
import json
import logging
import operator
import pprint
import time
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
iam = boto3.resource("iam")
# snippet-end:[python.example_code.iam.policy_wrapper.imports]
# snippet-start:[python.example_code.iam.CreatePolicy]
def create_policy(name, description, actions, resource_arn):
"""
Creates a policy that contains a single statement.
:param name: The name of the policy to create.
:param description: The description of the policy.
:param actions: The actions allowed by the policy. These typically take the
form of service:action, such as s3:PutObject.
:param resource_arn: The Amazon Resource Name (ARN) of the resource this policy
applies to. This ARN can contain wildcards, such as
'arn:aws:s3:::amzn-s3-demo-bucket/*' to allow actions on all objects
in the bucket named 'amzn-s3-demo-bucket'.
:return: The newly created policy.
"""
policy_doc = {
"Version": "2012-10-17",
"Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}],
}
try:
policy = iam.create_policy(
PolicyName=name,
Description=description,
PolicyDocument=json.dumps(policy_doc),
)
logger.info("Created policy %s.", policy.arn)
except ClientError:
logger.exception("Couldn't create policy %s.", name)
raise
else:
return policy
# snippet-end:[python.example_code.iam.CreatePolicy]
# snippet-start:[python.example_code.iam.DeletePolicy]
def delete_policy(policy_arn):
"""
Deletes a policy.
:param policy_arn: The ARN of the policy to delete.
"""
try:
iam.Policy(policy_arn).delete()
logger.info("Deleted policy %s.", policy_arn)
except ClientError:
logger.exception("Couldn't delete policy %s.", policy_arn)
raise
# snippet-end:[python.example_code.iam.DeletePolicy]
# snippet-start:[python.example_code.iam.CreatePolicyVersion]
def create_policy_version(policy_arn, actions, resource_arn, set_as_default):
"""
Creates a policy version. Policies can have up to five versions. The default
version is the one that is used for all resources that reference the policy.
:param policy_arn: The ARN of the policy.
:param actions: The actions to allow in the policy version.
:param resource_arn: The ARN of the resource this policy version applies to.
:param set_as_default: When True, this policy version is set as the default
version for the policy. Otherwise, the default
is not changed.
:return: The newly created policy version.
"""
policy_doc = {
"Version": "2012-10-17",
"Statement": [{"Effect": "Allow", "Action": actions, "Resource": resource_arn}],
}
try:
policy = iam.Policy(policy_arn)
policy_version = policy.create_version(
PolicyDocument=json.dumps(policy_doc), SetAsDefault=set_as_default
)
logger.info(
"Created policy version %s for policy %s.",
policy_version.version_id,
policy_version.arn,
)
except ClientError:
logger.exception("Couldn't create a policy version for %s.", policy_arn)
raise
else:
return policy_version
# snippet-end:[python.example_code.iam.CreatePolicyVersion]
# snippet-start:[python.example_code.iam.ListPolicies]
def list_policies(scope):
"""
Lists the policies in the current account.
:param scope: Limits the kinds of policies that are returned. For example,
'Local' specifies that only locally managed policies are returned.
:return: The list of policies.
"""
try:
policies = list(iam.policies.filter(Scope=scope))
logger.info("Got %s policies in scope '%s'.", len(policies), scope)
except ClientError:
logger.exception("Couldn't get policies for scope '%s'.", scope)
raise
else:
return policies
# snippet-end:[python.example_code.iam.ListPolicies]
# snippet-start:[python.example_code.iam.GetPolicy]
# snippet-start:[python.example_code.iam.GetPolicyVersion]
def get_default_policy_statement(policy_arn):
"""
Gets the statement of the default version of the specified policy.
:param policy_arn: The ARN of the policy to look up.
:return: The statement of the default policy version.
"""
try:
policy = iam.Policy(policy_arn)
# To get an attribute of a policy, the SDK first calls get_policy.
policy_doc = policy.default_version.document
policy_statement = policy_doc.get("Statement", None)
logger.info("Got default policy doc for %s.", policy.policy_name)
logger.info(policy_doc)
except ClientError:
logger.exception("Couldn't get default policy statement for %s.", policy_arn)
raise
else:
return policy_statement
# snippet-end:[python.example_code.iam.GetPolicyVersion]
# snippet-end:[python.example_code.iam.GetPolicy]
# snippet-start:[python.example_code.iam.Scenario_RollbackPolicyVersion]
def rollback_policy_version(policy_arn):
"""
Rolls back to the previous default policy, if it exists.
1. Gets the list of policy versions in order by date.
2. Finds the default.
3. Makes the previous policy the default.
4. Deletes the old default version.
:param policy_arn: The ARN of the policy to roll back.
:return: The default version of the policy after the rollback.
"""
try:
policy_versions = sorted(
iam.Policy(policy_arn).versions.all(),
key=operator.attrgetter("create_date"),
)
logger.info("Got %s versions for %s.", len(policy_versions), policy_arn)
except ClientError:
logger.exception("Couldn't get versions for %s.", policy_arn)
raise
default_version = None
rollback_version = None
try:
while default_version is None:
ver = policy_versions.pop()
if ver.is_default_version:
default_version = ver
rollback_version = policy_versions.pop()
rollback_version.set_as_default()
logger.info("Set %s as the default version.", rollback_version.version_id)
default_version.delete()
logger.info("Deleted original default version %s.", default_version.version_id)
except IndexError:
if default_version is None:
logger.warning("No default version found for %s.", policy_arn)
elif rollback_version is None:
logger.warning(
"Default version %s found for %s, but no previous version exists, so "
"nothing to roll back to.",
default_version.version_id,
policy_arn,
)
except ClientError:
logger.exception("Couldn't roll back version for %s.", policy_arn)
raise
else:
return rollback_version
# snippet-end:[python.example_code.iam.Scenario_RollbackPolicyVersion]
# snippet-start:[python.example_code.iam.AttachRolePolicy_Policy]
def attach_to_role(role_name, policy_arn):
"""
Attaches a policy to a role.
:param role_name: The name of the role. **Note** this is the name, not the ARN.
:param policy_arn: The ARN of the policy.
"""
try:
iam.Policy(policy_arn).attach_role(RoleName=role_name)
logger.info("Attached policy %s to role %s.", policy_arn, role_name)
except ClientError:
logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name)
raise
# snippet-end:[python.example_code.iam.AttachRolePolicy_Policy]
# snippet-start:[python.example_code.iam.DetachRolePolicy_Policy]
def detach_from_role(role_name, policy_arn):
"""
Detaches a policy from a role.
:param role_name: The name of the role. **Note** this is the name, not the ARN.
:param policy_arn: The ARN of the policy.
"""
try:
iam.Policy(policy_arn).detach_role(RoleName=role_name)
logger.info("Detached policy %s from role %s.", policy_arn, role_name)
except ClientError:
logger.exception(
"Couldn't detach policy %s from role %s.", policy_arn, role_name
)
raise
# snippet-end:[python.example_code.iam.DetachRolePolicy_Policy]
# snippet-start:[python.example_code.iam.Scenario_PolicyManagement]
def usage_demo():
"""Shows how to use the policy functions."""
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
print("-" * 88)
print("Welcome to the AWS Identity and Account Management policy demo.")
print("-" * 88)
print(
"Policies let you define sets of permissions that can be attached to "
"other IAM resources, like users and roles."
)
bucket_arn = f"arn:aws:s3:::amzn-s3-demo-bucket"
policy = create_policy(
"demo-iam-policy",
"Policy for IAM demonstration.",
["s3:ListObjects"],
bucket_arn,
)
print(f"Created policy {policy.policy_name}.")
policies = list_policies("Local")
print(f"Your account has {len(policies)} managed policies:")
print(*[pol.policy_name for pol in policies], sep=", ")
time.sleep(1)
policy_version = create_policy_version(
policy.arn, ["s3:PutObject"], bucket_arn, True
)
print(
f"Added policy version {policy_version.version_id} to policy "
f"{policy.policy_name}."
)
default_statement = get_default_policy_statement(policy.arn)
print(f"The default policy statement for {policy.policy_name} is:")
pprint.pprint(default_statement)
rollback_version = rollback_policy_version(policy.arn)
print(
f"Rolled back to version {rollback_version.version_id} for "
f"{policy.policy_name}."
)
default_statement = get_default_policy_statement(policy.arn)
print(f"The default policy statement for {policy.policy_name} is now:")
pprint.pprint(default_statement)
delete_policy(policy.arn)
print(f"Deleted policy {policy.policy_name}.")
print("Thanks for watching!")
# snippet-end:[python.example_code.iam.Scenario_PolicyManagement]
if __name__ == "__main__":
usage_demo()