-
Notifications
You must be signed in to change notification settings - Fork 537
Expand file tree
/
Copy pathec2_helpers.py
More file actions
534 lines (449 loc) · 20.5 KB
/
ec2_helpers.py
File metadata and controls
534 lines (449 loc) · 20.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
"""EC2 EFA instance lifecycle helpers.
Provides a context manager for launching EFA-enabled EC2 instances,
setting up multi-node containers with SSH, and guaranteed cleanup.
"""
import logging
import os
from contextlib import contextmanager
from test_utils.aws import AWSSessionManager, LoggedConnection
from test_utils.constants import DEFAULT_REGION, EC2_INSTANCE_ROLE_NAME
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)
MASTER_CONTAINER_NAME = "master_container"
WORKER_CONTAINER_NAME = "worker_container"
MASTER_SSH_KEY_NAME = "master_id_rsa"
WORKER_SSH_KEY_NAME = "worker_id_rsa"
HOSTS_FILE_LOCATION = "/root/hosts"
DEFAULT_TIMEOUT = 600
# Permanent SG looked up by name.
EFA_SG_NAME = "dlc-cicd-efa-test"
def get_efa_devices(conn):
"""Get list of EFA device paths on an instance."""
result = conn.run("ls -d /dev/infiniband/uverbs* 2>/dev/null || true")
devices = result.stdout.strip().split()
return devices
def get_num_gpus(conn):
"""Get number of GPUs on an instance."""
result = conn.run("nvidia-smi -L | wc -l")
return int(result.stdout.strip())
def get_num_efa_interfaces(aws_session, instance_type):
"""Get the maximum number of EFA interfaces for an instance type (from EC2 API)."""
resp = aws_session.ec2.describe_instance_types(InstanceTypes=[instance_type])
info = resp["InstanceTypes"][0]
num = info.get("NetworkInfo", {}).get("EfaInfo", {}).get("MaximumEfaInterfaces")
if not num:
raise RuntimeError(f"{instance_type} does not support EFA")
return num
def generate_efa_network_interfaces(aws_session, instance_type, subnet_id, sg_id):
"""Generate NetworkInterfaces config for EFA-enabled launch."""
num_interfaces = get_num_efa_interfaces(aws_session, instance_type)
interfaces = []
for idx in range(num_interfaces):
iface = {
"DeviceIndex": 0 if idx == 0 else 1,
"NetworkCardIndex": idx,
"SubnetId": subnet_id,
"Groups": [sg_id],
"InterfaceType": "efa",
"DeleteOnTermination": True,
}
interfaces.append(iface)
return interfaces
def get_default_subnet(aws_session, az=None):
"""Get a default subnet ID, optionally in a specific AZ."""
filters = [{"Name": "default-for-az", "Values": ["true"]}]
if az:
filters.append({"Name": "availability-zone", "Values": [az]})
subnets = aws_session.ec2.describe_subnets(Filters=filters)["Subnets"]
if not subnets:
raise RuntimeError(f"No default subnet found{f' in {az}' if az else ''}")
return subnets[0]["SubnetId"]
def get_efa_security_group_id(aws_session, name=EFA_SG_NAME):
"""Look up the permanent EFA SG in the default VPC by name."""
vpc_id = aws_session.ec2.describe_vpcs(Filters=[{"Name": "is-default", "Values": ["true"]}])[
"Vpcs"
][0]["VpcId"]
resp = aws_session.ec2.describe_security_groups(
Filters=[
{"Name": "group-name", "Values": [name]},
{"Name": "vpc-id", "Values": [vpc_id]},
],
)
if not resp["SecurityGroups"]:
raise RuntimeError(f"SG {name!r} not found in default VPC {vpc_id}")
return resp["SecurityGroups"][0]["GroupId"]
def authorize_runner_ssh(aws_session, sg_id, runner_ip, description):
"""Add ingress rule allowing SSH from the runner's IP to the permanent SG.
Idempotent: if the rule already exists, logs and continues.
"""
from botocore.exceptions import ClientError
try:
aws_session.ec2.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 22,
"ToPort": 22,
"IpRanges": [{"CidrIp": f"{runner_ip}/32", "Description": description}],
}
],
)
LOGGER.info(f"Authorized SSH from {runner_ip}/32 on SG {sg_id} ({description})")
except ClientError as e:
if e.response["Error"]["Code"] == "InvalidPermission.Duplicate":
LOGGER.info(f"SSH ingress from {runner_ip}/32 already exists on SG {sg_id}")
else:
raise
def revoke_runner_ssh(aws_session, sg_id, runner_ip):
"""Remove the SSH ingress rule for the runner's IP. Silent on error (cleanup path)."""
try:
aws_session.ec2.revoke_security_group_ingress(
GroupId=sg_id,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 22,
"ToPort": 22,
"IpRanges": [{"CidrIp": f"{runner_ip}/32"}],
}
],
)
LOGGER.info(f"Revoked SSH from {runner_ip}/32 on SG {sg_id}")
except Exception as e:
LOGGER.warning(f"Failed to revoke SSH from {runner_ip}/32 on SG {sg_id}: {e}")
def cleanup_stale_runner_ssh_rules(aws_session, sg_id, description_prefix="efa-test-runner-"):
"""Revoke stale port-22 ingress CIDR rules left by killed/crashed previous runs.
Only touches rules whose Description starts with `description_prefix`. Leaves
prefix-list rules, self-referencing rules, and any manually-added rules intact.
"""
resp = aws_session.ec2.describe_security_groups(GroupIds=[sg_id])
if not resp["SecurityGroups"]:
return
stale_cidrs = []
for perm in resp["SecurityGroups"][0].get("IpPermissions", []):
if (
perm.get("IpProtocol") != "tcp"
or perm.get("FromPort") != 22
or perm.get("ToPort") != 22
):
continue
for ip_range in perm.get("IpRanges", []):
desc = ip_range.get("Description", "")
cidr = ip_range.get("CidrIp")
if cidr and desc.startswith(description_prefix):
stale_cidrs.append(cidr)
if not stale_cidrs:
LOGGER.info(f"No stale runner SSH rules on SG {sg_id}")
return
LOGGER.info(f"Cleaning up {len(stale_cidrs)} stale runner SSH rule(s) on SG {sg_id}")
for cidr in stale_cidrs:
try:
aws_session.ec2.revoke_security_group_ingress(
GroupId=sg_id,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 22,
"ToPort": 22,
"IpRanges": [{"CidrIp": cidr}],
}
],
)
LOGGER.info(f"Revoked stale rule {cidr} on SG {sg_id}")
except Exception as e:
LOGGER.warning(f"Failed to revoke stale rule {cidr} on SG {sg_id}: {e}")
def get_available_reservations(aws_session, instance_type, min_count=1):
"""Get capacity reservations with available instances, sorted by availability."""
response = aws_session.ec2.describe_capacity_reservations(
Filters=[
{"Name": "instance-type", "Values": [instance_type]},
{"Name": "state", "Values": ["active"]},
]
)
reservations = [
r for r in response["CapacityReservations"] if r["AvailableInstanceCount"] >= min_count
]
reservations.sort(key=lambda r: r["AvailableInstanceCount"])
return reservations
def _build_efa_run_params(ami_id, instance_type, key_name, network_interfaces, az, name=""):
"""Build common RunInstances params for EFA launch."""
return {
"ImageId": ami_id,
"InstanceType": instance_type,
"KeyName": key_name,
"NetworkInterfaces": network_interfaces,
"Placement": {"AvailabilityZone": az},
"MetadataOptions": {
"HttpTokens": "required",
"HttpEndpoint": "enabled",
"HttpPutResponseHopLimit": 2,
},
"BlockDeviceMappings": [
{"DeviceName": "/dev/xvda", "Ebs": {"VolumeSize": 300}},
],
"TagSpecifications": [
{
"ResourceType": "instance",
"Tags": [{"Key": "Name", "Value": f"CI-CD EFA {name}"}],
},
],
"IamInstanceProfile": {"Name": EC2_INSTANCE_ROLE_NAME},
}
def launch_efa_instances(aws_session, ami_id, instance_type, key_name, sg_id, count=2, name=""):
"""Launch EFA instances using capacity reservations.
Tries each reservation with sufficient capacity. Does not fall back to on-demand
(p4d on-demand availability is near zero).
Returns list of instance IDs.
"""
from botocore.exceptions import ClientError
reservations = get_available_reservations(aws_session, instance_type, min_count=count)
if not reservations:
raise RuntimeError(
f"No capacity reservations with >= {count} available {instance_type} instances. "
f"Check reservation status and retry when capacity is available."
)
for reservation in reservations:
az = reservation["AvailabilityZone"]
cr_id = reservation["CapacityReservationId"]
subnet_id = get_default_subnet(aws_session, az)
network_interfaces = generate_efa_network_interfaces(
aws_session, instance_type, subnet_id, sg_id
)
params = _build_efa_run_params(
ami_id, instance_type, key_name, network_interfaces, az, name
)
params["MinCount"] = count
params["MaxCount"] = count
params["CapacityReservationSpecification"] = {
"CapacityReservationTarget": {"CapacityReservationId": cr_id},
}
try:
response = aws_session.ec2.run_instances(**params)
instance_ids = [inst["InstanceId"] for inst in response["Instances"]]
LOGGER.info(
f"Launched {count}x {instance_type} in {az} via reservation {cr_id}: {instance_ids}"
)
return instance_ids
except ClientError as e:
LOGGER.warning(f"Failed to launch via reservation {cr_id} in {az}: {e}")
continue
raise RuntimeError(
f"Failed to launch {instance_type} from any capacity reservation. "
f"Tried {len(reservations)} reservation(s)."
)
def setup_container(conn, image_uri, container_name):
"""Pull image and start container with EFA devices and host networking."""
devices = get_efa_devices(conn)
device_args = " ".join(f"--device {d}" for d in devices)
conn.run(f"docker rm -f {container_name}", warn=True)
conn.run(
f"docker run --runtime=nvidia --gpus all -id "
f"--name {container_name} --network host --ulimit memlock=-1:-1 "
f"{device_args} -v $HOME/test:/test -v /dev/shm:/dev/shm "
f"{image_uri} bash"
)
LOGGER.info(f"Started container {container_name}")
def run_on_container(container_name, conn, cmd, timeout=DEFAULT_TIMEOUT, warn=False):
"""Execute a command inside a running container."""
return conn.run(f"docker exec {container_name} bash -c '{cmd}'", timeout=timeout, warn=warn)
def setup_master_ssh(conn):
"""Configure SSH client on master container."""
run_on_container(MASTER_CONTAINER_NAME, conn, f"rm -rf $HOME/.ssh/{MASTER_SSH_KEY_NAME}*")
run_on_container(
MASTER_CONTAINER_NAME, conn, f'ssh-keygen -t rsa -f $HOME/.ssh/{MASTER_SSH_KEY_NAME} -N ""'
)
ssh_config = (
"Host *\\n"
f" IdentityFile /root/.ssh/{MASTER_SSH_KEY_NAME}\\n"
" StrictHostKeyChecking no\\n"
" UserKnownHostsFile /dev/null\\n"
" Port 2022"
)
run_on_container(MASTER_CONTAINER_NAME, conn, f'echo -e "{ssh_config}" > $HOME/.ssh/config')
run_on_container(MASTER_CONTAINER_NAME, conn, "chmod -R 600 $HOME/.ssh/*")
def setup_worker_ssh(conn, master_pub_key):
"""Configure SSH server on worker container to accept master connections."""
run_on_container(WORKER_CONTAINER_NAME, conn, 'echo "Port 2022" >> /etc/ssh/sshd_config')
run_on_container(WORKER_CONTAINER_NAME, conn, f"rm -rf $HOME/.ssh/{WORKER_SSH_KEY_NAME}*")
run_on_container(
WORKER_CONTAINER_NAME, conn, f'ssh-keygen -t rsa -f $HOME/.ssh/{WORKER_SSH_KEY_NAME} -N ""'
)
run_on_container(
WORKER_CONTAINER_NAME,
conn,
f"cp $HOME/.ssh/{WORKER_SSH_KEY_NAME}.pub $HOME/.ssh/authorized_keys",
)
run_on_container(
WORKER_CONTAINER_NAME, conn, f'echo "{master_pub_key}" >> $HOME/.ssh/authorized_keys'
)
run_on_container(
WORKER_CONTAINER_NAME,
conn,
f"eval `ssh-agent -s` && ssh-add $HOME/.ssh/{WORKER_SSH_KEY_NAME}",
)
# Start sshd directly (AL2023 base image has no sysvinit).
run_on_container(WORKER_CONTAINER_NAME, conn, "/usr/sbin/sshd")
status = run_on_container(WORKER_CONTAINER_NAME, conn, "pgrep -x sshd", warn=True)
if status.failed:
raise RuntimeError("Failed to start SSH daemon on worker")
def create_hosts_file(master_conn, worker_private_ip, num_gpus):
"""Create MPI hosts file on master container."""
hosts = f"localhost slots={num_gpus}\n{worker_private_ip} slots={num_gpus}"
run_on_container(
MASTER_CONTAINER_NAME, master_conn, f'echo -e "{hosts}" > {HOSTS_FILE_LOCATION}'
)
def get_private_ip(aws_session, instance_id):
"""Get private IP of an instance."""
response = aws_session.ec2.describe_instances(InstanceIds=[instance_id])
return response["Reservations"][0]["Instances"][0]["PrivateIpAddress"]
def allocate_and_associate_eip(aws_session, instance_id):
"""Allocate an Elastic IP and associate it with the instance's primary network interface.
Returns (allocation_id, public_ip).
"""
eip = aws_session.ec2.allocate_address(Domain="vpc")
alloc_id = eip["AllocationId"]
public_ip = eip["PublicIp"]
# Get the primary network interface (DeviceIndex 0)
instance = aws_session.ec2.describe_instances(InstanceIds=[instance_id])
eni_id = None
for iface in instance["Reservations"][0]["Instances"][0]["NetworkInterfaces"]:
if iface["Attachment"]["DeviceIndex"] == 0:
eni_id = iface["NetworkInterfaceId"]
break
aws_session.ec2.associate_address(
AllocationId=alloc_id,
NetworkInterfaceId=eni_id,
)
LOGGER.info(f"Associated EIP {public_ip} ({alloc_id}) with instance {instance_id}")
return alloc_id, public_ip
def release_eip(aws_session, alloc_id):
"""Release an Elastic IP."""
try:
aws_session.ec2.release_address(AllocationId=alloc_id)
LOGGER.info(f"Released EIP {alloc_id}")
except Exception as e:
LOGGER.warning(f"Failed to release EIP {alloc_id}: {e}")
def cleanup_stale_efa_instances(aws_session, max_age_hours=4):
"""Terminate EFA test instances older than max_age_hours and release their EIPs.
Prevents resource leaks from cancelled/crashed workflow runs that didn't reach cleanup.
"""
from datetime import datetime, timezone
cutoff = datetime.now(timezone.utc).timestamp() - (max_age_hours * 3600)
try:
resp = aws_session.ec2.describe_instances(
Filters=[
{"Name": "tag:Name", "Values": ["CI-CD EFA efa-test"]},
{"Name": "instance-state-name", "Values": ["running", "stopped"]},
]
)
for reservation in resp.get("Reservations", []):
for instance in reservation.get("Instances", []):
launch_time = instance["LaunchTime"].timestamp()
if launch_time < cutoff:
instance_id = instance["InstanceId"]
LOGGER.warning(
f"Terminating stale EFA instance {instance_id} (launched {instance['LaunchTime']})"
)
aws_session.ec2.terminate_instances(InstanceIds=[instance_id])
# Release unassociated EIPs (leaked from terminated instances)
addresses = aws_session.ec2.describe_addresses().get("Addresses", [])
for addr in addresses:
if not addr.get("AssociationId") and addr.get("AllocationId"):
LOGGER.warning(
f"Releasing orphaned EIP {addr['AllocationId']} ({addr.get('PublicIp', '')})"
)
release_eip(aws_session, addr["AllocationId"])
except Exception as e:
LOGGER.warning(f"Stale resource cleanup failed (non-fatal): {e}")
@contextmanager
def efa_instances(image_uri, instance_type="p4d.24xlarge", region=DEFAULT_REGION):
"""Context manager that launches 2 EFA instances, sets up containers + SSH, and cleans up.
Yields (master_conn, worker_conn, aws_session) where connections are to the EC2 hosts.
"""
aws_session = AWSSessionManager(region=region)
ami_id = aws_session.get_latest_ami()
sg_id = get_efa_security_group_id(aws_session)
# Clean up leaked resources from previous cancelled/crashed runs
cleanup_stale_efa_instances(aws_session)
key_name = None
key_path = None
runner_ip = None
master_id = None
worker_id = None
master_eip_alloc = None
worker_eip_alloc = None
try:
key_name, key_path = aws_session.create_key_pair()
# Clean up stale runner SSH rules from any previous test that failed to clean up
# (e.g., runner process killed). Only touches rules our test created.
cleanup_stale_runner_ssh_rules(aws_session, sg_id)
# Authorize SSH from this runner's public IP on the permanent SG.
# Permanent SG allows corp prefix list only; CodeBuild runner IPs aren't in it.
runner_ip = aws_session.get_codebuild_runner_public_ip()
ssh_rule_description = f"efa-test-runner-{key_name}"
authorize_runner_ssh(aws_session, sg_id, runner_ip, ssh_rule_description)
instance_ids = launch_efa_instances(
aws_session, ami_id, instance_type, key_name, sg_id, count=2, name="efa-test"
)
master_id = instance_ids[0]
worker_id = instance_ids[1]
aws_session.wait_for_instance_ready(master_id)
aws_session.wait_for_instance_ready(worker_id)
# EIPs: multi-NIC EFA instances don't get auto public IPs.
master_eip_alloc, master_ip = allocate_and_associate_eip(aws_session, master_id)
worker_eip_alloc, worker_ip = allocate_and_associate_eip(aws_session, worker_id)
master_conn = LoggedConnection(
host=master_ip,
user="ec2-user",
connect_kwargs={"key_filename": [key_path]},
connect_timeout=600,
)
master_conn.config.run.in_stream = False
worker_conn = LoggedConnection(
host=worker_ip,
user="ec2-user",
connect_kwargs={"key_filename": [key_path]},
connect_timeout=600,
)
worker_conn.config.run.in_stream = False
master_conn.run("mkdir -p ~/test/efa/scripts ~/test/efa/logs")
worker_conn.run("mkdir -p ~/test/efa/scripts ~/test/efa/logs")
repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
scripts_dir = os.path.join(repo_root, "test", "efa", "scripts")
for script in os.listdir(scripts_dir):
# SFTP does not expand ~; use paths relative to SSH home.
master_conn.put(os.path.join(scripts_dir, script), f"test/efa/scripts/{script}")
worker_conn.put(os.path.join(scripts_dir, script), f"test/efa/scripts/{script}")
master_conn.run("chmod +x ~/test/efa/scripts/*.sh")
worker_conn.run("chmod +x ~/test/efa/scripts/*.sh")
account_id = image_uri.split(".")[0]
ecr_login = f"aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.amazonaws.com"
master_conn.run(ecr_login)
worker_conn.run(ecr_login)
master_conn.run(f"docker pull {image_uri}")
worker_conn.run(f"docker pull {image_uri}")
setup_container(master_conn, image_uri, MASTER_CONTAINER_NAME)
setup_container(worker_conn, image_uri, WORKER_CONTAINER_NAME)
setup_master_ssh(master_conn)
worker_private_ip = get_private_ip(aws_session, worker_id)
master_pub_key = run_on_container(
MASTER_CONTAINER_NAME, master_conn, f"cat $HOME/.ssh/{MASTER_SSH_KEY_NAME}.pub"
).stdout.strip()
setup_worker_ssh(worker_conn, master_pub_key)
num_gpus = get_num_gpus(master_conn)
create_hosts_file(master_conn, worker_private_ip, num_gpus)
yield master_conn, worker_conn, aws_session
finally:
if master_id:
aws_session.terminate_instance(master_id)
if worker_id:
aws_session.terminate_instance(worker_id)
if master_eip_alloc:
release_eip(aws_session, master_eip_alloc)
if worker_eip_alloc:
release_eip(aws_session, worker_eip_alloc)
if runner_ip:
revoke_runner_ssh(aws_session, sg_id, runner_ip)
if key_name:
aws_session.delete_key_pair(key_name, key_path)