Skip to content

Commit 2a348b9

Browse files
fuc
1 parent 3948a48 commit 2a348b9

File tree

2 files changed

+306
-0
lines changed

2 files changed

+306
-0
lines changed

utils/context_managers.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
import hashlib
2+
import os
3+
from pathlib import Path
4+
from typing import TYPE_CHECKING
5+
6+
import git
7+
8+
from config import settings
9+
10+
if TYPE_CHECKING:
11+
from collections.abc import Sequence
12+
from types import TracebackType
13+
from typing import Final
14+
15+
16+
__all__: "Sequence[str]" = (
17+
"EnvVariableDeleter",
18+
"FileTemporaryDeleter",
19+
"TemporarySettingsKeyReplacer",
20+
)
21+
22+
23+
class EnvVariableDeleter:
24+
"""
25+
Context manager that deletes the given environment variable.
26+
27+
The given environment variable is removed from both
28+
the system environment variables list,
29+
and the .env file in this project's root directory.
30+
"""
31+
32+
def __init__(self, env_variable_name: str) -> None:
33+
"""Store the current state of any instances of the stored environment variable."""
34+
self.env_variable_name: str = env_variable_name
35+
36+
PROJECT_ROOT: Final[str | git.PathLike | None] = git.Repo(
37+
".", search_parent_directories=True
38+
).working_tree_dir
39+
if PROJECT_ROOT is None:
40+
NO_ROOT_DIRECTORY_MESSAGE: Final[str] = "Could not locate project root directory."
41+
raise FileNotFoundError(NO_ROOT_DIRECTORY_MESSAGE)
42+
43+
self.env_file_path: Path = PROJECT_ROOT / Path(".env")
44+
self.old_env_value: str | None = os.environ.get(self.env_variable_name)
45+
46+
def __enter__(self) -> None:
47+
"""Delete all stored instances of the stored environment variable."""
48+
if self.env_file_path.is_file():
49+
self.env_file_path = self.env_file_path.rename(
50+
self.env_file_path.parent / Path(".env.original"),
51+
)
52+
53+
if self.old_env_value is not None:
54+
del os.environ[self.env_variable_name]
55+
56+
def __exit__(
57+
self,
58+
exc_type: type[BaseException] | None,
59+
exc_val: BaseException | None,
60+
exc_tb: "TracebackType | None", # noqa: PYI036
61+
) -> None:
62+
"""Restore the deleted environment variable to its previous states."""
63+
if self.env_file_path.is_file():
64+
self.env_file_path.rename(self.env_file_path.parent / Path(".env"))
65+
66+
if self.old_env_value is not None:
67+
os.environ[self.env_variable_name] = self.old_env_value
68+
69+
70+
class TemporarySettingsKeyReplacer:
71+
"""Context manager that temporarily replaces the value at the given settings key."""
72+
73+
NOT_SET: "Final[object]" = object()
74+
75+
@classmethod
76+
def _get_old_settings_value(cls, settings_key_name: str) -> object:
77+
try:
78+
return settings[settings_key_name]
79+
except KeyError:
80+
return cls.NOT_SET
81+
82+
def __init__(self, settings_key_name: str, new_settings_value: object) -> None:
83+
"""Store the current state of the settings value if it exists."""
84+
self.settings_key_name: str = settings_key_name
85+
self.new_settings_value: object = new_settings_value
86+
87+
self.old_settings_value: object = self._get_old_settings_value(
88+
self.settings_key_name,
89+
)
90+
91+
def __enter__(self) -> None:
92+
"""Replace the settings value with the new value provided."""
93+
settings._settings[self.settings_key_name] = self.new_settings_value # noqa: SLF001
94+
95+
def __exit__(
96+
self,
97+
exc_type: type[BaseException] | None,
98+
exc_val: BaseException | None,
99+
exc_tb: "TracebackType | None", # noqa: PYI036
100+
) -> None:
101+
"""Restore the replaced settings value with the original value if it existed."""
102+
if self.old_settings_value is self.NOT_SET:
103+
settings._settings.pop(self.settings_key_name) # noqa: SLF001
104+
105+
else:
106+
settings._settings[self.settings_key_name] = self.old_settings_value # noqa: SLF001
107+
108+
109+
class FileTemporaryDeleter:
110+
"""
111+
Context manager that temporarily deletes the file at the given file path.
112+
113+
The file at the given file path is restored after the context manager exits.
114+
"""
115+
116+
def __init__(self, file_path: Path) -> None:
117+
"""Store the given file path to delete."""
118+
self.file_path: Path = file_path
119+
self._temp_file_path: Path | None = None
120+
121+
def __enter__(self) -> None:
122+
"""Delete the file at the stored file path if that file actually exists."""
123+
if self._temp_file_path is not None:
124+
ALREADY_DELETED_MESSAGE: Final[str] = (
125+
"Given file path has already been deleted by this context manager."
126+
)
127+
raise RuntimeError(ALREADY_DELETED_MESSAGE)
128+
129+
if self.file_path.is_file():
130+
new_file_path: Path = self.file_path.parent / (
131+
f"{self.file_path.name}."
132+
f"{
133+
hashlib.sha1(
134+
str(self.file_path.resolve(strict=False)).encode(),
135+
usedforsecurity=False,
136+
).hexdigest()[:10]
137+
}-"
138+
f"invalid"
139+
)
140+
141+
if new_file_path.exists():
142+
CANNOT_DELETE_FILE_MESSAGE: Final[str] = (
143+
"Cannot delete file at given file path: "
144+
"file already exists at temporary file path."
145+
)
146+
raise RuntimeError(CANNOT_DELETE_FILE_MESSAGE)
147+
148+
self.file_path.replace(new_file_path)
149+
self._temp_file_path = new_file_path
150+
151+
def __exit__(
152+
self,
153+
exc_type: type[BaseException] | None,
154+
exc_val: BaseException | None,
155+
exc_tb: "TracebackType | None", # noqa: PYI036
156+
) -> None:
157+
"""Restore the deleted file at the stored file path."""
158+
if self._temp_file_path is not None:
159+
if not self._temp_file_path.exists():
160+
TEMPORARY_FILE_PATH_NOT_SAVED_CORRECTLY_MESSAGE: Final[str] = (
161+
"Cannot restore the deleted file, "
162+
"because the temporary file path was not stored correctly."
163+
)
164+
raise RuntimeError(TEMPORARY_FILE_PATH_NOT_SAVED_CORRECTLY_MESSAGE)
165+
166+
self._temp_file_path.replace(self.file_path)

