forked from awsdocs/aws-doc-sdk-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscenario_controltower.py
More file actions
327 lines (283 loc) · 13 KB
/
scenario_controltower.py
File metadata and controls
327 lines (283 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
import sys
import time
from typing import Any, Optional
import boto3
from botocore.exceptions import ClientError
from controltower_wrapper import ControlTowerWrapper
# Add relative path to include demo_tools in this code example without need for setup.
sys.path.append("../..")
import demo_tools.question as q # noqa
logger = logging.getLogger(__name__)
# snippet-start:[python.example_code.controltower.ControlTowerScenario]
class ControlTowerScenario:
IDENTITY_CENTER_BASELINE = "baseline/LN25R72TTG6IGPTQ"
stack_name = ""
def __init__(
self, controltower_wrapper: ControlTowerWrapper, org_client: boto3.client
):
"""
:param controltower_wrapper: An instance of the ControlTowerWrapper class.
:param org_client: A Boto3 Organization client.
"""
self.controltower_wrapper = controltower_wrapper
self.org_client = org_client
self.stack = None
self.ou_id = None
self.ou_arn = None
self.account_id = None
self.landing_zone_id = None
self.use_landing_zone = False
def run_scenario(self) -> None:
print("-" * 88)
print(
"\tWelcome to the AWS Control Tower with ControlCatalog example scenario."
)
print("-" * 88)
print(
"This demo will walk you through working with AWS Control Tower for landing zones,\n"
"managing baselines, and working with controls."
)
self.account_id = boto3.client("sts").get_caller_identity()["Account"]
print(
"Some demo operations require the use of a landing zone. "
"\nYou can use an existing landing zone or opt out of these operations in the demo."
"\nFor instructions on how to set up a landing zone, "
"\nsee https://docs.aws.amazon.com/controltower/latest/userguide/getting-started-from-console.html"
)
# List available landing zones
landing_zones = self.controltower_wrapper.list_landing_zones()
if landing_zones:
print("\nAvailable Landing Zones:")
for i, lz in enumerate(landing_zones, 1):
print(f"{i} {lz['arn']})")
# Ask if user wants to use the first landing zone in the list
if q.ask(
f"Do you want to use the first landing zone in the list ({landing_zones[0]['arn']})? (y/n) ",
q.is_yesno,
):
self.use_landing_zone = True
self.landing_zone_id = landing_zones[0]["arn"]
print(f"Using landing zone ID: {self.landing_zone_id})")
# Set up organization and get Sandbox OU ID.
sandbox_ou_id = self.setup_organization()
# Store the OU ID for use in the CloudFormation template.
self.ou_id = sandbox_ou_id
elif q.ask(
f"Do you want to use a different existing Landing Zone for this demo? (y/n) ",
q.is_yesno,
):
self.use_landing_zone = True
self.landing_zone_id = q.ask("Enter landing zone id: ", q.non_empty)
# Set up organization and get Sandbox OU ID.
sandbox_ou_id = self.setup_organization()
# Store the OU ID for use in the CloudFormation template.
self.ou_id = sandbox_ou_id
# List and Enable Baseline.
print("\nManaging Baselines:")
control_tower_baseline = None
identity_center_baseline = None
baselines = self.controltower_wrapper.list_baselines()
print("\nListing available Baselines:")
for baseline in baselines:
if baseline["name"] == "AWSControlTowerBaseline":
control_tower_baseline = baseline
print(f"{baseline['name']}")
if self.use_landing_zone:
print("\nListing enabled baselines:")
enabled_baselines = self.controltower_wrapper.list_enabled_baselines()
for baseline in enabled_baselines:
# If the Identity Center baseline is enabled, the identifier must be used for other baselines.
if self.IDENTITY_CENTER_BASELINE in baseline["baselineIdentifier"]:
identity_center_baseline = baseline
print(f"{baseline['baselineIdentifier']}")
if q.ask(
f"Do you want to enable the Control Tower Baseline? (y/n) ",
q.is_yesno,
):
print("\nEnabling Control Tower Baseline.")
ic_baseline_arn = (
identity_center_baseline["arn"]
if identity_center_baseline
else None
)
baseline_arn = self.controltower_wrapper.enable_baseline(
self.ou_arn, ic_baseline_arn, control_tower_baseline["arn"], "5.0"
)
if baseline_arn:
print(f"Enabled baseline ARN: {baseline_arn}")
else:
# Find the enabled baseline so we can reset it.
for enabled_baseline in enabled_baselines:
if (
enabled_baseline["baselineIdentifier"]
== control_tower_baseline["arn"]
):
baseline_arn = enabled_baseline["arn"]
if baseline_arn:
print("No change, the selected baseline was already enabled.")
if baseline_arn and q.ask(
f"Do you want to reset the Control Tower Baseline? (y/n) ",
q.is_yesno,
):
print(f"\nResetting Control Tower Baseline. {baseline_arn}")
operation_id = self.controltower_wrapper.reset_enabled_baseline(
baseline_arn
)
print(f"\nReset baseline operation id {operation_id}.")
if baseline_arn and q.ask(
f"Do you want to disable the Control Tower Baseline? (y/n) ",
q.is_yesno,
):
print(f"Disabling baseline ARN: {baseline_arn}")
operation_id = self.controltower_wrapper.disable_baseline(
baseline_arn
)
print(f"\nDisabled baseline operation id {operation_id}.")
# Re-enable the baseline for the next step.
print("\nEnabling Control Tower Baseline.")
self.controltower_wrapper.enable_baseline(
self.ou_arn,
ic_baseline_arn,
control_tower_baseline["arn"],
"5.0",
)
# List and Enable Controls.
print("\nManaging Controls:")
controls = self.controltower_wrapper.list_controls()
print("\nListing first 5 available Controls:")
for i, control in enumerate(controls[:5], 1):
print(f"{i}. {control['Name']} - {control['Arn']}")
if self.use_landing_zone:
target_ou = self.ou_arn
enabled_controls = self.controltower_wrapper.list_enabled_controls(
target_ou
)
print("\nListing enabled controls:")
for i, control in enumerate(enabled_controls, 1):
print(f"{i}. {control['controlIdentifier']}")
# Enable first non-enabled control as an example.
enabled_control_arns = [control["arn"] for control in enabled_controls]
control_arn = next(
control["Arn"]
for control in controls
if control["Arn"] not in enabled_control_arns
)
if control_arn and q.ask(
f"Do you want to enable the control {control_arn}? (y/n) ",
q.is_yesno,
):
print(f"\nEnabling control: {control_arn}")
operation_id = self.controltower_wrapper.enable_control(
control_arn, target_ou
)
if operation_id:
print(f"Enabled control with operation id {operation_id}")
if control_arn and q.ask(
f"Do you want to disable the control? (y/n) ",
q.is_yesno,
):
print("\nDisabling the control...")
operation_id = self.controltower_wrapper.disable_control(
control_arn, target_ou
)
print(f"Disable operation ID: {operation_id}")
print("\nThis concludes the example scenario.")
print("Thanks for watching!")
print("-" * 88)
def setup_organization(self):
"""
Checks if the current account is part of an organization and creates one if needed.
Also ensures a Sandbox OU exists and returns its ID.
:return: The ID of the Sandbox OU
"""
print("\nChecking organization status...")
try:
# Check if account is part of an organization
org_response = self.org_client.describe_organization()
org_id = org_response["Organization"]["Id"]
print(f"Account is part of organization: {org_id}")
except ClientError as error:
if error.response["Error"]["Code"] == "AWSOrganizationsNotInUseException":
print("No organization found. Creating a new organization...")
try:
create_response = self.org_client.create_organization(
FeatureSet="ALL"
)
org_id = create_response["Organization"]["Id"]
print(f"Created new organization: {org_id}")
# Wait for organization to be available.
waiter = self.org_client.get_waiter("organization_active")
waiter.wait(
Organization=org_id,
WaiterConfig={"Delay": 5, "MaxAttempts": 12},
)
except ClientError as create_error:
logger.error(
"Couldn't create organization. Here's why: %s: %s",
create_error.response["Error"]["Code"],
create_error.response["Error"]["Message"],
)
raise
else:
logger.error(
"Couldn't describe organization. Here's why: %s: %s",
error.response["Error"]["Code"],
error.response["Error"]["Message"],
)
raise
# Look for Sandbox OU.
sandbox_ou_id = None
paginator = self.org_client.get_paginator(
"list_organizational_units_for_parent"
)
try:
# Get root ID first.
roots = self.org_client.list_roots()["Roots"]
if not roots:
raise ValueError("No root found in organization")
root_id = roots[0]["Id"]
# Search for existing Sandbox OU.
print("Checking for Sandbox OU...")
for page in paginator.paginate(ParentId=root_id):
for ou in page["OrganizationalUnits"]:
if ou["Name"] == "Sandbox":
sandbox_ou_id = ou["Id"]
self.ou_arn = ou["Arn"]
print(f"Found existing Sandbox OU: {sandbox_ou_id}")
break
if sandbox_ou_id:
break
# Create Sandbox OU if it doesn't exist.
if not sandbox_ou_id:
print("Creating Sandbox OU...")
create_ou_response = self.org_client.create_organizational_unit(
ParentId=root_id, Name="Sandbox"
)
sandbox_ou_id = create_ou_response["OrganizationalUnit"]["Id"]
print(f"Created new Sandbox OU: {sandbox_ou_id}")
# Wait for OU to be available.
waiter = self.org_client.get_waiter("organizational_unit_active")
waiter.wait(
OrganizationalUnitId=sandbox_ou_id,
WaiterConfig={"Delay": 5, "MaxAttempts": 12},
)
except ClientError as error:
logger.error(
"Couldn't set up Sandbox OU. Here's why: %s: %s",
error.response["Error"]["Code"],
error.response["Error"]["Message"],
)
raise
return sandbox_ou_id
if __name__ == "__main__":
try:
org = boto3.client("organizations")
control_tower_wrapper = ControlTowerWrapper.from_client()
scenario = ControlTowerScenario(control_tower_wrapper, org)
scenario.run_scenario()
except Exception:
logging.exception("Something went wrong with the scenario.")
# snippet-end:[python.example_code.controltower.ControlTowerScenario]