-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathsecurity_groups.py
More file actions
115 lines (94 loc) · 4.81 KB
/
security_groups.py
File metadata and controls
115 lines (94 loc) · 4.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from dataclasses import dataclass, asdict
from enum import Enum, auto
from typing import TYPE_CHECKING
from cloudfoundry_client.v3.entities import EntityManager, ToManyRelationship, Entity, ToOneRelationship
if TYPE_CHECKING:
from cloudfoundry_client.client import CloudFoundryClient
class RuleProtocol(Enum):
    """Protocols a security group rule can be declared for.

    ``repr()`` of a member is its lower-case name (``"tcp"``, ``"udp"``,
    ``"icmp"``, ``"all"``); that form is what ends up in the ``protocol``
    field when a rule is serialized into a request payload.
    """

    TCP = auto()
    UDP = auto()
    ICMP = auto()
    ALL = auto()

    def __repr__(self) -> str:
        # Deliberately not the default "<RuleProtocol.TCP: 1>" form: the
        # payload builder relies on repr() yielding the lower-case name.
        return self.name.lower()
@dataclass
class Rule:
    """One rule of a security group.

    Only ``protocol`` and ``destination`` are mandatory; every other field
    defaults to ``None``. When a rule is turned into a request payload,
    fields that are ``None`` are left out and ``protocol`` is written as
    ``repr(protocol)``.
    """

    # Protocol the rule applies to.
    protocol: RuleProtocol
    # Target of the rule. NOTE(review): presumably an IP address, range or
    # CIDR block - confirm against the Cloud Foundry API documentation.
    destination: str
    # NOTE(review): presumably a port, port range or comma-separated list,
    # relevant to TCP/UDP rules - confirm.
    ports: str | None = None
    # NOTE(review): presumably the ICMP type - confirm. The name mirrors the
    # payload key, hence the shadowing of the builtin.
    type: int | None = None
    # NOTE(review): presumably the ICMP code - confirm.
    code: int | None = None
    # Free-text description of the rule.
    description: str | None = None
    # NOTE(review): presumably toggles logging for the rule - confirm.
    log: bool | None = None
