From 8d98c499dfb231e4860f8fd431b87c217e6d49df Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Tue, 8 Nov 2022 09:42:38 +0000 Subject: [PATCH] url parsing in pydantic-core (#4732) * url parsing in pydantic-core * fix tests * fix mypy tests --- pydantic/__init__.py | 2 +- pydantic/_internal/_std_types_schema.py | 12 +- pydantic/networks.py | 577 +++--------------------- pyproject.toml | 2 +- requirements/linting.txt | 4 +- requirements/pyproject-all.txt | 2 +- requirements/pyproject-min.txt | 2 +- tests/mypy/modules/success.py | 17 +- tests/test_networks.py | 459 +++++++------------ tests/test_schema.py | 10 +- tests/test_types.py | 4 +- 11 files changed, 250 insertions(+), 841 deletions(-) diff --git a/pydantic/__init__.py b/pydantic/__init__.py index b512d66..91fcb5f 100644 --- a/pydantic/__init__.py +++ b/pydantic/__init__.py @@ -41,7 +41,7 @@ __all__ = [ 'AnyHttpUrl', 'FileUrl', 'HttpUrl', - 'stricturl', + 'UrlConstraints', 'EmailStr', 'NameEmail', 'IPvAnyAddress', diff --git a/pydantic/_internal/_std_types_schema.py b/pydantic/_internal/_std_types_schema.py index 9aac6bb..4c3d150 100644 --- a/pydantic/_internal/_std_types_schema.py +++ b/pydantic/_internal/_std_types_schema.py @@ -15,7 +15,7 @@ from pathlib import PurePath from typing import Any, Callable from uuid import UUID -from pydantic_core import PydanticCustomError, core_schema +from pydantic_core import MultiHostUrl, PydanticCustomError, Url, core_schema from typing_extensions import get_args from . import _validators @@ -204,3 +204,13 @@ def ip_v6_interface_schema(_schema_generator: GenerateSchema, _obj: Any) -> core @schema_function(IPv6Network) def ip_v6_network_schema(_schema_generator: GenerateSchema, _obj: Any) -> core_schema.FunctionPlainSchema: return core_schema.function_plain_schema(_validators.ip_v6_network_validator) + + +@schema_function(Url) +def url_schema(_schema_generator: GenerateSchema, _obj: Any) -> core_schema.UrlSchema: + return {'type': 'url'} + + +@schema_function(MultiHostUrl) +def multi_host_url_schema(_schema_generator: GenerateSchema, _obj: Any) -> core_schema.MultiHostUrlSchema: + return {'type': 'multi-host-url'} diff --git a/pydantic/networks.py b/pydantic/networks.py index c2db1f7..b5a9389 100644 --- a/pydantic/networks.py +++ b/pydantic/networks.py @@ -1,54 +1,30 @@ from __future__ import annotations as _annotations +import dataclasses as _dataclasses import re from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network -from typing import TYPE_CHECKING, Any, Callable, Collection, Generator, Match, Pattern, cast, no_type_check +from typing import TYPE_CHECKING, Any -import typing_extensions -from pydantic_core import PydanticCustomError, core_schema +from pydantic_core import MultiHostUrl, PydanticCustomError, Url, core_schema +from typing_extensions import Annotated -from ._internal import _repr, _utils +from ._internal import _fields, _repr if TYPE_CHECKING: import email_validator - from typing_extensions import TypedDict - - CallableGenerator = Generator[Callable[..., Any], None, None] - - class Parts(TypedDict, total=False): - scheme: str - user: str | None - password: str | None - ipv4: str | None - ipv6: str | None - domain: str | None - port: str | None - path: str | None - query: str | None - fragment: str | None - - class HostParts(TypedDict, total=False): - host: str - tld: str | None - host_type: str | None - port: str | None - rebuild: bool NetworkType = str | bytes | int | tuple[str | bytes | int, str | int] # type: ignore[misc] else: email_validator = None - class Parts(dict): - pass - __all__ = [ 'AnyUrl', 'AnyHttpUrl', 'FileUrl', 'HttpUrl', - 'stricturl', + 'UrlConstraints', 'EmailStr', 'NameEmail', 'IPvAnyAddress', @@ -63,495 +39,58 @@ __all__ = [ 'validate_email', ] -_url_regex_cache = None -_multi_host_url_regex_cache = None -_ascii_domain_regex_cache = None -_int_domain_regex_cache = None -_host_regex_cache = None -_host_regex = ( - r'(?:' - r'(?P(?:\d{1,3}\.){3}\d{1,3})(?=$|[/:#?])|' # ipv4 - r'(?P\[[A-F0-9]*:[A-F0-9:]+\])(?=$|[/:#?])|' # ipv6 - r'(?P[^\s/:?#]+)' # domain, validation occurs later - r')?' - r'(?::(?P\d+))?' # port -) -_scheme_regex = r'(?:(?P[a-z][a-z0-9+\-.]+)://)?' # scheme https://tools.ietf.org/html/rfc3986#appendix-A -_user_info_regex = r'(?:(?P[^\s:/]*)(?::(?P[^\s/]*))?@)?' -_path_regex = r'(?P/[^\s?#]*)?' -_query_regex = r'(?:\?(?P[^\s#]*))?' -_fragment_regex = r'(?:#(?P[^\s#]*))?' - - -def url_regex() -> Pattern[str]: - global _url_regex_cache - if _url_regex_cache is None: - _url_regex_cache = re.compile( - rf'{_scheme_regex}{_user_info_regex}{_host_regex}{_path_regex}{_query_regex}{_fragment_regex}', - re.IGNORECASE, - ) - return _url_regex_cache - - -def multi_host_url_regex() -> Pattern[str]: - """ - Compiled multi host url regex. - - Additionally to `url_regex` it allows to match multiple hosts. - E.g. host1.db.net,host2.db.net - """ - global _multi_host_url_regex_cache - if _multi_host_url_regex_cache is None: - _multi_host_url_regex_cache = re.compile( - rf'{_scheme_regex}{_user_info_regex}' - r'(?P([^/]*))' # validation occurs later - rf'{_path_regex}{_query_regex}{_fragment_regex}', - re.IGNORECASE, - ) - return _multi_host_url_regex_cache - - -def ascii_domain_regex() -> Pattern[str]: - global _ascii_domain_regex_cache - if _ascii_domain_regex_cache is None: - ascii_chunk = r'[_0-9a-z](?:[-_0-9a-z]{0,61}[_0-9a-z])?' - ascii_domain_ending = r'(?P\.[a-z]{2,63})?\.?' - _ascii_domain_regex_cache = re.compile( - fr'(?:{ascii_chunk}\.)*?{ascii_chunk}{ascii_domain_ending}', re.IGNORECASE - ) - return _ascii_domain_regex_cache - - -def int_domain_regex() -> Pattern[str]: - global _int_domain_regex_cache - if _int_domain_regex_cache is None: - int_chunk = r'[_0-9a-\U00040000](?:[-_0-9a-\U00040000]{0,61}[_0-9a-\U00040000])?' - int_domain_ending = r'(?P(\.[^\W\d_]{2,63})|(\.(?:xn--)[_0-9a-z-]{2,63}))?\.?' - _int_domain_regex_cache = re.compile(fr'(?:{int_chunk}\.)*?{int_chunk}{int_domain_ending}', re.IGNORECASE) - return _int_domain_regex_cache - - -def host_regex() -> Pattern[str]: - global _host_regex_cache - if _host_regex_cache is None: - _host_regex_cache = re.compile( - _host_regex, - re.IGNORECASE, - ) - return _host_regex_cache - - -class AnyUrl(str): - strip_whitespace = True - min_length = 1 - max_length = 2**16 - allowed_schemes: Collection[str] | None = None - tld_required: bool = False - user_required: bool = False - host_required: bool = True - hidden_parts: set[str] = set() - - __slots__ = ('scheme', 'user', 'password', 'host', 'tld', 'host_type', 'port', 'path', 'query', 'fragment') - - @no_type_check - def __new__(cls, url: str | None, **kwargs) -> object: - return str.__new__(cls, cls.build(**kwargs) if url is None else url) - - def __init__( - self, - url: str, - *, - scheme: str, - user: str | None = None, - password: str | None = None, - host: str | None = None, - tld: str | None = None, - host_type: str = 'domain', - port: str | None = None, - path: str | None = None, - query: str | None = None, - fragment: str | None = None, - ) -> None: - str.__init__(url) - self.scheme = scheme - self.user = user - self.password = password - self.host = host - self.tld = tld - self.host_type = host_type - self.port = port - self.path = path - self.query = query - self.fragment = fragment - - @classmethod - def build( - cls, - *, - scheme: str, - user: str | None = None, - password: str | None = None, - host: str, - port: str | None = None, - path: str | None = None, - query: str | None = None, - fragment: str | None = None, - **_kwargs: str, - ) -> str: - parts = Parts( # type: ignore[misc] - scheme=scheme, - user=user, - password=password, - host=host, - port=port, - path=path, - query=query, - fragment=fragment, - **_kwargs, - ) - - url = scheme + '://' - if user: - url += user - if password: - url += ':' + password - if user or password: - url += '@' - url += host - if port and ('port' not in cls.hidden_parts or cls.get_default_parts(parts).get('port') != port): - url += ':' + port - if path: - url += path - if query: - url += '?' + query - if fragment: - url += '#' + fragment - return url - - @classmethod - def __modify_schema__(cls, field_schema: dict[str, Any]) -> None: - _utils.update_not_none(field_schema, minLength=cls.min_length, maxLength=cls.max_length, format='uri') - - @classmethod - def __get_pydantic_validation_schema__(cls, **_kwargs: Any) -> core_schema.FunctionSchema: - return core_schema.FunctionSchema( - type='function', - mode='after', - function=cls.validate, - schema=core_schema.StringSchema( - type='str', - min_length=cls.min_length, - max_length=cls.max_length, - strip_whitespace=cls.strip_whitespace, - ), - ) - - @classmethod - def validate(cls, __input_value: str, **_kwargs: Any) -> str | AnyUrl: - if __input_value.__class__ == cls: - return __input_value - url = __input_value - m = cls._match_url(url) - # the regex should always match, if it doesn't please report with details of the URL tried - assert m, 'URL regex failed unexpectedly' - - original_parts = cast('Parts', m.groupdict()) - parts = cls.apply_default_parts(original_parts) - parts = cls.validate_parts(parts) - - if m.end() != len(url): - raise PydanticCustomError( - 'url.extra', - 'URL invalid, extra characters found after valid URL: {extra}', - {'extra': repr(url[m.end() :])}, - ) - - return cls._build_url(m, url, parts) - - @classmethod - def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'AnyUrl': - """ - Validate hosts and build the AnyUrl object. Split from `validate` so this method - can be altered in `MultiHostDsn`. - """ - host, tld, host_type, rebuild = cls.validate_host(parts) - - return cls( - None if rebuild else url, - scheme=parts['scheme'], - user=parts['user'], - password=parts['password'], - host=host, - tld=tld, - host_type=host_type, - port=parts['port'], - path=parts['path'], - query=parts['query'], - fragment=parts['fragment'], - ) - - @staticmethod - def _match_url(url: str) -> Match[str] | None: - return url_regex().match(url) - - @staticmethod - def _validate_port(port: str | None) -> None: - if port is not None and int(port) > 65_535: - raise PydanticCustomError('url.port', 'URL port invalid, port cannot exceed 65535') - - @classmethod - def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts': - """ - A method used to validate parts of a URL. - Could be overridden to set default values for parts if missing - """ - scheme = parts['scheme'] - if scheme is None: - raise PydanticCustomError('url.scheme', 'invalid or missing URL scheme') - - if cls.allowed_schemes and scheme.lower() not in cls.allowed_schemes: - raise PydanticCustomError( - 'url.scheme', - 'URL scheme not permitted', - {'allowed_schemes': ', '.join(sorted(set(cls.allowed_schemes)))}, - ) - - if validate_port: - cls._validate_port(parts['port']) - - user = parts['user'] - if cls.user_required and user is None: - raise PydanticCustomError('url.userinfo', 'userinfo required in URL but missing') - return parts - - @classmethod - def validate_host(cls, parts: 'Parts') -> tuple[str, str | None, str, bool]: - tld, host_type, rebuild = None, None, False - for f in ('domain', 'ipv4', 'ipv6'): - host = parts[f] # type: ignore[literal-required] - if host: - host_type = f - break - - if host is None: - if cls.host_required: - raise PydanticCustomError('url.host', 'URL host invalid') - elif host_type == 'domain': - is_international = False - d = ascii_domain_regex().fullmatch(host) - if d is None: - d = int_domain_regex().fullmatch(host) - if d is None: - raise PydanticCustomError('url.host', 'URL host invalid') - is_international = True - - tld = d.group('tld') - if tld is None and not is_international: - d = int_domain_regex().fullmatch(host) - assert d is not None - tld = d.group('tld') - is_international = True - - if tld is not None: - tld = tld[1:] - elif cls.tld_required: - raise PydanticCustomError('url.host', 'URL host invalid, top level domain required') - - if is_international: - host_type = 'int_domain' - rebuild = True - host = host.encode('idna').decode('ascii') - if tld is not None: - tld = tld.encode('idna').decode('ascii') - - return host, tld, host_type, rebuild # type: ignore - - @staticmethod - def get_default_parts(parts: 'Parts') -> 'Parts': - return {} - - @classmethod - def apply_default_parts(cls, parts: 'Parts') -> 'Parts': - for key, value in cls.get_default_parts(parts).items(): - if not parts[key]: # type: ignore[literal-required] - parts[key] = value # type: ignore[literal-required] - return parts - - def __repr__(self) -> str: - extra = ', '.join(f'{n}={getattr(self, n)!r}' for n in self.__slots__ if getattr(self, n) is not None) - return f'{self.__class__.__name__}({super().__repr__()}, {extra})' - - -class AnyHttpUrl(AnyUrl): - allowed_schemes = {'http', 'https'} - - __slots__ = () - - -class HttpUrl(AnyHttpUrl): - tld_required = True - # https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers - max_length = 2083 - hidden_parts = {'port'} - - @staticmethod - def get_default_parts(parts: 'Parts') -> 'Parts': - return {'port': '80' if parts['scheme'] == 'http' else '443'} - - -class FileUrl(AnyUrl): - allowed_schemes = {'file'} - host_required = False - - __slots__ = () - - -class MultiHostDsn(AnyUrl): - __slots__ = AnyUrl.__slots__ + ('hosts',) - - def __init__(self, *args: Any, hosts: list[HostParts] | None = None, **kwargs: Any): - super().__init__(*args, **kwargs) - self.hosts = hosts - - @staticmethod - def _match_url(url: str) -> Match[str] | None: - return multi_host_url_regex().match(url) - - @classmethod - def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts': - return super().validate_parts(parts, validate_port=False) - - @classmethod - def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'MultiHostDsn': - hosts_parts: list[HostParts] = [] - host_re = host_regex() - for host in m.groupdict()['hosts'].split(','): - d: Parts = host_re.match(host).groupdict() # type: ignore - host, tld, host_type, rebuild = cls.validate_host(d) - port = d.get('port') - cls._validate_port(port) - hosts_parts.append( - { - 'host': host, - 'host_type': host_type, - 'tld': tld, - 'rebuild': rebuild, - 'port': port, - } - ) - - if len(hosts_parts) > 1: - return cls( - None if any([hp['rebuild'] for hp in hosts_parts]) else url, - scheme=parts['scheme'], - user=parts['user'], - password=parts['password'], - path=parts['path'], - query=parts['query'], - fragment=parts['fragment'], - host_type=None, - hosts=hosts_parts, - ) - else: - # backwards compatibility with single host - host_part = hosts_parts[0] - return cls( - None if host_part['rebuild'] else url, - scheme=parts['scheme'], - user=parts['user'], - password=parts['password'], - host=host_part['host'], - tld=host_part['tld'], - host_type=host_part['host_type'], - port=host_part.get('port'), - path=parts['path'], - query=parts['query'], - fragment=parts['fragment'], - ) - - -class PostgresDsn(MultiHostDsn): - allowed_schemes = { - 'postgres', - 'postgresql', - 'postgresql+asyncpg', - 'postgresql+pg8000', - 'postgresql+psycopg', - 'postgresql+psycopg2', - 'postgresql+psycopg2cffi', - 'postgresql+py-postgresql', - 'postgresql+pygresql', - } - user_required = True - - __slots__ = () - - -class CockroachDsn(AnyUrl): - allowed_schemes = { - 'cockroachdb', - 'cockroachdb+psycopg2', - 'cockroachdb+asyncpg', - } - user_required = True - - -class AmqpDsn(AnyUrl): - allowed_schemes = {'amqp', 'amqps'} - host_required = False - - -class RedisDsn(AnyUrl): - __slots__ = () - allowed_schemes = {'redis', 'rediss'} - host_required = False - - @staticmethod - def get_default_parts(parts: 'Parts') -> 'Parts': - return { - 'domain': 'localhost' if not (parts['ipv4'] or parts['ipv6']) else '', - 'port': '6379', - 'path': '/0', - } - - -class MongoDsn(AnyUrl): - allowed_schemes = {'mongodb'} - - # TODO: Needed to generic "Parts" for "Replica Set", "Sharded Cluster", and other mongodb deployment modes - @staticmethod - def get_default_parts(parts: 'Parts') -> 'Parts': - return {'port': '27017'} - - -class KafkaDsn(AnyUrl): - allowed_schemes = {'kafka'} - - @staticmethod - def get_default_parts(parts: 'Parts') -> 'Parts': - return {'domain': 'localhost', 'port': '9092'} - - -def stricturl( - *, - strip_whitespace: bool = True, - min_length: int = 1, - max_length: int = 2**16, - tld_required: bool = True, - host_required: bool = True, - allowed_schemes: Collection[str] | None = None, -) -> type[AnyUrl]: - # use kwargs then define conf in a dict to aid with IDE type hinting - namespace = dict( - strip_whitespace=strip_whitespace, - min_length=min_length, - max_length=max_length, - tld_required=tld_required, - host_required=host_required, - allowed_schemes=allowed_schemes, - ) - return type('UrlValue', (AnyUrl,), namespace) +@_dataclasses.dataclass +class UrlConstraints(_fields.PydanticMetadata): + max_length: int | None = None + allowed_schemes: list[str] | None = None + host_required: bool | None = None + default_host: str | None = None + default_port: int | None = None + default_path: str | None = None + + +AnyUrl = Url +# host_required is false because all schemes are "special" so host is required by rust-url automatically +AnyHttpUrl = Annotated[Url, UrlConstraints(allowed_schemes=['http', 'https'])] +HttpUrl = Annotated[Url, UrlConstraints(max_length=2083, allowed_schemes=['http', 'https'])] +FileUrl = Annotated[Url, UrlConstraints(allowed_schemes=['file'])] +PostgresDsn = Annotated[ + MultiHostUrl, + UrlConstraints( + host_required=True, + allowed_schemes=[ + 'postgres', + 'postgresql', + 'postgresql+asyncpg', + 'postgresql+pg8000', + 'postgresql+psycopg', + 'postgresql+psycopg2', + 'postgresql+psycopg2cffi', + 'postgresql+py-postgresql', + 'postgresql+pygresql', + ], + ), +] + +CockroachDsn = Annotated[ + Url, + UrlConstraints( + host_required=True, + allowed_schemes=[ + 'cockroachdb', + 'cockroachdb+psycopg2', + 'cockroachdb+asyncpg', + ], + ), +] +AmqpDsn = Annotated[Url, UrlConstraints(allowed_schemes=['amqp', 'amqps'])] +RedisDsn = Annotated[ + Url, + UrlConstraints(allowed_schemes=['redis', 'rediss'], default_host='localhost', default_port=6379, default_path='/0'), +] +MongoDsn = Annotated[MultiHostUrl, UrlConstraints(allowed_schemes=['mongodb', 'mongodb+srv'], default_port=27017)] +KafkaDsn = Annotated[Url, UrlConstraints(allowed_schemes=['kafka'], default_host='localhost', default_port=9092)] def import_email_validator() -> None: @@ -563,7 +102,7 @@ def import_email_validator() -> None: if TYPE_CHECKING: - EmailStr = typing_extensions.Annotated[str, ...] + EmailStr = Annotated[str, ...] else: class EmailStr: diff --git a/pyproject.toml b/pyproject.toml index 21e7b02..ed2f34d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ classifiers = [ requires-python = '>=3.7' dependencies = [ 'typing-extensions>=4.1.0', - 'pydantic-core>=0.6.0', + 'pydantic-core>=0.7.1', 'annotated-types>=0.4.0', ] optional-dependencies = { email = ['email-validator>=1.0.3'] } diff --git a/requirements/linting.txt b/requirements/linting.txt index 292c1ee..f04ca6f 100644 --- a/requirements/linting.txt +++ b/requirements/linting.txt @@ -55,7 +55,7 @@ pre-commit==2.20.0 # via -r requirements/linting.in pycodestyle==2.9.1 # via flake8 -pydantic-core==0.6.0 +pydantic-core==0.7.1 # via -r requirements/linting.in pyflakes==2.5.0 # via flake8 @@ -63,7 +63,7 @@ pyupgrade==2.37.3 # via -r requirements/linting.in pyyaml==6.0 # via pre-commit -ruff==0.0.95 +ruff==0.0.105 # via -r requirements/linting.in sortedcontainers==2.4.0 # via hypothesis diff --git a/requirements/pyproject-all.txt b/requirements/pyproject-all.txt index 0bf4654..08c3f82 100644 --- a/requirements/pyproject-all.txt +++ b/requirements/pyproject-all.txt @@ -12,7 +12,7 @@ email-validator==1.2.1 # via pydantic (pyproject.toml) idna==3.3 # via email-validator -pydantic-core==0.6.0 +pydantic-core==0.7.1 # via pydantic (pyproject.toml) typing-extensions==4.3.0 # via diff --git a/requirements/pyproject-min.txt b/requirements/pyproject-min.txt index fde3861..3a40f92 100644 --- a/requirements/pyproject-min.txt +++ b/requirements/pyproject-min.txt @@ -6,7 +6,7 @@ # annotated-types==0.4.0 # via pydantic (pyproject.toml) -pydantic-core==0.6.0 +pydantic-core==0.7.1 # via pydantic (pyproject.toml) typing-extensions==4.3.0 # via diff --git a/tests/mypy/modules/success.py b/tests/mypy/modules/success.py index 386a56c..7ab50ac 100644 --- a/tests/mypy/modules/success.py +++ b/tests/mypy/modules/success.py @@ -9,7 +9,8 @@ from pathlib import Path, PurePath from typing import Any, Dict, ForwardRef, Generic, List, Optional, Type, TypeVar from uuid import UUID -from typing_extensions import TypedDict +from pydantic_core import Url +from typing_extensions import Annotated, TypedDict from pydantic import ( UUID1, @@ -35,8 +36,8 @@ from pydantic import ( StrictFloat, StrictInt, StrictStr, + UrlConstraints, root_validator, - stricturl, validate_arguments, validator, ) @@ -238,9 +239,15 @@ validated.my_dir_path_str.absolute() validated.my_json['hello'].capitalize() validated.my_json_list[0].capitalize() -stricturl(allowed_schemes={'http'}) -stricturl(allowed_schemes=frozenset({'http'})) -stricturl(allowed_schemes=('s3', 's3n', 's3a')) + +class UrlModel(BaseModel): + x: Annotated[Url, UrlConstraints(allowed_schemes=['http'])] = Field(None) + y: Annotated[Url, UrlConstraints(allowed_schemes=['http'])] = Field(None) + z: Annotated[Url, UrlConstraints(allowed_schemes=['s3', 's3n', 's3a'])] = Field(None) + + +url_model = UrlModel(x='http://example.com') +assert url_model.x.host == 'example.com' class SomeDict(TypedDict): diff --git a/tests/test_networks.py b/tests/test_networks.py index 566f817..2ca8018 100644 --- a/tests/test_networks.py +++ b/tests/test_networks.py @@ -1,9 +1,9 @@ import pytest -from pydantic_core import PydanticCustomError +from pydantic_core import PydanticCustomError, Url +from typing_extensions import Annotated from pydantic import ( AmqpDsn, - AnyHttpUrl, AnyUrl, BaseModel, CockroachDsn, @@ -14,8 +14,9 @@ from pydantic import ( NameEmail, PostgresDsn, RedisDsn, + Strict, + UrlConstraints, ValidationError, - stricturl, ) from pydantic.networks import validate_email @@ -70,7 +71,7 @@ except ImportError: 'http://www.cwi.nl:80/%7Eguido/Python.html', 'https://www.python.org/путь', 'http://андрей@example.com', - AnyUrl('https://example.com', scheme='https', host='example.com'), + # AnyUrl('https://example.com', scheme='https', host='example.com'), 'https://exam_ple.com/', 'http://twitter.com/@handle/', 'http://11.11.11.11.example.com/action', @@ -94,56 +95,35 @@ def test_any_url_success(value): @pytest.mark.parametrize( - 'value,err_kind,err_msg,err_ctx', + 'value,err_type,err_msg', [ - ('http:///example.com/', 'url.host', 'URL host invalid', None), - ('https:///example.com/', 'url.host', 'URL host invalid', None), - ('http://.example.com:8000/foo', 'url.host', 'URL host invalid', None), - ('https://example.org\\', 'url.host', 'URL host invalid', None), - ('https://exampl$e.org', 'url.host', 'URL host invalid', None), - ('http://??', 'url.host', 'URL host invalid', None), - ('http://.', 'url.host', 'URL host invalid', None), - ('http://..', 'url.host', 'URL host invalid', None), + ('http:///', 'url_parsing', 'Input should be a valid URL, empty host'), + ('http://??', 'url_parsing', 'Input should be a valid URL, empty host'), ( 'https://example.org more', - 'url.extra', - "URL invalid, extra characters found after valid URL: ' more'", - {'extra': "' more'"}, + 'url_parsing', + 'Input should be a valid URL, invalid domain character', ), - ('$https://example.org', 'url.scheme', 'invalid or missing URL scheme', None), - ('../icons/logo.gif', 'url.scheme', 'invalid or missing URL scheme', None), - ('abc', 'url.scheme', 'invalid or missing URL scheme', None), - ('..', 'url.scheme', 'invalid or missing URL scheme', None), - ('/', 'url.scheme', 'invalid or missing URL scheme', None), - ('+http://example.com/', 'url.scheme', 'invalid or missing URL scheme', None), - ('ht*tp://example.com/', 'url.scheme', 'invalid or missing URL scheme', None), - (' ', 'string_too_short', 'String should have at least 1 characters', {'min_length': 1}), - ('', 'string_too_short', 'String should have at least 1 characters', {'min_length': 1}), - (None, 'string_type', 'Input should be a valid string', None), + ('$https://example.org', 'url_parsing', 'Input should be a valid URL, relative URL without a base'), + ('../icons/logo.gif', 'url_parsing', 'Input should be a valid URL, relative URL without a base'), + ('abc', 'url_parsing', 'Input should be a valid URL, relative URL without a base'), + ('..', 'url_parsing', 'Input should be a valid URL, relative URL without a base'), + ('/', 'url_parsing', 'Input should be a valid URL, relative URL without a base'), + ('+http://example.com/', 'url_parsing', 'Input should be a valid URL, relative URL without a base'), + ('ht*tp://example.com/', 'url_parsing', 'Input should be a valid URL, relative URL without a base'), + (' ', 'url_parsing', 'Input should be a valid URL, relative URL without a base'), + ('', 'url_parsing', 'Input should be a valid URL, relative URL without a base'), + (None, 'url_type', 'URL input should be a string or URL'), ( 'http://2001:db8::ff00:42:8329', - 'url.extra', - "URL invalid, extra characters found after valid URL: ':db8::ff00:42:8329'", - {'extra': "':db8::ff00:42:8329'"}, + 'url_parsing', + 'Input should be a valid URL, invalid port number', ), - ('http://[192.168.1.1]:8329', 'url.host', 'URL host invalid', None), - ('http://example.com:99999', 'url.port', 'URL port invalid, port cannot exceed 65535', None), - ( - 'http://example##', - 'url.extra', - "URL invalid, extra characters found after valid URL: '#'", - {'extra': "'#'"}, - ), - ( - 'http://example/##', - 'url.extra', - "URL invalid, extra characters found after valid URL: '#'", - {'extra': "'#'"}, - ), - ('file:///foo/bar', 'url.host', 'URL host invalid', None), + ('http://[192.168.1.1]:8329', 'url_parsing', 'Input should be a valid URL, invalid IPv6 address'), + ('http://example.com:99999', 'url_parsing', 'Input should be a valid URL, invalid port number'), ], ) -def test_any_url_invalid(value, err_kind, err_msg, err_ctx): +def test_any_url_invalid(value, err_type, err_msg): class Model(BaseModel): v: AnyUrl @@ -152,9 +132,7 @@ def test_any_url_invalid(value, err_kind, err_msg, err_ctx): assert len(exc_info.value.errors()) == 1, exc_info.value.errors() error = exc_info.value.errors()[0] # debug(error) - assert error['type'] == err_kind, value - assert error['msg'] == err_msg, value - assert error.get('ctx') == err_ctx, value + assert {'type': error['type'], 'msg': error['msg']} == {'type': err_type, 'msg': err_msg} def validate_url(s): @@ -166,30 +144,22 @@ def validate_url(s): def test_any_url_parts(): url = validate_url('http://example.org') - assert str(url) == 'http://example.org' - assert repr(url) == "AnyUrl('http://example.org', scheme='http', host='example.org', tld='org', host_type='domain')" + assert str(url) == 'http://example.org/' + assert repr(url) == "Url('http://example.org/')" assert url.scheme == 'http' assert url.host == 'example.org' - assert url.tld == 'org' - assert url.host_type == 'domain' - assert url.port is None - assert url == AnyUrl('http://example.org', scheme='https', host='example.org') + assert url.port == 80 def test_url_repr(): url = validate_url('http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit') assert str(url) == 'http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit' - assert repr(url) == ( - "AnyUrl('http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit', " - "scheme='http', user='user', password='password', host='example.org', tld='org', host_type='domain', " - "port='1234', path='/the/path/', query='query=here', fragment='fragment=is;this=bit')" - ) + assert repr(url) == "Url('http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit')" assert url.scheme == 'http' - assert url.user == 'user' + assert url.username == 'user' assert url.password == 'password' assert url.host == 'example.org' - assert url.host_type == 'domain' - assert url.port == '1234' + assert url.port == 1234 assert url.path == '/the/path/' assert url.query == 'query=here' assert url.fragment == 'fragment=is;this=bit' @@ -199,9 +169,8 @@ def test_ipv4_port(): url = validate_url('ftp://123.45.67.8:8329/') assert url.scheme == 'ftp' assert url.host == '123.45.67.8' - assert url.host_type == 'ipv4' - assert url.port == '8329' - assert url.user is None + assert url.port == 8329 + assert url.username is None assert url.password is None @@ -209,9 +178,8 @@ def test_ipv4_no_port(): url = validate_url('ftp://123.45.67.8') assert url.scheme == 'ftp' assert url.host == '123.45.67.8' - assert url.host_type == 'ipv4' - assert url.port is None - assert url.user is None + assert url.port == 21 + assert url.username is None assert url.password is None @@ -219,36 +187,32 @@ def test_ipv6_port(): url = validate_url('wss://[2001:db8::ff00:42]:8329') assert url.scheme == 'wss' assert url.host == '[2001:db8::ff00:42]' - assert url.host_type == 'ipv6' - assert url.port == '8329' + assert url.port == 8329 def test_int_domain(): url = validate_url('https://£££.org') assert url.host == 'xn--9aaa.org' - assert url.host_type == 'int_domain' - assert str(url) == 'https://xn--9aaa.org' + assert str(url) == 'https://xn--9aaa.org/' def test_co_uk(): url = validate_url('http://example.co.uk') - assert str(url) == 'http://example.co.uk' + assert str(url) == 'http://example.co.uk/' assert url.scheme == 'http' assert url.host == 'example.co.uk' - assert url.tld == 'uk' # wrong but no better solution - assert url.host_type == 'domain' def test_user_no_password(): url = validate_url('http://user:@example.org') - assert url.user == 'user' - assert url.password == '' + assert url.username == 'user' + assert url.password is None assert url.host == 'example.org' def test_user_info_no_user(): url = validate_url('http://:password@example.org') - assert url.user == '' + assert url.username is None assert url.password == 'password' assert url.host == 'example.org' @@ -257,7 +221,7 @@ def test_at_in_path(): url = validate_url('https://twitter.com/@handle') assert url.scheme == 'https' assert url.host == 'twitter.com' - assert url.user is None + assert url.username is None assert url.password is None assert url.path == '/@handle' @@ -272,52 +236,46 @@ def test_fragment_without_query(): @pytest.mark.parametrize( - 'value', + 'value,expected', [ - 'http://example.org', - 'http://example.org/foobar', - 'http://example.org.', - 'http://example.org./foobar', - 'HTTP://EXAMPLE.ORG', - 'https://example.org', - 'https://example.org?a=1&b=2', - 'https://example.org#a=3;b=3', - 'https://foo_bar.example.com/', - 'https://exam_ple.com/', # should perhaps fail? I think it's contrary to the RFC but chrome allows it - 'https://example.xn--p1ai', - 'https://example.xn--vermgensberatung-pwb', - 'https://example.xn--zfr164b', + ('http://example.org', 'http://example.org/'), + ('http://example.org/foobar', 'http://example.org/foobar'), + ('http://example.org.', 'http://example.org./'), + ('http://example.org./foobar', 'http://example.org./foobar'), + ('HTTP://EXAMPLE.ORG', 'http://example.org/'), + ('https://example.org', 'https://example.org/'), + ('https://example.org?a=1&b=2', 'https://example.org/?a=1&b=2'), + ('https://example.org#a=3;b=3', 'https://example.org/#a=3;b=3'), + ('https://foo_bar.example.com/', 'https://foo_bar.example.com/'), + ('https://exam_ple.com/', 'https://exam_ple.com/'), + ('https://example.xn--p1ai', 'https://example.xn--p1ai/'), + ('https://example.xn--vermgensberatung-pwb', 'https://example.xn--vermgensberatung-pwb/'), + ('https://example.xn--zfr164b', 'https://example.xn--zfr164b/'), ], ) -def test_http_url_success(value): +def test_http_url_success(value, expected): class Model(BaseModel): v: HttpUrl - assert Model(v=value).v == value + assert str(Model(v=value).v) == expected @pytest.mark.parametrize( - 'value,err_kind,err_msg,err_ctx', + 'value,err_type,err_msg', [ ( 'ftp://example.com/', - 'url.scheme', - 'URL scheme not permitted', - {'allowed_schemes': 'http, https'}, + 'url_scheme', + "URL scheme should be 'http' or 'https'", ), - ('http://foobar/', 'url.host', 'URL host invalid, top level domain required', None), - ('http://localhost/', 'url.host', 'URL host invalid, top level domain required', None), - ('https://example.123', 'url.host', 'URL host invalid, top level domain required', None), - ('https://example.ab123', 'url.host', 'URL host invalid, top level domain required', None), ( 'x' * 2084, - 'string_too_long', - 'String should have at most 2083 characters', - {'max_length': 2083}, + 'url_too_long', + 'URL should have at most 2083 characters', ), ], ) -def test_http_url_invalid(value, err_kind, err_msg, err_ctx): +def test_http_url_invalid(value, err_type, err_msg): class Model(BaseModel): v: HttpUrl @@ -325,100 +283,63 @@ def test_http_url_invalid(value, err_kind, err_msg, err_ctx): Model(v=value) assert len(exc_info.value.errors()) == 1, exc_info.value.errors() error = exc_info.value.errors()[0] - assert error['type'] == err_kind, value - assert error['msg'] == err_msg, value - assert error.get('ctx') == err_ctx, value + assert {'type': error['type'], 'msg': error['msg']} == {'type': err_type, 'msg': err_msg} @pytest.mark.parametrize( 'input,output', [ - (' https://www.example.com \n', 'https://www.example.com'), - (b'https://www.example.com', 'https://www.example.com'), + (' https://www.example.com \n', 'https://www.example.com/'), + (b'https://www.example.com', 'https://www.example.com/'), # https://www.xudongz.com/blog/2017/idn-phishing/ accepted but converted ('https://www.аррӏе.com/', 'https://www.xn--80ak6aa92e.com/'), - ('https://exampl£e.org', 'https://xn--example-gia.org'), - ('https://example.珠宝', 'https://example.xn--pbt977c'), - ('https://example.vermögensberatung', 'https://example.xn--vermgensberatung-pwb'), - ('https://example.рф', 'https://example.xn--p1ai'), - ('https://exampl£e.珠宝', 'https://xn--example-gia.xn--pbt977c'), + ('https://exampl£e.org', 'https://xn--example-gia.org/'), + ('https://example.珠宝', 'https://example.xn--pbt977c/'), + ('https://example.vermögensberatung', 'https://example.xn--vermgensberatung-pwb/'), + ('https://example.рф', 'https://example.xn--p1ai/'), + ('https://exampl£e.珠宝', 'https://xn--example-gia.xn--pbt977c/'), ], ) -def test_coerse_url(input, output): +def test_coerce_url(input, output): class Model(BaseModel): v: HttpUrl - assert Model(v=input).v == output + assert str(Model(v=input).v) == output @pytest.mark.parametrize( - 'input,output', + 'value,expected', [ - (' https://www.example.com \n', 'com'), - (b'https://www.example.com', 'com'), - ('https://www.example.com?param=value', 'com'), - ('https://example.珠宝', 'xn--pbt977c'), - ('https://exampl£e.珠宝', 'xn--pbt977c'), - ('https://example.vermögensberatung', 'xn--vermgensberatung-pwb'), - ('https://example.рф', 'xn--p1ai'), - ('https://example.рф?param=value', 'xn--p1ai'), + ('file:///foo/bar', 'file:///foo/bar'), + ('file://localhost/foo/bar', 'file:///foo/bar'), + ('file:////localhost/foo/bar', 'file:///localhost/foo/bar'), ], ) -def test_parses_tld(input, output): - class Model(BaseModel): - v: HttpUrl - - assert Model(v=input).v.tld == output - - -@pytest.mark.parametrize( - 'value', - ['file:///foo/bar', 'file://localhost/foo/bar', 'file:////localhost/foo/bar'], -) -def test_file_url_success(value): +def test_file_url_success(value, expected): class Model(BaseModel): v: FileUrl - assert Model(v=value).v == value - - -def test_get_default_parts(): - class MyConnectionString(AnyUrl): - @staticmethod - def get_default_parts(parts): - # get default parts allows to generate custom conn strings to services - return { - 'user': 'admin', - 'password': '123', - } - - class C(BaseModel): - connection: MyConnectionString - - c = C(connection='protocol://service:8080') - assert c.connection == 'protocol://admin:123@service:8080' - assert c.connection.user == 'admin' - assert c.connection.password == '123' + assert str(Model(v=value).v) == expected @pytest.mark.parametrize( - 'url,port', + 'url,expected_port, expected_str', [ - ('https://www.example.com', '443'), - ('https://www.example.com:443', '443'), - ('https://www.example.com:8089', '8089'), - ('http://www.example.com', '80'), - ('http://www.example.com:80', '80'), - ('http://www.example.com:8080', '8080'), + ('https://www.example.com/', 443, 'https://www.example.com/'), + ('https://www.example.com:443/', 443, 'https://www.example.com/'), + ('https://www.example.com:8089/', 8089, 'https://www.example.com:8089/'), + ('http://www.example.com/', 80, 'http://www.example.com/'), + ('http://www.example.com:80/', 80, 'http://www.example.com/'), + ('http://www.example.com:8080/', 8080, 'http://www.example.com:8080/'), ], ) -def test_http_urls_default_port(url, port): +def test_http_urls_default_port(url, expected_port, expected_str): class Model(BaseModel): v: HttpUrl m = Model(v=url) - assert m.v.port == port - assert m.v == url + assert m.v.port == expected_port + assert str(m.v) == expected_str @pytest.mark.parametrize( @@ -434,7 +355,7 @@ def test_postgres_dsns(dsn): class Model(BaseModel): a: PostgresDsn - assert Model(a=dsn).a == dsn + assert str(Model(a=dsn).a) == dsn @pytest.mark.parametrize( @@ -443,56 +364,50 @@ def test_postgres_dsns(dsn): ( 'postgres://user:pass@host1.db.net:4321,/foo/bar:5432/app', { - 'type': 'url.host', + 'type': 'url_parsing', 'loc': ('a',), - 'msg': 'URL host invalid', + 'msg': 'Input should be a valid URL, empty host', 'input': 'postgres://user:pass@host1.db.net:4321,/foo/bar:5432/app', }, ), ( 'postgres://user:pass@host1.db.net,/app', { - 'type': 'url.host', + 'type': 'url_parsing', 'loc': ('a',), - 'msg': 'URL host invalid', + 'msg': 'Input should be a valid URL, empty host', 'input': 'postgres://user:pass@host1.db.net,/app', }, ), ( 'postgres://user:pass@/foo/bar:5432,host1.db.net:4321/app', { - 'type': 'url.host', + 'type': 'url_parsing', 'loc': ('a',), - 'msg': 'URL host invalid', + 'msg': 'Input should be a valid URL, empty host', 'input': 'postgres://user:pass@/foo/bar:5432,host1.db.net:4321/app', }, ), - ( - 'postgres://localhost:5432/app', - { - 'type': 'url.userinfo', - 'loc': ('a',), - 'msg': 'userinfo required in URL but missing', - 'input': 'postgres://localhost:5432/app', - }, - ), ( 'postgres://user@/foo/bar:5432/app', { - 'type': 'url.host', + 'type': 'url_parsing', 'loc': ('a',), - 'msg': 'URL host invalid', + 'msg': 'Input should be a valid URL, empty host', 'input': 'postgres://user@/foo/bar:5432/app', }, ), ( 'http://example.org', { - 'type': 'url.scheme', + 'type': 'url_scheme', 'loc': ('a',), - 'msg': 'URL scheme not permitted', + 'msg': ( + "URL scheme should be 'postgres', 'postgresql', 'postgresql+asyncpg', 'postgresql+pg8000', " + "'postgresql+psycopg', 'postgresql+psycopg2', 'postgresql+psycopg2cffi', " + "'postgresql+py-postgresql' or 'postgresql+pygresql'" + ), 'input': 'http://example.org', - 'ctx': {'allowed_schemes': ', '.join(sorted(PostgresDsn.allowed_schemes))}, }, ), ), @@ -504,6 +419,7 @@ def test_postgres_dsns_validation_error(dsn, error_message): with pytest.raises(ValidationError) as exc_info: Model(a=dsn) error = exc_info.value.errors()[0] + error.pop('ctx', None) assert error == error_message @@ -512,56 +428,40 @@ def test_multihost_postgres_dsns(): a: PostgresDsn any_multihost_url = Model(a='postgres://user:pass@host1.db.net:4321,host2.db.net:6432/app').a - assert any_multihost_url == 'postgres://user:pass@host1.db.net:4321,host2.db.net:6432/app' + assert str(any_multihost_url) == 'postgres://user:pass@host1.db.net:4321,host2.db.net:6432/app' assert any_multihost_url.scheme == 'postgres' - assert any_multihost_url.host is None - assert any_multihost_url.host_type is None - assert any_multihost_url.tld is None - assert any_multihost_url.port is None assert any_multihost_url.path == '/app' - assert any_multihost_url.hosts == [ - {'host': 'host1.db.net', 'port': '4321', 'tld': 'net', 'host_type': 'domain', 'rebuild': False}, - {'host': 'host2.db.net', 'port': '6432', 'tld': 'net', 'host_type': 'domain', 'rebuild': False}, + # insert_assert(any_multihost_url.hosts()) + assert any_multihost_url.hosts() == [ + {'username': 'user', 'password': 'pass', 'host': 'host1.db.net', 'port': 4321}, + {'username': None, 'password': None, 'host': 'host2.db.net', 'port': 6432}, ] any_multihost_url = Model(a='postgres://user:pass@host.db.net:4321/app').a assert any_multihost_url.scheme == 'postgres' - assert any_multihost_url == 'postgres://user:pass@host.db.net:4321/app' - assert any_multihost_url.host == 'host.db.net' - assert any_multihost_url.host_type == 'domain' - assert any_multihost_url.tld == 'net' - assert any_multihost_url.port == '4321' + assert str(any_multihost_url) == 'postgres://user:pass@host.db.net:4321/app' assert any_multihost_url.path == '/app' - assert any_multihost_url.hosts is None + # insert_assert(any_multihost_url.hosts()) + assert any_multihost_url.hosts() == [{'username': 'user', 'password': 'pass', 'host': 'host.db.net', 'port': 4321}] def test_cockroach_dsns(): class Model(BaseModel): a: CockroachDsn - assert Model(a='cockroachdb://user:pass@localhost:5432/app').a == 'cockroachdb://user:pass@localhost:5432/app' + assert str(Model(a='cockroachdb://user:pass@localhost:5432/app').a) == 'cockroachdb://user:pass@localhost:5432/app' assert ( - Model(a='cockroachdb+psycopg2://user:pass@localhost:5432/app').a + str(Model(a='cockroachdb+psycopg2://user:pass@localhost:5432/app').a) == 'cockroachdb+psycopg2://user:pass@localhost:5432/app' ) assert ( - Model(a='cockroachdb+asyncpg://user:pass@localhost:5432/app').a + str(Model(a='cockroachdb+asyncpg://user:pass@localhost:5432/app').a) == 'cockroachdb+asyncpg://user:pass@localhost:5432/app' ) with pytest.raises(ValidationError) as exc_info: Model(a='http://example.org') - assert exc_info.value.errors()[0]['type'] == 'url.scheme' - - with pytest.raises(ValidationError) as exc_info: - Model(a='cockroachdb://localhost:5432/app') - # error = exc_info.value.errors()[0] - # insert_assert(error) - # - # with pytest.raises(ValidationError) as exc_info: - # Model(a='cockroachdb://user@/foo/bar:5432/app') - # error = exc_info.value.errors()[0] - # insert_assert(error) + assert exc_info.value.errors()[0]['type'] == 'url_scheme' def test_amqp_dsns(): @@ -569,21 +469,21 @@ def test_amqp_dsns(): a: AmqpDsn m = Model(a='amqp://user:pass@localhost:1234/app') - assert m.a == 'amqp://user:pass@localhost:1234/app' - assert m.a.user == 'user' + assert str(m.a) == 'amqp://user:pass@localhost:1234/app' + assert m.a.username == 'user' assert m.a.password == 'pass' m = Model(a='amqps://user:pass@localhost:5432//') - assert m.a == 'amqps://user:pass@localhost:5432//' + assert str(m.a) == 'amqps://user:pass@localhost:5432//' with pytest.raises(ValidationError) as exc_info: Model(a='http://example.org') - assert exc_info.value.errors()[0]['type'] == 'url.scheme' + assert exc_info.value.errors()[0]['type'] == 'url_scheme' # Password is not required for AMQP protocol m = Model(a='amqp://localhost:1234/app') - assert m.a == 'amqp://localhost:1234/app' - assert m.a.user is None + assert str(m.a) == 'amqp://localhost:1234/app' + assert m.a.username is None assert m.a.password is None # Only schema is required for AMQP protocol. @@ -600,24 +500,24 @@ def test_redis_dsns(): a: RedisDsn m = Model(a='redis://user:pass@localhost:1234/app') - assert m.a == 'redis://user:pass@localhost:1234/app' - assert m.a.user == 'user' + assert str(m.a) == 'redis://user:pass@localhost:1234/app' + assert m.a.username == 'user' assert m.a.password == 'pass' m = Model(a='rediss://user:pass@localhost:1234/app') - assert m.a == 'rediss://user:pass@localhost:1234/app' + assert str(m.a) == 'rediss://user:pass@localhost:1234/app' m = Model(a='rediss://:pass@localhost:1234') - assert m.a == 'rediss://:pass@localhost:1234/0' + assert str(m.a) == 'rediss://:pass@localhost:1234/0' with pytest.raises(ValidationError) as exc_info: Model(a='http://example.org') - assert exc_info.value.errors()[0]['type'] == 'url.scheme' + assert exc_info.value.errors()[0]['type'] == 'url_scheme' # Password is not required for Redis protocol m = Model(a='redis://localhost:1234/app') - assert m.a == 'redis://localhost:1234/app' - assert m.a.user is None + assert str(m.a) == 'redis://localhost:1234/app' + assert m.a.username is None assert m.a.password is None # Only schema is required for Redis protocol. Otherwise it will be set to default @@ -625,7 +525,7 @@ def test_redis_dsns(): m = Model(a='rediss://') assert m.a.scheme == 'rediss' assert m.a.host == 'localhost' - assert m.a.port == '6379' + assert m.a.port == 6379 assert m.a.path == '/0' @@ -635,25 +535,25 @@ def test_mongodb_dsns(): # TODO: Need to unit tests about "Replica Set", "Sharded cluster" and other deployment modes of MongoDB m = Model(a='mongodb://user:pass@localhost:1234/app') - assert m.a == 'mongodb://user:pass@localhost:1234/app' - assert m.a.user == 'user' - assert m.a.password == 'pass' + assert str(m.a) == 'mongodb://user:pass@localhost:1234/app' + # insert_assert(m.a.hosts()) + assert m.a.hosts() == [{'username': 'user', 'password': 'pass', 'host': 'localhost', 'port': 1234}] with pytest.raises(ValidationError) as exc_info: Model(a='http://example.org') - assert exc_info.value.errors()[0]['type'] == 'url.scheme' + assert exc_info.value.errors()[0]['type'] == 'url_scheme' # Password is not required for MongoDB protocol m = Model(a='mongodb://localhost:1234/app') - assert m.a == 'mongodb://localhost:1234/app' - assert m.a.user is None - assert m.a.password is None + assert str(m.a) == 'mongodb://localhost:1234/app' + # insert_assert(m.a.hosts()) + assert m.a.hosts() == [{'username': None, 'password': None, 'host': 'localhost', 'port': 1234}] # Only schema and host is required for MongoDB protocol m = Model(a='mongodb://localhost') assert m.a.scheme == 'mongodb' - assert m.a.host == 'localhost' - assert m.a.port == '27017' + # insert_assert(m.a.hosts()) + assert m.a.hosts() == [{'username': None, 'password': None, 'host': 'localhost', 'port': 27017}] def test_kafka_dsns(): @@ -663,90 +563,43 @@ def test_kafka_dsns(): m = Model(a='kafka://') assert m.a.scheme == 'kafka' assert m.a.host == 'localhost' - assert m.a.port == '9092' - assert m.a == 'kafka://localhost:9092' + assert m.a.port == 9092 + assert str(m.a) == 'kafka://localhost:9092' m = Model(a='kafka://kafka1') - assert m.a == 'kafka://kafka1:9092' + assert str(m.a) == 'kafka://kafka1:9092' with pytest.raises(ValidationError) as exc_info: Model(a='http://example.org') - assert exc_info.value.errors()[0]['type'] == 'url.scheme' + assert exc_info.value.errors()[0]['type'] == 'url_scheme' m = Model(a='kafka://kafka3:9093') - assert m.a.user is None + assert m.a.username is None assert m.a.password is None def test_custom_schemes(): class Model(BaseModel): - v: stricturl(strip_whitespace=False, allowed_schemes={'ws', 'wss'}) # noqa: F821 + v: Annotated[Url, UrlConstraints(allowed_schemes=['ws', 'wss']), Strict()] class Model2(BaseModel): - v: stricturl(host_required=False, allowed_schemes={'foo'}) # noqa: F821 + v: Annotated[Url, UrlConstraints(host_required=False, allowed_schemes=['foo'])] - assert Model(v='ws://example.org').v == 'ws://example.org' - assert Model2(v='foo:///foo/bar').v == 'foo:///foo/bar' + assert str(Model(v='ws://example.org').v) == 'ws://example.org/' + assert str(Model2(v='foo:///foo/bar').v) == 'foo:///foo/bar' - with pytest.raises(ValidationError): + with pytest.raises(ValidationError, match=r"URL scheme should be 'ws' or 'wss' \[type=url_scheme,"): Model(v='http://example.org') - with pytest.raises(ValidationError): + with pytest.raises(ValidationError, match='leading or trailing control or space character are ignored in URLs'): Model(v='ws://example.org ') - with pytest.raises(ValidationError): + with pytest.raises(ValidationError, match=r'syntax rules, expected // \[type=url_syntax_violation,'): Model(v='ws:///foo/bar') -@pytest.mark.parametrize( - 'kwargs,expected', - [ - (dict(scheme='ws', user='foo', host='example.net'), 'ws://foo@example.net'), - (dict(scheme='ws', user='foo', password='x', host='example.net'), 'ws://foo:x@example.net'), - (dict(scheme='ws', host='example.net', query='a=b', fragment='c=d'), 'ws://example.net?a=b#c=d'), - (dict(scheme='http', host='example.net', port='1234'), 'http://example.net:1234'), - ], -) -def test_build_url(kwargs, expected): - assert AnyUrl(None, **kwargs) == expected - - -@pytest.mark.parametrize( - 'kwargs,expected', - [ - (dict(scheme='http', host='example.net'), 'http://example.net'), - (dict(scheme='https', host='example.net'), 'https://example.net'), - (dict(scheme='http', user='foo', host='example.net'), 'http://foo@example.net'), - (dict(scheme='https', user='foo', host='example.net'), 'https://foo@example.net'), - (dict(scheme='http', user='foo', host='example.net', port='123'), 'http://foo@example.net:123'), - (dict(scheme='https', user='foo', host='example.net', port='123'), 'https://foo@example.net:123'), - (dict(scheme='http', user='foo', password='x', host='example.net'), 'http://foo:x@example.net'), - (dict(scheme='http2', user='foo', password='x', host='example.net'), 'http2://foo:x@example.net'), - (dict(scheme='http', host='example.net', query='a=b', fragment='c=d'), 'http://example.net?a=b#c=d'), - (dict(scheme='http2', host='example.net', query='a=b', fragment='c=d'), 'http2://example.net?a=b#c=d'), - (dict(scheme='http', host='example.net', port='1234'), 'http://example.net:1234'), - (dict(scheme='https', host='example.net', port='1234'), 'https://example.net:1234'), - ], -) -@pytest.mark.parametrize('klass', [AnyHttpUrl, HttpUrl]) -def test_build_any_http_url(klass, kwargs, expected): - assert klass(None, **kwargs) == expected - - -@pytest.mark.parametrize( - 'klass, kwargs,expected', - [ - (AnyHttpUrl, dict(scheme='http', user='foo', host='example.net', port='80'), 'http://foo@example.net:80'), - (AnyHttpUrl, dict(scheme='https', user='foo', host='example.net', port='443'), 'https://foo@example.net:443'), - (HttpUrl, dict(scheme='http', user='foo', host='example.net', port='80'), 'http://foo@example.net'), - (HttpUrl, dict(scheme='https', user='foo', host='example.net', port='443'), 'https://foo@example.net'), - ], -) -def test_build_http_url_port(klass, kwargs, expected): - assert klass(None, **kwargs) == expected - - -def test_son(): +@pytest.mark.xfail(reason='URL to JSON not yet supported') +def test_json(): class Model(BaseModel): v: HttpUrl diff --git a/tests/test_schema.py b/tests/test_schema.py index d6801ed..9b74f95 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -36,7 +36,7 @@ from pydantic.color import Color from pydantic.dataclasses import dataclass from pydantic.fields import FieldInfo from pydantic.generics import GenericModel -from pydantic.networks import AnyUrl, EmailStr, IPvAnyAddress, IPvAnyInterface, IPvAnyNetwork, NameEmail, stricturl +from pydantic.networks import AnyUrl, EmailStr, IPvAnyAddress, IPvAnyInterface, IPvAnyNetwork, NameEmail from pydantic.schema import ( get_flat_models_from_model, get_flat_models_from_models, @@ -808,10 +808,10 @@ def test_str_constrained_types(field_type, expected_schema): 'field_type,expected_schema', [ (AnyUrl, {'title': 'A', 'type': 'string', 'format': 'uri', 'minLength': 1, 'maxLength': 2**16}), - ( - stricturl(min_length=5, max_length=10), - {'title': 'A', 'type': 'string', 'format': 'uri', 'minLength': 5, 'maxLength': 10}, - ), + # ( + # stricturl(min_length=5, max_length=10), + # {'title': 'A', 'type': 'string', 'format': 'uri', 'minLength': 5, 'maxLength': 10}, + # ), ], ) def test_special_str_types(field_type, expected_schema): diff --git a/tests/test_types.py b/tests/test_types.py index 825deee..1d7603a 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -2822,7 +2822,7 @@ def test_json_not_str(): { 'type': 'json_type', 'loc': ('json_obj',), - 'msg': 'JSON input should be str, bytes or bytearray', + 'msg': 'JSON input should be string, bytes or bytearray', 'input': 12, } ] @@ -2880,7 +2880,7 @@ def test_json_required(): json_obj: Json assert JsonRequired(json_obj='["x", "y", "z"]').dict() == {'json_obj': ['x', 'y', 'z']} - with pytest.raises(ValidationError, match=r'JSON input should be str, bytes or bytearray \[type=json_type,'): + with pytest.raises(ValidationError, match=r'JSON input should be string, bytes or bytearray \[type=json_type,'): JsonRequired(json_obj=None) with pytest.raises(ValidationError, match=r'Field required \[type=missing,'): JsonRequired()