-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathcontainers.py
More file actions
722 lines (588 loc) · 23.8 KB
/
containers.py
File metadata and controls
722 lines (588 loc) · 23.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
from dataclasses import dataclass
from dataclasses_json import dataclass_json, Undefined # type: ignore
from typing import List, Optional, Dict
from enum import Enum
# Request paths handed to the HTTP client by ContainersService.
CONTAINER_DEPLOYMENTS_ENDPOINT = "/container-deployments"
SERVERLESS_COMPUTE_RESOURCES_ENDPOINT = "/serverless-compute-resources"
CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT = "/container-registry-credentials"
SECRETS_ENDPOINT = "/secrets"
class EnvVarType(str, Enum):
    """Kinds of environment variable that can be attached to a container.

    Members are ``str`` instances, so they compare equal to their raw values.
    """

    PLAIN = "plain"
    SECRET = "secret"
class VolumeMountType(str, Enum):
    """Kinds of volume that can be mounted into a container.

    Members are ``str`` instances, so they compare equal to their raw values.
    """

    SCRATCH = "scratch"
    SECRET = "secret"
    MEMORY = "memory"
class ContainerRegistryType(str, Enum):
    """Container registry kinds that credentials can be created for.

    Members are ``str`` instances, so they compare equal to their raw values.
    """

    GCR = "gcr"
    DOCKERHUB = "dockerhub"
    GITHUB = "ghcr"  # note: the wire value is "ghcr", not "github"
    AWS_ECR = "aws-ecr"
    CUSTOM = "custom"
class ContainerDeploymentStatus(str, Enum):
    """Status values a container deployment can report.

    Members are ``str`` instances, so they compare equal to their raw values.
    """

    INITIALIZING = "initializing"
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    PAUSED = "paused"
    QUOTA_REACHED = "quota_reached"
    IMAGE_PULLING = "image_pulling"
    VERSION_UPDATING = "version_updating"
@dataclass_json
@dataclass
class HealthcheckSettings:
    """Health check configuration for a container.

    :param enabled: Turns health checking on or off (defaults to ``True``)
    :param port: Port number the health check is performed on
    :param path: HTTP path the health check is performed on
    """

    enabled: bool = True
    port: Optional[int] = None
    path: Optional[str] = None
@dataclass_json
@dataclass
class EntrypointOverridesSettings:
    """Overrides for a container's entrypoint and command.

    :param enabled: Turns the overrides on or off (defaults to ``True``)
    :param entrypoint: Strings that together form the entrypoint command
    :param cmd: Strings that together form the command arguments
    """

    enabled: bool = True
    entrypoint: Optional[List[str]] = None
    cmd: Optional[List[str]] = None
@dataclass_json
@dataclass
class EnvVar:
    """A single environment variable of a container.

    :param name: Environment variable name
    :param value_or_reference_to_secret: Either the literal value or a reference to a secret
    :param type: Kind of the environment variable (see ``EnvVarType``)
    """

    name: str
    value_or_reference_to_secret: str
    type: EnvVarType
@dataclass_json(undefined=Undefined.EXCLUDE)
@dataclass
class VolumeMount:
    """A volume mounted into a container.

    Unknown keys are dropped when decoding (``Undefined.EXCLUDE``).

    :param type: Kind of volume mount (see ``VolumeMountType``)
    :param mount_path: Path inside the container where the volume is mounted
    :param size_in_mb: Volume size in megabytes; only used for memory volume mounts
    """

    type: VolumeMountType
    mount_path: str
    size_in_mb: Optional[int] = None
@dataclass_json
@dataclass
class Container:
    """Container specification used when creating or updating a deployment.

    Unlike ``ContainerInfo`` there is no ``name`` field here, because the
    name is managed by the system.

    :param image: Container image to run
    :param exposed_port: Port exposed from the container
    :param healthcheck: Health check configuration, if any
    :param entrypoint_overrides: Entrypoint override settings, if any
    :param env: Environment variables, if any
    :param volume_mounts: Volume mounts, if any
    """

    image: str
    exposed_port: int
    healthcheck: Optional[HealthcheckSettings] = None
    entrypoint_overrides: Optional[EntrypointOverridesSettings] = None
    env: Optional[List[EnvVar]] = None
    volume_mounts: Optional[List[VolumeMount]] = None