@dataclass
class GloballyEnabled:
running: bool | None = None
staging: bool | None = None
class SecurityGroupManager(EntityManager):
    """Manager for the ``/v3/security_groups`` endpoint.

    Provides creation, update and removal of security groups, plus binding
    and unbinding of a group to spaces through its ``running_spaces`` and
    ``staging_spaces`` relationships. All HTTP work is delegated to the
    ``EntityManager`` base class.
    """

    def __init__(self, target_endpoint: str, client: "CloudFoundryClient"):
        super().__init__(target_endpoint, client, "/v3/security_groups")

    def create(
        self,
        name: str,
        rules: list[Rule] | None = None,
        globally_enabled: GloballyEnabled | None = None,
        staging_spaces: ToManyRelationship | None = None,
        running_spaces: ToManyRelationship | None = None,
    ) -> Entity:
        """Create a security group.

        :param name: name of the security group.
        :param rules: rules of the group; ``None`` omits the ``rules`` key,
            an empty list is sent as an empty list.
        :param globally_enabled: global running/staging flags; omitted when
            ``None``.
        :param staging_spaces: spaces to bind for staging; omitted when falsy.
        :param running_spaces: spaces to bind for running; omitted when falsy.
        :return: whatever ``EntityManager._create`` returns for the payload.
        """
        payload = self._generate_payload(name, rules, globally_enabled, staging_spaces, running_spaces)
        return super()._create(payload)

    def update(
        self,
        security_group_id: str,
        name: str | None = None,
        rules: list[Rule] | None = None,
        globally_enabled: GloballyEnabled | None = None,
        staging_spaces: ToManyRelationship | None = None,
        running_spaces: ToManyRelationship | None = None,
    ) -> Entity:
        """Update a security group; only the supplied attributes are sent.

        :param security_group_id: identifier of the group to update.
        :param name: new name; omitted from the payload when falsy.
        :param rules: replacement rules. ``None`` leaves ``rules`` out of the
            payload, while an explicit empty list is sent as ``[]`` so that
            a caller is able to request the removal of every rule.
        :param globally_enabled: global running/staging flags; omitted when
            ``None``.
        :param staging_spaces: staging relationship; omitted when falsy.
        :param running_spaces: running relationship; omitted when falsy.
        :return: whatever ``EntityManager._update`` returns for the payload.
        """
        payload = self._generate_payload(name, rules, globally_enabled, staging_spaces, running_spaces)
        return super()._update(security_group_id, payload)

    def remove(self, security_group_id: str, asynchronous: bool = True) -> str | None:
        """Remove a security group by delegating to ``EntityManager._remove``.

        :param security_group_id: identifier of the group to remove.
        :param asynchronous: forwarded unchanged to ``_remove``.
        :return: the value returned by ``_remove``.
        """
        return super()._remove(security_group_id, asynchronous)

    def bind_running_security_group_to_spaces(
        self, security_group_id: str, space_guids: ToManyRelationship
    ) -> ToManyRelationship:
        """Bind the group to the given spaces via ``running_spaces``."""
        return self._bind_spaces(security_group_id, space_guids, "running_spaces")

    def bind_staging_security_group_to_spaces(
        self, security_group_id: str, space_guids: ToManyRelationship
    ) -> ToManyRelationship:
        """Bind the group to the given spaces via ``staging_spaces``."""
        return self._bind_spaces(security_group_id, space_guids, "staging_spaces")

    def unbind_running_security_group_from_space(
        self, security_group_id: str, space_guid: ToOneRelationship
    ) -> None:
        """Unbind the group from one space of its ``running_spaces``."""
        self._unbind_space(security_group_id, space_guid, "running_spaces")

    def unbind_staging_security_group_from_space(
        self, security_group_id: str, space_guid: ToOneRelationship
    ) -> None:
        """Unbind the group from one space of its ``staging_spaces``."""
        self._unbind_space(security_group_id, space_guid, "staging_spaces")

    def _bind_spaces(
        self, security_group_id: str, space_guids: ToManyRelationship, relationship: str
    ) -> ToManyRelationship:
        """POST ``space_guids`` to the relationship URL and parse the response.

        :param relationship: relationship segment of the URL
            (``"running_spaces"`` or ``"staging_spaces"``).
        :return: the response converted by ``ToManyRelationship.from_json_object``.
        """
        url = f"{self.target_endpoint}{self.entity_uri}/{security_group_id}/relationships/{relationship}"
        return ToManyRelationship.from_json_object(super()._post(url, space_guids))

    def _unbind_space(self, security_group_id: str, space_guid: ToOneRelationship, relationship: str) -> None:
        """DELETE the relationship entry addressed by ``space_guid.guid``."""
        url = (
            f"{self.target_endpoint}{self.entity_uri}/{security_group_id}"
            f"/relationships/{relationship}/{space_guid.guid}"
        )
        super()._delete(url)

    @staticmethod
    def _generate_payload(
        name: str | None,
        rules: list[Rule] | None,
        globally_enabled: GloballyEnabled | None,
        staging_spaces: ToManyRelationship | None,
        running_spaces: ToManyRelationship | None,
    ) -> dict:
        """Build the request body shared by ``create`` and ``update``.

        Keys are only added for arguments that were supplied, and inside
        ``rules`` / ``globally_enabled`` every field whose value is ``None``
        is dropped.

        :return: a dict with any of the keys ``name``, ``rules``,
            ``globally_enabled`` and ``relationships``.
        """

        def drop_none(pairs):
            # dict_factory receives the dataclass fields as (name, value) pairs.
            return {key: value for key, value in pairs if value is not None}

        def rule_fields(pairs):
            # The protocol is written as repr(), which RuleProtocol defines
            # as the lower-case member name.
            return {
                key: repr(value) if key == "protocol" else value
                for key, value in drop_none(pairs).items()
            }

        payload = {}
        if name:
            payload["name"] = name
        # Compare against None rather than testing truthiness: an explicit
        # empty list must reach the payload, otherwise an update could never
        # ask for all rules to be removed.
        if rules is not None:
            payload["rules"] = [asdict(rule, dict_factory=rule_fields) for rule in rules]
        if globally_enabled:
            payload["globally_enabled"] = asdict(globally_enabled, dict_factory=drop_none)

        relationships = {}
        if staging_spaces:
            relationships["staging_spaces"] = staging_spaces
        if running_spaces:
            relationships["running_spaces"] = running_spaces
        # The relationships key is only emitted when at least one side is set.
        if relationships:
            payload["relationships"] = relationships
        return payload