utils/random_generators.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
"""Module for generating random values for environment variables."""
2+
3+
import abc
4+
import random
5+
import string
6+
from collections.abc import Iterable
7+
from typing import TYPE_CHECKING, Generic, TypeVar, override
8+
9+
if TYPE_CHECKING:
10+
from collections.abc import Iterable, Sequence
11+
12+
13+
__all__: "Sequence[str]" = (
14+
"BaseRandomEnvVariableValueGenerator",
15+
"RandomDiscordBotTokenGenerator",
16+
"RandomDiscordGuildIDGenerator",
17+
"RandomDiscordLogChannelWebhookURLGenerator",
18+
"RandomOrganisationIDGenerator",
19+
)
20+
21+
22+
T = TypeVar("T")
23+
24+
25+
class BaseRandomEnvVariableValueGenerator(Generic[T], abc.ABC):
26+
"""Generates random values for a specific environment variable."""
27+
28+
@classmethod
29+
@abc.abstractmethod
30+
def multiple_values(cls, count: int = 5) -> "Iterable[T]":
31+
"""Return `count` number of random values."""
32+
33+
@classmethod
34+
def single_value(cls) -> T:
35+
"""Return a single random value."""
36+
return next(iter(cls.multiple_values(count=1)))
37+
38+
39+
class RandomDiscordBotTokenGenerator(BaseRandomEnvVariableValueGenerator[str]):
40+
"""Generates random values that are valid Discord bot tokens."""
41+
42+
@classmethod
43+
@override
44+
def multiple_values(cls, count: int = 5) -> "Iterable[str]":
45+
"""Return `count` number of random `DISCORD_BOT_TOKEN` values."""
46+
return (
47+
f"{
48+
''.join(
49+
random.choices(
50+
string.ascii_letters + string.digits, k=random.randint(24, 26)
51+
)
52+
)
53+
}.{''.join(random.choices(string.ascii_letters + string.digits, k=6))}.{
54+
''.join(
55+
random.choices(
56+
string.ascii_letters + string.digits + '_-', k=random.randint(27, 38)
57+
)
58+
)
59+
}" # noqa: S311
60+
for _ in range(count)
61+
)
62+
63+
@classmethod
64+
@override
65+
def single_value(cls) -> str:
66+
"""Return a single random `DISCORD_BOT_TOKEN` value."""
67+
return super().single_value()
68+
69+
70+
class RandomDiscordLogChannelWebhookURLGenerator(BaseRandomEnvVariableValueGenerator[str]):
71+
"""Generates random values that are valid Discord log channel webhook URLs."""
72+
73+
@classmethod
74+
@override
75+
def multiple_values(
76+
cls, count: int = 5, *, with_trailing_slash: bool | None = None
77+
) -> "Iterable[str]":
78+
"""Return `count` number of random `DISCORD_LOG_CHANNEL_WEBHOOK_URL` values."""
79+
return (
80+
f"https://discord.com/api/webhooks/{
81+
''.join(random.choices(string.digits, k=random.randint(17, 20)))
82+
}/{
83+
''.join(
84+
random.choices(
85+
string.ascii_letters + string.digits, k=random.randint(60, 90)
86+
)
87+
)
88+
}{
89+
(
90+
'/'
91+
if with_trailing_slash
92+
else (random.choice(('', '/')) if with_trailing_slash is None else '')
93+
)
94+
}" # noqa: S311
95+
for _ in range(count)
96+
)
97+
98+
@classmethod
99+
@override
100+
def single_value(cls) -> str:
101+
"""Return a single random `DISCORD_LOG_CHANNEL_WEBHOOK_URL` value."""
102+
return super().single_value()
103+
104+
105+
class RandomDiscordGuildIDGenerator(BaseRandomEnvVariableValueGenerator[str]):
106+
"""Generates random values that are valid Discord guild IDs."""
107+
108+
@classmethod
109+
@override
110+
def multiple_values(cls, count: int = 5) -> "Iterable[str]":
111+
"""Return `count` number of random `DISCORD_GUILD_ID` values."""
112+
return (
113+
"".join(random.choices(string.digits, k=random.randint(17, 20))) # noqa: S311
114+
for _ in range(count)
115+
)
116+
117+
@classmethod
118+
@override
119+
def single_value(cls) -> str:
120+
"""Return a single random `DISCORD_GUILD_ID` value."""
121+
return super().single_value()
122+
123+
124+
class RandomOrganisationIDGenerator(BaseRandomEnvVariableValueGenerator[str]):
125+
"""Generates random values that are valid organisation IDs."""
126+
127+
@classmethod
128+
@override
129+
def multiple_values(cls, count: int = 5) -> "Iterable[str]":
130+
"""Return `count` number of random `ORGANISATION_ID` values."""
131+
return (
132+
"".join(random.choices(string.digits, k=random.randint(4, 5))) # noqa: S311
133+
for _ in range(count)
134+
)
135+
136+
@classmethod
137+
@override
138+
def single_value(cls) -> str:
139+
"""Return a single random `ORGANISATION_ID` value."""
140+
return super().single_value()

0 commit comments

Comments
 (0)