@dataclass_json
@dataclass
class ContainerInfo:
    """Container description as it appears on an existing deployment.

    Intended as a read-only view; it carries the system-managed ``name``
    in addition to the fields of ``Container``.

    :param name: Container name (system-managed)
    :param image: Container image to run
    :param exposed_port: Port exposed from the container
    :param healthcheck: Health check configuration, if any
    :param entrypoint_overrides: Entrypoint override settings, if any
    :param env: Environment variables, if any
    :param volume_mounts: Volume mounts, if any
    """

    name: str
    image: str
    exposed_port: int
    healthcheck: Optional[HealthcheckSettings] = None
    entrypoint_overrides: Optional[EntrypointOverridesSettings] = None
    env: Optional[List[EnvVar]] = None
    volume_mounts: Optional[List[VolumeMount]] = None
@dataclass_json
@dataclass
class ContainerRegistryCredentials:
    """Reference, by name, to credentials for a container registry.

    :param name: Name of the credentials
    """

    name: str
@dataclass_json
@dataclass
class ContainerRegistrySettings:
    """How a deployment accesses its container registry.

    :param is_private: Whether the registry is private
    :param credentials: Credentials for accessing a private registry, if any
    """

    is_private: bool
    credentials: Optional[ContainerRegistryCredentials] = None
@dataclass_json
@dataclass
class ComputeResource:
    """A compute resource a deployment can run on.

    :param name: Name of the compute resource
    :param size: Size of the compute resource
    :param is_available: Whether the compute resource is currently available
    """

    name: str
    size: int
    # Optional because availability is only populated in API responses.
    is_available: Optional[bool] = None
@dataclass_json
@dataclass
class ScalingPolicy:
    """Policy that controls how a scaling action is applied.

    :param delay_seconds: Seconds to wait before the scaling action is applied
    """

    delay_seconds: int
@dataclass_json
@dataclass
class QueueLoadScalingTrigger:
    """Scaling trigger driven by queue load.

    :param threshold: Queue load level at which scaling is triggered
    """

    threshold: float
@dataclass_json
@dataclass
class UtilizationScalingTrigger:
    """Scaling trigger driven by resource utilization.

    :param enabled: Whether the trigger is active
    :param threshold: Utilization level at which scaling is triggered, if set
    """

    enabled: bool
    threshold: Optional[float] = None
@dataclass_json
@dataclass
class ScalingTriggers:
    """The set of triggers that can cause a scaling action.

    Every trigger is optional and defaults to ``None``.

    :param queue_load: Trigger based on queue load
    :param cpu_utilization: Trigger based on CPU utilization
    :param gpu_utilization: Trigger based on GPU utilization
    """

    queue_load: Optional[QueueLoadScalingTrigger] = None
    cpu_utilization: Optional[UtilizationScalingTrigger] = None
    gpu_utilization: Optional[UtilizationScalingTrigger] = None
@dataclass_json
@dataclass
class ScalingOptions:
    """Automatic scaling configuration of a deployment.

    All fields are required.

    :param min_replica_count: Minimum number of replicas to keep
    :param max_replica_count: Maximum number of replicas allowed
    :param scale_down_policy: Policy applied when scaling down
    :param scale_up_policy: Policy applied when scaling up
    :param queue_message_ttl_seconds: Time-to-live of queue messages, in seconds
    :param concurrent_requests_per_replica: Concurrent requests a single replica can handle
    :param scaling_triggers: Triggers that cause scaling actions
    """

    min_replica_count: int
    max_replica_count: int
    scale_down_policy: ScalingPolicy
    scale_up_policy: ScalingPolicy
    queue_message_ttl_seconds: int
    concurrent_requests_per_replica: int
    scaling_triggers: ScalingTriggers
