From 768ddf12b8b98f6d1e2f27171b24226aa7aff811 Mon Sep 17 00:00:00 2001 From: seladb Date: Sat, 4 Apr 2026 01:24:22 -0700 Subject: [PATCH 1/4] Add domain, URL and email validators --- tests/test_validators.py | 156 +++++++++++++++++++++++++ tortoise/validators.py | 246 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 402 insertions(+) diff --git a/tests/test_validators.py b/tests/test_validators.py index 47946992c..397d9be57 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -4,6 +4,18 @@ from tests.testmodels import ValidatorModel from tortoise.exceptions import ValidationError +from tortoise.validators import ( + DomainNameValidator, + EmailValidator, + InvalidDomainName, + InvalidEmailAddress, + InvalidScheme, + InvalidURL, + URLValidator, + validate_domain_name, + validate_email, + validate_url, +) @pytest.mark.asyncio @@ -116,3 +128,147 @@ async def test_update(db): record.min_value_decimal = Decimal("0.9") with pytest.raises(ValidationError): await record.save() + + +@pytest.mark.parametrize( + "value", + [ + "example.com", + "sub.example.com", + "example.co.uk", + "münchen.de", + ], +) +def test_domain_name_validator_valid(value): + validate_domain_name(value) + + +@pytest.mark.parametrize( + "value", + [ + "", + "---.com", + "example-.com", + ], +) +def test_domain_name_validator_invalid(value): + with pytest.raises(InvalidDomainName): + validate_domain_name(value) + + +def test_domain_name_validator_invalid_idn_disabled(): + validator = DomainNameValidator(accept_idna=False) + with pytest.raises(InvalidDomainName): + validator("münchen.de") + + +@pytest.mark.parametrize( + "value", + [ + "http://example.com", + "https://www.example.com/path?query=1", + "ftp://ftp.example.com/file.txt", + "http://localhost:8080", + "http://192.168.1.1", + "http://8.8.8.8:8080", + "https://[::1]", + "https://[2001:db8::1]:443", + "http://user:pass@example.com", + "http://example.com#fragment", + ], +) +def test_url_validator_valid(value): + validate_url(value) + + +@pytest.mark.parametrize( + "value", + [ + "http://example.com", + "https://example.com", + ], +) +def test_url_validator_valid_custom_schemes(value): + validator = URLValidator(allowed_schemes=["http", "https"]) + validator(value) + + +def test_url_validator_invalid_scheme(): + validator = URLValidator(allowed_schemes=["http", "https"]) + with pytest.raises(InvalidScheme): + validator("ftp://example.com") + + +@pytest.mark.parametrize( + "value", + [ + "", + "not-a-url", + "http://", + "http:// space.com", + "http://[::gggg]", + "http://256.1.1.1", + "http://" + "a" * 254 + ".com", + ], +) +def test_url_validator_invalid(value): + with pytest.raises(InvalidURL): + validate_url(value) + + +def test_url_validator_max_length(): + long_url = "http://example.com/" + "a" * 2100 + with pytest.raises(InvalidURL): + validate_url(long_url) + + +@pytest.mark.parametrize( + "value", + [ + "user@example.com", + "user.name@example.com", + "user+tag@example.co.uk", + "user@sub.domain.com", + "user@[192.168.1.1]", + "user@[::1]", + "a+b@example.com", + "a-b@example.com", + "a_b@example.com", + "test@test.co.uk", + ], +) +def test_email_validator_valid(value): + validate_email(value) + + +def test_email_validator_valid_allowed_domains(): + validator = EmailValidator(allowed_domains=["example.com", "test.com"]) + validator("user@example.com") + validator("user@test.com") + + +def test_email_validator_invalid_allowed_domains(): + validator = EmailValidator(allowed_domains=["example.com"]) + validator("user@example.com") + with pytest.raises(InvalidEmailAddress): + validator("user@") + with pytest.raises(InvalidEmailAddress): + validator("user@invalid..com") + + +@pytest.mark.parametrize( + "value", + [ + "", + "not-an-email", + "user@", + "@example.com", + "user@.com", + "user@com.", + "user@com..com", + "a" * 330 + "@example.com", + ], +) +def test_email_validator_invalid(value): + with pytest.raises(InvalidEmailAddress): + validate_email(value) diff --git a/tortoise/validators.py b/tortoise/validators.py index 36785bb74..5138586a4 100644 --- a/tortoise/validators.py +++ b/tortoise/validators.py @@ -1,10 +1,12 @@ from __future__ import annotations import abc +import functools import ipaddress import re from decimal import Decimal from typing import Any +from urllib.parse import urlsplit from tortoise.exceptions import ValidationError @@ -117,6 +119,250 @@ def __call__(self, value: str) -> None: self.regex(value) +UL = "\u00a1-\uffff" +HOSTNAME_REGEX = r"[a-z" + UL + r"0-9](?:[a-z" + UL + r"0-9-]{0,61}[a-z" + UL + r"0-9])?" +# Max length for domain name labels is 63 characters per RFC 1034 sec. 3.1. +DOMAIN_REGEX = r"(?:\.(?!-)[a-z" + UL + r"0-9-]{1,63}(? None: + super().__init__("Invalid domain name") + + +class DomainNameValidator(Validator): + """ + Validator for domain names. + + Validates domain names according to RFC 1034 and RFC 1123. Supports both + ASCII domain names and internationalized domain names (IDN) when accept_idna + is True. + + :param accept_idna: If True, accepts internationalized domain names (IDN). + Defaults to True. + :raises InvalidDomainName: if the value is not a valid domain name. + """ + + ASCII_ONLY_HOSTNAME_REGEX = r"[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?" + ASCII_ONLY_DOMAIN_REGEX = r"(?:\.(?!-)[a-zA-Z0-9-]{1,63}(? re.Pattern[str]: + return re.compile(r"^" + HOSTNAME_REGEX + DOMAIN_REGEX + TLD_REGEX + r"$", re.IGNORECASE) + + @functools.cached_property + def _do_not_accept_idna_regex(self) -> re.Pattern[str]: + return re.compile( + r"^" + + self.ASCII_ONLY_HOSTNAME_REGEX + + self.ASCII_ONLY_DOMAIN_REGEX + + self.ASCII_ONLY_TLD_REGEX + + r"$", + re.IGNORECASE, + ) + + def __init__(self, accept_idna: bool = True) -> None: + self.accept_idna = accept_idna + self.regex = self._accept_idna_regex if accept_idna else self._do_not_accept_idna_regex + + def __call__(self, value: str) -> None: + if len(value) > self.MAX_DOMAIN_LENGTH: + raise InvalidDomainName() + + if not (self.accept_idna or value.isascii()): + raise InvalidDomainName() + + if not self.regex.search(value): + raise InvalidDomainName() + + +validate_domain_name = DomainNameValidator() + + +class InvalidURL(ValidationError): + def __init__(self, message: str = "Invalid URL") -> None: + super().__init__(message) + + +class InvalidScheme(InvalidURL): + def __init__(self, scheme: str) -> None: + super().__init__(f"Invalid scheme: {scheme} is not allowed") + + +class URLValidator(Validator): + """ + Validator for URLs. + + Validates URLs according to RFC 3986. Checks scheme, host, port, and path + components. Supports HTTP, HTTPS, FTP, and FTPS schemes by default. + + :param allowed_schemes: List of allowed URL schemes. Defaults to ["http", "https", "ftp", "ftps"]. + :raises InvalidURL: if the value is not a valid URL. + :raises InvalidScheme: if the URL scheme is not in the allowed list. + """ + + IPV4_REGEX = ( + r"(?:0|25[0-5]|2[0-4][0-9]|1[0-9]?[0-9]?|[1-9][0-9]?)" + r"(?:\.(?:0|25[0-5]|2[0-4][0-9]|1[0-9]?[0-9]?|[1-9][0-9]?)){3}" + ) + SIMPLE_IPV6_REGEX = r"\[[0-9a-f:.]+\]" # (simple regex, validated later) + ADVANCED_IPV6_REGEX = r"^\[(.+)\](?::[0-9]{1,5})?$" + HOST_REGEX = "(" + HOSTNAME_REGEX + DOMAIN_REGEX + TLD_REGEX + "|localhost)" + URL_REGEX = ( + r"^(?:[a-z0-9.+-]*)://" # scheme is validated separately + r"(?:[^\s:@/]+(?::[^\s:@/]*)?@)?" # user:pass authentication + r"(?:" + IPV4_REGEX + "|" + SIMPLE_IPV6_REGEX + "|" + HOST_REGEX + ")" + r"(?::[0-9]{1,5})?" # port + r"(?:[/?#][^\s]*)?" # resource path + r"\Z" + ) + UNSAFE_CHARS = frozenset("\t\r\n") + MAX_URL_LENGTH = 2048 + + # The maximum length of a full host name is 253 characters per RFC 1034 + # section 3.1. It's defined to be 255 bytes or fewer, but this includes + # one byte for the length of the name and one byte for the trailing dot + # that's used to indicate absolute names in DNS. + MAX_HOSTNAME_LENGTH = 253 + + @functools.cached_property + def _url_regex(self) -> re.Pattern[str]: + return re.compile(self.URL_REGEX, re.IGNORECASE) + + def __init__(self, allowed_schemes: list[str] | None = None) -> None: + self.allowed_schemes = allowed_schemes or ["http", "https", "ftp", "ftps"] + + def __call__(self, value: str) -> None: + if len(value) > self.MAX_URL_LENGTH: + raise InvalidURL() + + if self.UNSAFE_CHARS.intersection(value): + raise InvalidURL() + + try: + split_url = urlsplit(value) + except ValueError: + raise InvalidURL() + + if split_url.scheme.lower() not in self.allowed_schemes: + raise InvalidScheme(split_url.scheme.lower()) + + if split_url.hostname is None or len(split_url.hostname) > self.MAX_HOSTNAME_LENGTH: + raise InvalidURL() + + if not self._url_regex.search(value): + raise InvalidURL() + + # Now verify IPv6 in the netloc part + host_match = re.search(self.ADVANCED_IPV6_REGEX, split_url.netloc) + if host_match: + potential_ip = host_match[1] + try: + validate_ipv6_address(potential_ip) + except ValidationError: + raise InvalidURL() + + +validate_url = URLValidator() + + +class InvalidEmailAddress(ValidationError): + def __init__(self) -> None: + super().__init__("Invalid email address") + + +class EmailValidator(Validator): + """ + Validator for email addresses. + + Validates email addresses according to RFC 3696. Checks both the local part + (before @) and domain part (after @). Supports domain allowlisting for + restricting to specific domains. + + :param allowed_domains: List of allowed domain names. If provided, only emails + from these domains will be accepted. + :raises InvalidEmailAddress: if the value is not a valid email address. + """ + + # The maximum length of an email is 320 characters per RFC 3696 section 3 + MAX_EMAIL_LENGTH = 320 + + USER_REGEX = ( + # dot-atom + r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*\Z" + # quoted-string + r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-\011\013\014\016-\177])' + r'*"\Z)' + ) + LITERAL_REGEX = ( + # literal form, ipv4 or ipv6 address (SMTP 4.1.3) + r"\[([A-F0-9:.]+)\]\Z" + ) + + @functools.cached_property + def _user_regex(self) -> re.Pattern[str]: + return re.compile(self.USER_REGEX, re.IGNORECASE) + + @functools.cached_property + def _domain_regex(self) -> re.Pattern[str]: + print("evaluating domain regex!!!") + return re.compile( + r"^" + HOSTNAME_REGEX + DOMAIN_REGEX + TLD_NO_FQDN_REGEX + r"\Z", re.IGNORECASE + ) + + @functools.cached_property + def _literal_regex(self) -> re.Pattern[str]: + return re.compile(self.LITERAL_REGEX, re.IGNORECASE) + + def _validate_domain_part(self, domain_part: str) -> bool: + if self._domain_regex.match(domain_part): + return True + + if literal_match := self._literal_regex.match(domain_part): + ip_address = literal_match[1] + try: + validate_ipv46_address(ip_address) + return True + except ValidationError: + pass + return False + + def __init__(self, allowed_domains: list[str] | None = None) -> None: + self.allowed_domains: list[str] = allowed_domains or [] + + def __call__(self, value: str) -> None: + if "@" not in value or len(value) > self.MAX_EMAIL_LENGTH: + raise InvalidEmailAddress() + + user_part, domain_part = value.rsplit("@", 1) + + if not self._user_regex.match(user_part): + raise InvalidEmailAddress() + + if domain_part not in self.allowed_domains and not self._validate_domain_part(domain_part): + raise InvalidEmailAddress() + + +validate_email = EmailValidator() + + def validate_ipv4_address(value: Any) -> None: """ A validator to validate whether the given value is valid IPv4Address or not. From 033ce38517cc5deee057e663b6c802f810a3136a Mon Sep 17 00:00:00 2001 From: seladb Date: Sat, 4 Apr 2026 01:37:03 -0700 Subject: [PATCH 2/4] Add docstrings --- CHANGELOG.rst | 7 +++++++ tortoise/validators.py | 3 +++ 2 files changed, 10 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 68d944bea..3ce1f0be2 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,13 @@ Changelog 1.1 === +1.1.8 +----- + +Added +^^^^^ +- Built-in ``DomainNameValidator``, ``URLValidator``, and ``EmailValidator`` classes for common validation patterns. (#2162) + 1.1.7 ----- diff --git a/tortoise/validators.py b/tortoise/validators.py index 5138586a4..b443202a7 100644 --- a/tortoise/validators.py +++ b/tortoise/validators.py @@ -194,6 +194,7 @@ def __call__(self, value: str) -> None: validate_domain_name = DomainNameValidator() +validate_domain_name.__doc__ = "Pre-configured DomainNameValidator instance." class InvalidURL(ValidationError): @@ -281,6 +282,7 @@ def __call__(self, value: str) -> None: validate_url = URLValidator() +validate_url.__doc__ = "Pre-configured URLValidator instance." class InvalidEmailAddress(ValidationError): @@ -361,6 +363,7 @@ def __call__(self, value: str) -> None: validate_email = EmailValidator() +validate_email.__doc__ = "Pre-configured EmailValidator instance." def validate_ipv4_address(value: Any) -> None: From 92fabb84db24b1230354cc14caf62d63f6ae028a Mon Sep 17 00:00:00 2001 From: seladb Date: Tue, 12 May 2026 01:05:16 -0700 Subject: [PATCH 3/4] Address PR comments --- tests/test_validators.py | 7 +++++++ tortoise/validators.py | 14 +++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/tests/test_validators.py b/tests/test_validators.py index 397d9be57..57073b934 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -137,6 +137,11 @@ async def test_update(db): "sub.example.com", "example.co.uk", "münchen.de", + "sub1.sub2.example.org", + "UPPER-CASE.is.ok.net", + "tortoise.github.io", + "example.space", + "❤️.website", ], ) def test_domain_name_validator_valid(value): @@ -149,6 +154,8 @@ def test_domain_name_validator_valid(value): "", "---.com", "example-.com", + "under_line.com", + "💻.tech", ], ) def test_domain_name_validator_invalid(value): diff --git a/tortoise/validators.py b/tortoise/validators.py index b443202a7..d2d0ae055 100644 --- a/tortoise/validators.py +++ b/tortoise/validators.py @@ -1,10 +1,10 @@ from __future__ import annotations import abc -import functools import ipaddress import re from decimal import Decimal +from functools import cached_property from typing import Any from urllib.parse import urlsplit @@ -163,11 +163,11 @@ class DomainNameValidator(Validator): ) MAX_DOMAIN_LENGTH = 255 - @functools.cached_property + @cached_property def _accept_idna_regex(self) -> re.Pattern[str]: return re.compile(r"^" + HOSTNAME_REGEX + DOMAIN_REGEX + TLD_REGEX + r"$", re.IGNORECASE) - @functools.cached_property + @cached_property def _do_not_accept_idna_regex(self) -> re.Pattern[str]: return re.compile( r"^" @@ -243,7 +243,7 @@ class URLValidator(Validator): # that's used to indicate absolute names in DNS. MAX_HOSTNAME_LENGTH = 253 - @functools.cached_property + @cached_property def _url_regex(self) -> re.Pattern[str]: return re.compile(self.URL_REGEX, re.IGNORECASE) @@ -318,18 +318,18 @@ class EmailValidator(Validator): r"\[([A-F0-9:.]+)\]\Z" ) - @functools.cached_property + @cached_property def _user_regex(self) -> re.Pattern[str]: return re.compile(self.USER_REGEX, re.IGNORECASE) - @functools.cached_property + @cached_property def _domain_regex(self) -> re.Pattern[str]: print("evaluating domain regex!!!") return re.compile( r"^" + HOSTNAME_REGEX + DOMAIN_REGEX + TLD_NO_FQDN_REGEX + r"\Z", re.IGNORECASE ) - @functools.cached_property + @cached_property def _literal_regex(self) -> re.Pattern[str]: return re.compile(self.LITERAL_REGEX, re.IGNORECASE) From ca432c9046b70a994f6409e8a7098db94125270b Mon Sep 17 00:00:00 2001 From: seladb Date: Tue, 12 May 2026 01:12:32 -0700 Subject: [PATCH 4/4] Fix `CHANGELOG.rst` --- CHANGELOG.rst | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5794c7488..eb53a8c9e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -26,6 +26,7 @@ Fixed Added ^^^^^ +- ``QuerySet.union()`` — SQL UNION query support for combining results from multiple QuerySets, including support for union across different models, ``union(all=True)`` for duplicates, ``order_by()``, ``limit()``, and ``count()``. - Tests for model validators. (#2137) Fixed @@ -1062,7 +1063,7 @@ Removals: 0.15.15 ------- -- Add ability to suppply a ``to_field=`` parameter for FK/O2O to a non-PK but still uniquely indexed remote field. (#287) +- Add ability to supply a ``to_field=`` parameter for FK/O2O to a non-PK but still uniquely indexed remote field. (#287) 0.15.14 ------- @@ -1613,7 +1614,7 @@ Docs/examples: 0.10.9 ------ -- Uses macros on SQLite driver to minimise syncronisation. ``aiosqlite>=0.7.0`` +- Uses macros on SQLite driver to minimise synchronisation. ``aiosqlite>=0.7.0`` - Uses prepared statements for insert, large insert performance increase. - Pre-generate base pypika query object per model, providing general purpose speedup. @@ -1733,7 +1734,7 @@ Docs/examples: - Fixed ``DatetimeField`` and ``DateField`` to work as expected on SQLite. - Added ``PyLint`` plugin. -- Added test class to mange DB state for testing isolation. +- Added test class to manage DB state for testing isolation. 0.8.0 -----