|
13 | 13 | # See the License for the specific language governing permissions and |
14 | 14 | # limitations under the License. |
15 | 15 | """Google Cloud Resource Manager functionality.""" |
16 | | -from typing import TYPE_CHECKING, Dict, List, Any |
| 16 | +from typing import TYPE_CHECKING, Dict, List, Any, Optional |
17 | 17 | from googleapiclient import errors as google_api_errors |
18 | 18 |
|
19 | 19 | from libcloudforensics import logging_utils |
@@ -164,3 +164,149 @@ def GetIamPolicy(self, name: str) -> Dict[str, Any]: |
164 | 164 | resource_client, 'getIamPolicy', request)[0] |
165 | 165 |
|
166 | 166 | return response |
| 167 | + |
| 168 | + def GetOrgPolicy(self, resource: str, constraint: str) -> Dict[str, Any]: |
| 169 | + """Gets a particular Org Policy on a resource. |
| 170 | +
|
| 171 | + Args: |
| 172 | + resource (str): a resource identifier in the format |
| 173 | + resource_type/resource_number e.g. projects/123456789012 where |
| 174 | + project_type is one of projects, folders or organizations. |
| 175 | + constraint (str): the name of the constraint to get. |
| 176 | +
|
| 177 | + Returns: |
| 178 | + Dict[str, Any]: The Org Policy details. |
| 179 | + See https://cloud.google.com/resource-manager/reference/rest/v1/Policy |
| 180 | +
|
| 181 | + Raises: |
| 182 | + TypeError: if an invalid resource type is provided. |
| 183 | + """ |
| 184 | + resource_type = resource.split('/')[0] |
| 185 | + if resource_type not in self.RESOURCE_TYPES: |
| 186 | + raise TypeError('Invalid resource type "{0:s}", resource must be one of ' |
| 187 | + '"projects", "folders" or "organizations" provided in the format ' |
| 188 | + '"resource_type/resource_number".'.format(resource)) |
| 189 | + |
| 190 | + if not constraint.startswith('constraints/'): |
| 191 | + constraint = 'constraints/' + constraint |
| 192 | + |
| 193 | + # Override API version, since this doesn't exist in v2 or v3 |
| 194 | + self.RESOURCE_MANAGER_API_VERSION = 'v1' # pylint: disable=invalid-name |
| 195 | + service = self.GrmApi() |
| 196 | + resource_client = getattr(service, resource_type)() |
| 197 | + response: Dict[str, Any] = resource_client.getOrgPolicy( |
| 198 | + resource=resource, body={'constraint': constraint} |
| 199 | + ).execute() |
| 200 | + return response |
| 201 | + |
| 202 | + def ListOrgPolicy(self, resource: str) -> Dict[str, Any]: |
| 203 | + """Lists all Org Policies on a resource. |
| 204 | +
|
| 205 | + Args: |
| 206 | + resource (str): a resource identifier in the format |
| 207 | + resource_type/resource_number e.g. projects/123456789012 where |
| 208 | + project_type is one of projects, folders or organizations. |
| 209 | +
|
| 210 | + Returns: |
| 211 | + Dict[str, Any]: The Org Policy details. |
| 212 | + See https://cloud.google.com/resource-manager/reference/rest/v1/Policy |
| 213 | +
|
| 214 | + Raises: |
| 215 | + TypeError: if an invalid resource type is provided. |
| 216 | + """ |
| 217 | + resource_type = resource.split('/')[0] |
| 218 | + if resource_type not in self.RESOURCE_TYPES: |
| 219 | + raise TypeError('Invalid resource type "{0:s}", resource must be one of ' |
| 220 | + '"projects", "folders" or "organizations" provided in the format ' |
| 221 | + '"resource_type/resource_number".'.format(resource)) |
| 222 | + |
| 223 | + # Override API version, since this doesn't exist in v2 or v3 |
| 224 | + self.RESOURCE_MANAGER_API_VERSION = 'v1' |
| 225 | + service = self.GrmApi() |
| 226 | + resource_client = getattr(service, resource_type)() |
| 227 | + response: Dict[str, Any] = resource_client.listOrgPolicies( |
| 228 | + resource=resource).execute() |
| 229 | + return response |
| 230 | + |
| 231 | + def SetOrgPolicy( |
| 232 | + self, resource: str, policy: Dict[str, Any], |
| 233 | + etag: Optional[str] = None) -> Dict[str, Any]: |
| 234 | + """Updates the specified Policy on the resource. |
| 235 | + Creates a new Policy for that Constraint on the resource if one does |
| 236 | + not exist. |
| 237 | + |
| 238 | +
|
| 239 | + Args: |
| 240 | + resource (str): a resource identifier in the format |
| 241 | + resource_type/resource_number e.g. projects/123456789012 where |
| 242 | + project_type is one of projects, folders or organizations. |
| 243 | + policy (dict): The policy to create, as per |
| 244 | + https://cloud.google.com/resource-manager/reference/rest/v1/Policy |
| 245 | + etag (str): The current version, for concurrency control. |
| 246 | + Not supplying an etag on the request Policy results in an unconditional |
| 247 | + write of the Policy. |
| 248 | +
|
| 249 | + Returns: |
| 250 | + Dict[str, Any]: The Org Policy that was created. |
| 251 | + https://cloud.google.com/resource-manager/reference/rest/v1/Policy |
| 252 | +
|
| 253 | + Raises: |
| 254 | + TypeError: if an invalid resource type is provided. |
| 255 | + """ |
| 256 | + resource_type = resource.split('/')[0] |
| 257 | + if resource_type not in self.RESOURCE_TYPES: |
| 258 | + raise TypeError('Invalid resource type "{0:s}", resource must be one of ' |
| 259 | + '"projects", "folders" or "organizations" provided in the format ' |
| 260 | + '"resource_type/resource_number".'.format(resource)) |
| 261 | + |
| 262 | + # Override API version, since this doesn't exist in v2 or v3 |
| 263 | + self.RESOURCE_MANAGER_API_VERSION = 'v1' |
| 264 | + service = self.GrmApi() |
| 265 | + resource_client = getattr(service, resource_type)() |
| 266 | + body = {'policy': policy} |
| 267 | + if etag: |
| 268 | + body['policy']['etag'] = etag |
| 269 | + response: Dict[str, Any] = resource_client.setOrgPolicy(resource=resource, |
| 270 | + body=body).execute() |
| 271 | + return response |
| 272 | + |
| 273 | + def DeleteOrgPolicy( |
| 274 | + self, resource: str, constraint: str, etag: Optional[str] = None) -> bool: |
| 275 | + """Removes a particular Org Policy on a resource. |
| 276 | +
|
| 277 | + Args: |
| 278 | + resource (str): a resource identifier in the format |
| 279 | + resource_type/resource_number e.g. projects/123456789012 where |
| 280 | + project_type is one of projects, folders or organizations. |
| 281 | + constraint (str): the name of the constraint to get. |
| 282 | + etag (str): The current version, for concurrency control. |
| 283 | + Not sending an etag will cause the Policy to be cleared blindly. |
| 284 | +
|
| 285 | + Returns: |
| 286 | + bool: True if successful, False otherwise. |
| 287 | +
|
| 288 | + Raises: |
| 289 | + TypeError: if an invalid resource type is provided. |
| 290 | + """ |
| 291 | + resource_type = resource.split('/')[0] |
| 292 | + if resource_type not in self.RESOURCE_TYPES: |
| 293 | + raise TypeError('Invalid resource type "{0:s}", resource must be one of ' |
| 294 | + '"projects", "folders" or "organizations" provided in the format ' |
| 295 | + '"resource_type/resource_number".'.format(resource)) |
| 296 | + |
| 297 | + if not constraint.startswith('constraints/'): |
| 298 | + constraint = 'constraints/' + constraint |
| 299 | + |
| 300 | + # Override API version, since this doesn't exist in v2 or v3 |
| 301 | + self.RESOURCE_MANAGER_API_VERSION = 'v1' |
| 302 | + service = self.GrmApi() |
| 303 | + resource_client = getattr(service, resource_type)() |
| 304 | + body = {'constraint': constraint} |
| 305 | + if etag: |
| 306 | + body['etag'] = etag |
| 307 | + response: Dict[str, Any] = resource_client.clearOrgPolicy( |
| 308 | + resource=resource, body=body).execute() |
| 309 | + if not response: |
| 310 | + return True |
| 311 | + logger.warning("Unable to delete Org Policy: {0}".format(response)) |
| 312 | + return False |
0 commit comments