@dataclass_json(undefined=Undefined.EXCLUDE)
@dataclass
class Deployment:
    """Deployment specification used when creating or updating a deployment.

    Containers are given as ``Container`` (not ``ContainerInfo``) so that a
    container name cannot be set. Unknown keys are dropped when decoding
    (``Undefined.EXCLUDE``).

    :param name: Name of the deployment
    :param container_registry_settings: Settings for accessing the container registry
    :param containers: Container specifications of the deployment
    :param compute: Compute resource configuration
    :param is_spot: Whether this is a spot deployment (defaults to ``False``)
    :param endpoint_base_url: Base URL of the deployment endpoint, if any
    :param scaling: Scaling configuration, if any
    """

    name: str
    container_registry_settings: ContainerRegistrySettings
    containers: List[Container]
    compute: ComputeResource
    is_spot: bool = False
    endpoint_base_url: Optional[str] = None
    scaling: Optional[ScalingOptions] = None
@dataclass_json(undefined=Undefined.EXCLUDE)
@dataclass
class DeploymentInfo:
    """Description of an existing container deployment.

    Intended as a read-only view that includes system-managed fields.
    Unknown keys are dropped when decoding (``Undefined.EXCLUDE``).

    :param name: Name of the deployment
    :param container_registry_settings: Settings for accessing the container registry
    :param containers: Containers of the deployment
    :param compute: Compute resource configuration
    :param is_spot: Whether this is a spot deployment (defaults to ``False``)
    :param endpoint_base_url: Base URL of the deployment endpoint, if any
    :param scaling: Scaling configuration, if any
    :param created_at: Timestamp of when the deployment was created, if known
    """

    name: str
    container_registry_settings: ContainerRegistrySettings
    containers: List[ContainerInfo]
    compute: ComputeResource
    is_spot: bool = False
    endpoint_base_url: Optional[str] = None
    scaling: Optional[ScalingOptions] = None
    created_at: Optional[str] = None
@dataclass_json
@dataclass
class ReplicaInfo:
    """Description of one replica of a deployment.

    :param id: Unique identifier of the replica
    :param status: Current status of the replica
    :param started_at: Timestamp of when the replica was started
    """

    id: str
    status: str
    started_at: str
@dataclass_json
@dataclass
class Secret:
    """A secret as listed by the API (the secret value is not a field here).

    :param name: Name of the secret
    :param created_at: Creation timestamp, kept as a string
    """

    name: str
    created_at: str
@dataclass_json
@dataclass
class RegistryCredential:
    """A container registry credential as listed by the API.

    :param name: Name of the credential
    :param created_at: Creation timestamp, kept as a string
    """

    name: str
    created_at: str
@dataclass_json
@dataclass
class BaseRegistryCredentials:
    """Common base of all registry credential payloads.

    :param name: Name of the credentials
    :param type: Registry kind the credentials belong to (see ``ContainerRegistryType``)
    """

    name: str
    type: ContainerRegistryType
@dataclass_json
@dataclass
class DockerHubCredentials(BaseRegistryCredentials):
    """Registry credentials for DockerHub.

    The constructor fixes ``type`` to ``ContainerRegistryType.DOCKERHUB``,
    so callers cannot pass it.

    :param name: Name of the credentials
    :param username: Username for the registry
    :param access_token: Access token for the registry
    """

    username: str
    access_token: str

    def __init__(self, name: str, username: str, access_token: str):
        # Hand-written constructor: @dataclass keeps an __init__ defined in the class body.
        super().__init__(
            name=name,
            type=ContainerRegistryType.DOCKERHUB,
        )
        self.username = username
        self.access_token = access_token
@dataclass_json
@dataclass
class GithubCredentials(BaseRegistryCredentials):
    """Registry credentials for GitHub Container Registry.

    The constructor fixes ``type`` to ``ContainerRegistryType.GITHUB``,
    so callers cannot pass it.

    :param name: Name of the credentials
    :param username: Username for the registry
    :param access_token: Access token for the registry
    """

    username: str
    access_token: str

    def __init__(self, name: str, username: str, access_token: str):
        # Hand-written constructor: @dataclass keeps an __init__ defined in the class body.
        super().__init__(
            name=name,
            type=ContainerRegistryType.GITHUB,
        )
        self.username = username
        self.access_token = access_token
@dataclass_json
@dataclass
class GCRCredentials(BaseRegistryCredentials):
    """Registry credentials for Google Container Registry.

    The constructor fixes ``type`` to ``ContainerRegistryType.GCR``,
    so callers cannot pass it.

    :param name: Name of the credentials
    :param service_account_key: Service account key, passed as a string
    """

    service_account_key: str

    def __init__(self, name: str, service_account_key: str):
        # Hand-written constructor: @dataclass keeps an __init__ defined in the class body.
        super().__init__(
            name=name,
            type=ContainerRegistryType.GCR,
        )
        self.service_account_key = service_account_key
@dataclass_json
@dataclass
class AWSECRCredentials(BaseRegistryCredentials):
    """Registry credentials for AWS Elastic Container Registry.

    The constructor fixes ``type`` to ``ContainerRegistryType.AWS_ECR``,
    so callers cannot pass it.

    :param name: Name of the credentials
    :param access_key_id: AWS access key id
    :param secret_access_key: AWS secret access key
    :param region: AWS region
    :param ecr_repo: ECR repository
    """

    access_key_id: str
    secret_access_key: str
    region: str
    ecr_repo: str

    def __init__(
        self,
        name: str,
        access_key_id: str,
        secret_access_key: str,
        region: str,
        ecr_repo: str,
    ):
        # Hand-written constructor: @dataclass keeps an __init__ defined in the class body.
        super().__init__(
            name=name,
            type=ContainerRegistryType.AWS_ECR,
        )
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region = region
        self.ecr_repo = ecr_repo
@dataclass_json
@dataclass
class CustomRegistryCredentials(BaseRegistryCredentials):
    """Registry credentials for a custom container registry.

    The constructor fixes ``type`` to ``ContainerRegistryType.CUSTOM``,
    so callers cannot pass it.

    :param name: Name of the credentials
    :param docker_config_json: Docker config JSON, passed as a string
    """

    docker_config_json: str

    def __init__(self, name: str, docker_config_json: str):
        # Hand-written constructor: @dataclass keeps an __init__ defined in the class body.
        super().__init__(
            name=name,
            type=ContainerRegistryType.CUSTOM,
        )
        self.docker_config_json = docker_config_json
class ContainersService:
    """Service for managing container deployments

    Besides deployments (with their status, scaling options, replicas and
    environment variables) the service also covers compute resources,
    secrets and container registry credentials. Every method issues one
    request through the HTTP client passed to the constructor.
    """

    def __init__(self, http_client) -> None:
        """Initialize the containers service

        :param http_client: HTTP client for making API requests
        :type http_client: Any
        """
        self.client = http_client

    @staticmethod
    def _env_vars_by_container(items: List[dict]) -> Dict[str, List[EnvVar]]:
        """Group environment variables of an API payload by container name

        Shared by all environment-variable methods so that the response
        parsing lives in exactly one place.

        :param items: dictionaries with a ``container_name`` key and an ``env`` list
        :type items: List[dict]
        :return: dictionary mapping container names to their environment variables
        :rtype: Dict[str, List[EnvVar]]
        """
        return {
            item["container_name"]: [
                EnvVar.from_dict(env_var) for env_var in item["env"]
            ]
            for item in items
        }

    def get_deployments(self) -> List[DeploymentInfo]:
        """Get all deployments

        :return: list of deployments
        :rtype: List[DeploymentInfo]
        """
        response = self.client.get(CONTAINER_DEPLOYMENTS_ENDPOINT)
        return [
            DeploymentInfo.from_dict(deployment, infer_missing=True)
            for deployment in response.json()
        ]

    def get_deployment_by_name(self, deployment_name: str) -> DeploymentInfo:
        """Get a deployment by name

        :param deployment_name: name of the deployment
        :type deployment_name: str
        :return: deployment
        :rtype: DeploymentInfo
        """
        response = self.client.get(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}")
        return DeploymentInfo.from_dict(response.json(), infer_missing=True)

    def create_deployment(
        self,
        deployment: Deployment
    ) -> DeploymentInfo:
        """Create a new deployment

        :param deployment: deployment configuration
        :type deployment: Deployment
        :return: created deployment
        :rtype: DeploymentInfo
        """
        response = self.client.post(
            CONTAINER_DEPLOYMENTS_ENDPOINT,
            deployment.to_dict()
        )
        return DeploymentInfo.from_dict(response.json(), infer_missing=True)

    def update_deployment(self, deployment_name: str, deployment: DeploymentInfo) -> DeploymentInfo:
        """Update an existing deployment

        :param deployment_name: name of the deployment to update
        :type deployment_name: str
        :param deployment: updated deployment
        :type deployment: DeploymentInfo
        :return: updated deployment
        :rtype: DeploymentInfo
        """
        response = self.client.patch(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}",
            deployment.to_dict()
        )
        return DeploymentInfo.from_dict(response.json(), infer_missing=True)

    def delete_deployment(self, deployment_name: str) -> None:
        """Delete a deployment

        :param deployment_name: name of the deployment to delete
        :type deployment_name: str
        """
        self.client.delete(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}")

    def get_deployment_status(self, deployment_name: str) -> ContainerDeploymentStatus:
        """Get deployment status

        :param deployment_name: name of the deployment
        :type deployment_name: str
        :return: deployment status
        :rtype: ContainerDeploymentStatus
        """
        response = self.client.get(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/status")
        # Raises ValueError if the API reports a status the enum does not know.
        return ContainerDeploymentStatus(response.json()["status"])

    def restart_deployment(self, deployment_name: str) -> None:
        """Restart a deployment

        :param deployment_name: name of the deployment to restart
        :type deployment_name: str
        """
        self.client.post(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/restart")

    def get_deployment_scaling_options(self, deployment_name: str) -> ScalingOptions:
        """Get deployment scaling options

        :param deployment_name: name of the deployment
        :type deployment_name: str
        :return: scaling options
        :rtype: ScalingOptions
        """
        response = self.client.get(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/scaling")
        return ScalingOptions.from_dict(response.json())

    def update_deployment_scaling_options(self, deployment_name: str, scaling_options: ScalingOptions) -> ScalingOptions:
        """Update deployment scaling options

        :param deployment_name: name of the deployment
        :type deployment_name: str
        :param scaling_options: new scaling options
        :type scaling_options: ScalingOptions
        :return: updated scaling options
        :rtype: ScalingOptions
        """
        response = self.client.patch(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/scaling",
            scaling_options.to_dict()
        )
        return ScalingOptions.from_dict(response.json())

    def get_deployment_replicas(self, deployment_name: str) -> List[ReplicaInfo]:
        """Get deployment replicas

        :param deployment_name: name of the deployment
        :type deployment_name: str
        :return: list of replicas information
        :rtype: List[ReplicaInfo]
        """
        response = self.client.get(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/replicas")
        # The replicas are nested under the "list" key of the response body.
        return [ReplicaInfo.from_dict(replica) for replica in response.json()["list"]]

    def purge_deployment_queue(self, deployment_name: str) -> None:
        """Purge deployment queue

        :param deployment_name: name of the deployment
        :type deployment_name: str
        """
        self.client.post(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/purge-queue")

    def pause_deployment(self, deployment_name: str) -> None:
        """Pause a deployment

        :param deployment_name: name of the deployment to pause
        :type deployment_name: str
        """
        self.client.post(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/pause")

    def resume_deployment(self, deployment_name: str) -> None:
        """Resume a deployment

        :param deployment_name: name of the deployment to resume
        :type deployment_name: str
        """
        self.client.post(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/resume")

    def get_deployment_environment_variables(self, deployment_name: str) -> Dict[str, List[EnvVar]]:
        """Get deployment environment variables

        :param deployment_name: name of the deployment
        :type deployment_name: str
        :return: dictionary mapping container names to their environment variables
        :rtype: Dict[str, List[EnvVar]]
        """
        response = self.client.get(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables")
        return self._env_vars_by_container(response.json())

    def add_deployment_environment_variables(self, deployment_name: str, container_name: str, env_vars: List[EnvVar]) -> Dict[str, List[EnvVar]]:
        """Add environment variables to a container

        :param deployment_name: name of the deployment
        :type deployment_name: str
        :param container_name: name of the container
        :type container_name: str
        :param env_vars: environment variables to add
        :type env_vars: List[EnvVar]
        :return: updated environment variables
        :rtype: Dict[str, List[EnvVar]]
        """
        response = self.client.post(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables",
            {"container_name": container_name, "env": [
                env_var.to_dict() for env_var in env_vars]}
        )
        return self._env_vars_by_container(response.json())

    def update_deployment_environment_variables(self, deployment_name: str, container_name: str, env_vars: List[EnvVar]) -> Dict[str, List[EnvVar]]:
        """Update environment variables of a container

        :param deployment_name: name of the deployment
        :type deployment_name: str
        :param container_name: name of the container
        :type container_name: str
        :param env_vars: updated environment variables
        :type env_vars: List[EnvVar]
        :return: updated environment variables
        :rtype: Dict[str, List[EnvVar]]
        """
        response = self.client.patch(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables",
            {"container_name": container_name, "env": [
                env_var.to_dict() for env_var in env_vars]}
        )
        # Unlike the other environment-variable calls, this response body is
        # read as a single object rather than a list, so wrap it for the helper.
        return self._env_vars_by_container([response.json()])

    def delete_deployment_environment_variables(self, deployment_name: str, container_name: str, env_var_names: List[str]) -> Dict[str, List[EnvVar]]:
        """Delete environment variables from a container

        :param deployment_name: name of the deployment
        :type deployment_name: str
        :param container_name: name of the container
        :type container_name: str
        :param env_var_names: names of environment variables to delete
        :type env_var_names: List[str]
        :return: remaining environment variables
        :rtype: Dict[str, List[EnvVar]]
        """
        response = self.client.delete(
            f"{CONTAINER_DEPLOYMENTS_ENDPOINT}/{deployment_name}/environment-variables",
            {"container_name": container_name, "env": env_var_names}
        )
        return self._env_vars_by_container(response.json())

    def get_compute_resources(self) -> List[ComputeResource]:
        """Get available compute resources

        The response body is iterated as a list of groups, each of which is
        itself a list of resources; the groups are flattened into one list.

        :return: list of compute resources
        :rtype: List[ComputeResource]
        """
        response = self.client.get(SERVERLESS_COMPUTE_RESOURCES_ENDPOINT)
        return [
            ComputeResource.from_dict(resource)
            for resource_group in response.json()
            for resource in resource_group
        ]

    def get_secrets(self) -> List[Secret]:
        """Get all secrets

        :return: list of secrets
        :rtype: List[Secret]
        """
        response = self.client.get(SECRETS_ENDPOINT)
        return [Secret.from_dict(secret) for secret in response.json()]

    def create_secret(self, name: str, value: str) -> None:
        """Create a new secret

        :param name: name of the secret
        :type name: str
        :param value: value of the secret
        :type value: str
        """
        self.client.post(SECRETS_ENDPOINT, {"name": name, "value": value})

    def delete_secret(self, secret_name: str, force: bool = False) -> None:
        """Delete a secret

        :param secret_name: name of the secret to delete
        :type secret_name: str
        :param force: force delete even if secret is in use
        :type force: bool
        """
        # The flag is sent as the lowercase string "true" / "false".
        self.client.delete(
            f"{SECRETS_ENDPOINT}/{secret_name}", params={"force": str(force).lower()})

    def get_registry_credentials(self) -> List[RegistryCredential]:
        """Get all registry credentials

        :return: list of registry credentials
        :rtype: List[RegistryCredential]
        """
        response = self.client.get(CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT)
        return [RegistryCredential.from_dict(credential) for credential in response.json()]

    def add_registry_credentials(self, credentials: BaseRegistryCredentials) -> None:
        """Add registry credentials

        :param credentials: Registry credentials object
        :type credentials: BaseRegistryCredentials
        """
        data = credentials.to_dict()
        self.client.post(CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT, data)

    def delete_registry_credentials(self, credentials_name: str) -> None:
        """Delete registry credentials

        :param credentials_name: name of the credentials to delete
        :type credentials_name: str
        """
        self.client.delete(
            f"{CONTAINER_REGISTRY_CREDENTIALS_ENDPOINT}/{credentials_name}")