url parsing in pydantic-core (#4732)

* url parsing in pydantic-core

* fix tests

* fix mypy tests
This commit is contained in:
Samuel Colvin
2022-11-08 09:42:38 +00:00
committed by GitHub
parent 9e1f2a6f7c
commit 8d98c499df
11 changed files with 250 additions and 841 deletions
+1 -1
View File
@@ -41,7 +41,7 @@ __all__ = [
'AnyHttpUrl',
'FileUrl',
'HttpUrl',
'stricturl',
'UrlConstraints',
'EmailStr',
'NameEmail',
'IPvAnyAddress',
+11 -1
View File
@@ -15,7 +15,7 @@ from pathlib import PurePath
from typing import Any, Callable
from uuid import UUID
from pydantic_core import PydanticCustomError, core_schema
from pydantic_core import MultiHostUrl, PydanticCustomError, Url, core_schema
from typing_extensions import get_args
from . import _validators
@@ -204,3 +204,13 @@ def ip_v6_interface_schema(_schema_generator: GenerateSchema, _obj: Any) -> core
@schema_function(IPv6Network)
def ip_v6_network_schema(_schema_generator: GenerateSchema, _obj: Any) -> core_schema.FunctionPlainSchema:
return core_schema.function_plain_schema(_validators.ip_v6_network_validator)
@schema_function(Url)
def url_schema(_schema_generator: GenerateSchema, _obj: Any) -> core_schema.UrlSchema:
return {'type': 'url'}
@schema_function(MultiHostUrl)
def multi_host_url_schema(_schema_generator: GenerateSchema, _obj: Any) -> core_schema.MultiHostUrlSchema:
return {'type': 'multi-host-url'}
+58 -519
View File
@@ -1,54 +1,30 @@
from __future__ import annotations as _annotations
import dataclasses as _dataclasses
import re
from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
from typing import TYPE_CHECKING, Any, Callable, Collection, Generator, Match, Pattern, cast, no_type_check
from typing import TYPE_CHECKING, Any
import typing_extensions
from pydantic_core import PydanticCustomError, core_schema
from pydantic_core import MultiHostUrl, PydanticCustomError, Url, core_schema
from typing_extensions import Annotated
from ._internal import _repr, _utils
from ._internal import _fields, _repr
if TYPE_CHECKING:
import email_validator
from typing_extensions import TypedDict
CallableGenerator = Generator[Callable[..., Any], None, None]
class Parts(TypedDict, total=False):
scheme: str
user: str | None
password: str | None
ipv4: str | None
ipv6: str | None
domain: str | None
port: str | None
path: str | None
query: str | None
fragment: str | None
class HostParts(TypedDict, total=False):
host: str
tld: str | None
host_type: str | None
port: str | None
rebuild: bool
NetworkType = str | bytes | int | tuple[str | bytes | int, str | int] # type: ignore[misc]
else:
email_validator = None
class Parts(dict):
pass
__all__ = [
'AnyUrl',
'AnyHttpUrl',
'FileUrl',
'HttpUrl',
'stricturl',
'UrlConstraints',
'EmailStr',
'NameEmail',
'IPvAnyAddress',
@@ -63,495 +39,58 @@ __all__ = [
'validate_email',
]
_url_regex_cache = None
_multi_host_url_regex_cache = None
_ascii_domain_regex_cache = None
_int_domain_regex_cache = None
_host_regex_cache = None
_host_regex = (
r'(?:'
r'(?P<ipv4>(?:\d{1,3}\.){3}\d{1,3})(?=$|[/:#?])|' # ipv4
r'(?P<ipv6>\[[A-F0-9]*:[A-F0-9:]+\])(?=$|[/:#?])|' # ipv6
r'(?P<domain>[^\s/:?#]+)' # domain, validation occurs later
r')?'
r'(?::(?P<port>\d+))?' # port
)
_scheme_regex = r'(?:(?P<scheme>[a-z][a-z0-9+\-.]+)://)?' # scheme https://tools.ietf.org/html/rfc3986#appendix-A
_user_info_regex = r'(?:(?P<user>[^\s:/]*)(?::(?P<password>[^\s/]*))?@)?'
_path_regex = r'(?P<path>/[^\s?#]*)?'
_query_regex = r'(?:\?(?P<query>[^\s#]*))?'
_fragment_regex = r'(?:#(?P<fragment>[^\s#]*))?'
def url_regex() -> Pattern[str]:
global _url_regex_cache
if _url_regex_cache is None:
_url_regex_cache = re.compile(
rf'{_scheme_regex}{_user_info_regex}{_host_regex}{_path_regex}{_query_regex}{_fragment_regex}',
re.IGNORECASE,
)
return _url_regex_cache
def multi_host_url_regex() -> Pattern[str]:
"""
Compiled multi host url regex.
Additionally to `url_regex` it allows to match multiple hosts.
E.g. host1.db.net,host2.db.net
"""
global _multi_host_url_regex_cache
if _multi_host_url_regex_cache is None:
_multi_host_url_regex_cache = re.compile(
rf'{_scheme_regex}{_user_info_regex}'
r'(?P<hosts>([^/]*))' # validation occurs later
rf'{_path_regex}{_query_regex}{_fragment_regex}',
re.IGNORECASE,
)
return _multi_host_url_regex_cache
def ascii_domain_regex() -> Pattern[str]:
global _ascii_domain_regex_cache
if _ascii_domain_regex_cache is None:
ascii_chunk = r'[_0-9a-z](?:[-_0-9a-z]{0,61}[_0-9a-z])?'
ascii_domain_ending = r'(?P<tld>\.[a-z]{2,63})?\.?'
_ascii_domain_regex_cache = re.compile(
fr'(?:{ascii_chunk}\.)*?{ascii_chunk}{ascii_domain_ending}', re.IGNORECASE
)
return _ascii_domain_regex_cache
def int_domain_regex() -> Pattern[str]:
global _int_domain_regex_cache
if _int_domain_regex_cache is None:
int_chunk = r'[_0-9a-\U00040000](?:[-_0-9a-\U00040000]{0,61}[_0-9a-\U00040000])?'
int_domain_ending = r'(?P<tld>(\.[^\W\d_]{2,63})|(\.(?:xn--)[_0-9a-z-]{2,63}))?\.?'
_int_domain_regex_cache = re.compile(fr'(?:{int_chunk}\.)*?{int_chunk}{int_domain_ending}', re.IGNORECASE)
return _int_domain_regex_cache
def host_regex() -> Pattern[str]:
global _host_regex_cache
if _host_regex_cache is None:
_host_regex_cache = re.compile(
_host_regex,
re.IGNORECASE,
)
return _host_regex_cache
class AnyUrl(str):
strip_whitespace = True
min_length = 1
max_length = 2**16
allowed_schemes: Collection[str] | None = None
tld_required: bool = False
user_required: bool = False
host_required: bool = True
hidden_parts: set[str] = set()
__slots__ = ('scheme', 'user', 'password', 'host', 'tld', 'host_type', 'port', 'path', 'query', 'fragment')
@no_type_check
def __new__(cls, url: str | None, **kwargs) -> object:
return str.__new__(cls, cls.build(**kwargs) if url is None else url)
def __init__(
self,
url: str,
*,
scheme: str,
user: str | None = None,
password: str | None = None,
host: str | None = None,
tld: str | None = None,
host_type: str = 'domain',
port: str | None = None,
path: str | None = None,
query: str | None = None,
fragment: str | None = None,
) -> None:
str.__init__(url)
self.scheme = scheme
self.user = user
self.password = password
self.host = host
self.tld = tld
self.host_type = host_type
self.port = port
self.path = path
self.query = query
self.fragment = fragment
@classmethod
def build(
cls,
*,
scheme: str,
user: str | None = None,
password: str | None = None,
host: str,
port: str | None = None,
path: str | None = None,
query: str | None = None,
fragment: str | None = None,
**_kwargs: str,
) -> str:
parts = Parts( # type: ignore[misc]
scheme=scheme,
user=user,
password=password,
host=host,
port=port,
path=path,
query=query,
fragment=fragment,
**_kwargs,
)
url = scheme + '://'
if user:
url += user
if password:
url += ':' + password
if user or password:
url += '@'
url += host
if port and ('port' not in cls.hidden_parts or cls.get_default_parts(parts).get('port') != port):
url += ':' + port
if path:
url += path
if query:
url += '?' + query
if fragment:
url += '#' + fragment
return url
@classmethod
def __modify_schema__(cls, field_schema: dict[str, Any]) -> None:
_utils.update_not_none(field_schema, minLength=cls.min_length, maxLength=cls.max_length, format='uri')
@classmethod
def __get_pydantic_validation_schema__(cls, **_kwargs: Any) -> core_schema.FunctionSchema:
return core_schema.FunctionSchema(
type='function',
mode='after',
function=cls.validate,
schema=core_schema.StringSchema(
type='str',
min_length=cls.min_length,
max_length=cls.max_length,
strip_whitespace=cls.strip_whitespace,
),
)
@classmethod
def validate(cls, __input_value: str, **_kwargs: Any) -> str | AnyUrl:
if __input_value.__class__ == cls:
return __input_value
url = __input_value
m = cls._match_url(url)
# the regex should always match, if it doesn't please report with details of the URL tried
assert m, 'URL regex failed unexpectedly'
original_parts = cast('Parts', m.groupdict())
parts = cls.apply_default_parts(original_parts)
parts = cls.validate_parts(parts)
if m.end() != len(url):
raise PydanticCustomError(
'url.extra',
'URL invalid, extra characters found after valid URL: {extra}',
{'extra': repr(url[m.end() :])},
)
return cls._build_url(m, url, parts)
@classmethod
def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'AnyUrl':
"""
Validate hosts and build the AnyUrl object. Split from `validate` so this method
can be altered in `MultiHostDsn`.
"""
host, tld, host_type, rebuild = cls.validate_host(parts)
return cls(
None if rebuild else url,
scheme=parts['scheme'],
user=parts['user'],
password=parts['password'],
host=host,
tld=tld,
host_type=host_type,
port=parts['port'],
path=parts['path'],
query=parts['query'],
fragment=parts['fragment'],
)
@staticmethod
def _match_url(url: str) -> Match[str] | None:
return url_regex().match(url)
@staticmethod
def _validate_port(port: str | None) -> None:
if port is not None and int(port) > 65_535:
raise PydanticCustomError('url.port', 'URL port invalid, port cannot exceed 65535')
@classmethod
def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
"""
A method used to validate parts of a URL.
Could be overridden to set default values for parts if missing
"""
scheme = parts['scheme']
if scheme is None:
raise PydanticCustomError('url.scheme', 'invalid or missing URL scheme')
if cls.allowed_schemes and scheme.lower() not in cls.allowed_schemes:
raise PydanticCustomError(
'url.scheme',
'URL scheme not permitted',
{'allowed_schemes': ', '.join(sorted(set(cls.allowed_schemes)))},
)
if validate_port:
cls._validate_port(parts['port'])
user = parts['user']
if cls.user_required and user is None:
raise PydanticCustomError('url.userinfo', 'userinfo required in URL but missing')
return parts
@classmethod
def validate_host(cls, parts: 'Parts') -> tuple[str, str | None, str, bool]:
tld, host_type, rebuild = None, None, False
for f in ('domain', 'ipv4', 'ipv6'):
host = parts[f] # type: ignore[literal-required]
if host:
host_type = f
break
if host is None:
if cls.host_required:
raise PydanticCustomError('url.host', 'URL host invalid')
elif host_type == 'domain':
is_international = False
d = ascii_domain_regex().fullmatch(host)
if d is None:
d = int_domain_regex().fullmatch(host)
if d is None:
raise PydanticCustomError('url.host', 'URL host invalid')
is_international = True
tld = d.group('tld')
if tld is None and not is_international:
d = int_domain_regex().fullmatch(host)
assert d is not None
tld = d.group('tld')
is_international = True
if tld is not None:
tld = tld[1:]
elif cls.tld_required:
raise PydanticCustomError('url.host', 'URL host invalid, top level domain required')
if is_international:
host_type = 'int_domain'
rebuild = True
host = host.encode('idna').decode('ascii')
if tld is not None:
tld = tld.encode('idna').decode('ascii')
return host, tld, host_type, rebuild # type: ignore
@staticmethod
def get_default_parts(parts: 'Parts') -> 'Parts':
return {}
@classmethod
def apply_default_parts(cls, parts: 'Parts') -> 'Parts':
for key, value in cls.get_default_parts(parts).items():
if not parts[key]: # type: ignore[literal-required]
parts[key] = value # type: ignore[literal-required]
return parts
def __repr__(self) -> str:
extra = ', '.join(f'{n}={getattr(self, n)!r}' for n in self.__slots__ if getattr(self, n) is not None)
return f'{self.__class__.__name__}({super().__repr__()}, {extra})'
class AnyHttpUrl(AnyUrl):
allowed_schemes = {'http', 'https'}
__slots__ = ()
class HttpUrl(AnyHttpUrl):
tld_required = True
# https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers
max_length = 2083
hidden_parts = {'port'}
@staticmethod
def get_default_parts(parts: 'Parts') -> 'Parts':
return {'port': '80' if parts['scheme'] == 'http' else '443'}
class FileUrl(AnyUrl):
allowed_schemes = {'file'}
host_required = False
__slots__ = ()
class MultiHostDsn(AnyUrl):
__slots__ = AnyUrl.__slots__ + ('hosts',)
def __init__(self, *args: Any, hosts: list[HostParts] | None = None, **kwargs: Any):
super().__init__(*args, **kwargs)
self.hosts = hosts
@staticmethod
def _match_url(url: str) -> Match[str] | None:
return multi_host_url_regex().match(url)
@classmethod
def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
return super().validate_parts(parts, validate_port=False)
@classmethod
def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'MultiHostDsn':
hosts_parts: list[HostParts] = []
host_re = host_regex()
for host in m.groupdict()['hosts'].split(','):
d: Parts = host_re.match(host).groupdict() # type: ignore
host, tld, host_type, rebuild = cls.validate_host(d)
port = d.get('port')
cls._validate_port(port)
hosts_parts.append(
{
'host': host,
'host_type': host_type,
'tld': tld,
'rebuild': rebuild,
'port': port,
}
)
if len(hosts_parts) > 1:
return cls(
None if any([hp['rebuild'] for hp in hosts_parts]) else url,
scheme=parts['scheme'],
user=parts['user'],
password=parts['password'],
path=parts['path'],
query=parts['query'],
fragment=parts['fragment'],
host_type=None,
hosts=hosts_parts,
)
else:
# backwards compatibility with single host
host_part = hosts_parts[0]
return cls(
None if host_part['rebuild'] else url,
scheme=parts['scheme'],
user=parts['user'],
password=parts['password'],
host=host_part['host'],
tld=host_part['tld'],
host_type=host_part['host_type'],
port=host_part.get('port'),
path=parts['path'],
query=parts['query'],
fragment=parts['fragment'],
)
class PostgresDsn(MultiHostDsn):
allowed_schemes = {
'postgres',
'postgresql',
'postgresql+asyncpg',
'postgresql+pg8000',
'postgresql+psycopg',
'postgresql+psycopg2',
'postgresql+psycopg2cffi',
'postgresql+py-postgresql',
'postgresql+pygresql',
}
user_required = True
__slots__ = ()
class CockroachDsn(AnyUrl):
allowed_schemes = {
'cockroachdb',
'cockroachdb+psycopg2',
'cockroachdb+asyncpg',
}
user_required = True
class AmqpDsn(AnyUrl):
allowed_schemes = {'amqp', 'amqps'}
host_required = False
class RedisDsn(AnyUrl):
__slots__ = ()
allowed_schemes = {'redis', 'rediss'}
host_required = False
@staticmethod
def get_default_parts(parts: 'Parts') -> 'Parts':
return {
'domain': 'localhost' if not (parts['ipv4'] or parts['ipv6']) else '',
'port': '6379',
'path': '/0',
}
class MongoDsn(AnyUrl):
allowed_schemes = {'mongodb'}
# TODO: Needed to generic "Parts" for "Replica Set", "Sharded Cluster", and other mongodb deployment modes
@staticmethod
def get_default_parts(parts: 'Parts') -> 'Parts':
return {'port': '27017'}
class KafkaDsn(AnyUrl):
allowed_schemes = {'kafka'}
@staticmethod
def get_default_parts(parts: 'Parts') -> 'Parts':
return {'domain': 'localhost', 'port': '9092'}
def stricturl(
*,
strip_whitespace: bool = True,
min_length: int = 1,
max_length: int = 2**16,
tld_required: bool = True,
host_required: bool = True,
allowed_schemes: Collection[str] | None = None,
) -> type[AnyUrl]:
# use kwargs then define conf in a dict to aid with IDE type hinting
namespace = dict(
strip_whitespace=strip_whitespace,
min_length=min_length,
max_length=max_length,
tld_required=tld_required,
host_required=host_required,
allowed_schemes=allowed_schemes,
)
return type('UrlValue', (AnyUrl,), namespace)
@_dataclasses.dataclass
class UrlConstraints(_fields.PydanticMetadata):
max_length: int | None = None
allowed_schemes: list[str] | None = None
host_required: bool | None = None
default_host: str | None = None
default_port: int | None = None
default_path: str | None = None
AnyUrl = Url
# host_required is false because all schemes are "special" so host is required by rust-url automatically
AnyHttpUrl = Annotated[Url, UrlConstraints(allowed_schemes=['http', 'https'])]
HttpUrl = Annotated[Url, UrlConstraints(max_length=2083, allowed_schemes=['http', 'https'])]
FileUrl = Annotated[Url, UrlConstraints(allowed_schemes=['file'])]
PostgresDsn = Annotated[
MultiHostUrl,
UrlConstraints(
host_required=True,
allowed_schemes=[
'postgres',
'postgresql',
'postgresql+asyncpg',
'postgresql+pg8000',
'postgresql+psycopg',
'postgresql+psycopg2',
'postgresql+psycopg2cffi',
'postgresql+py-postgresql',
'postgresql+pygresql',
],
),
]
CockroachDsn = Annotated[
Url,
UrlConstraints(
host_required=True,
allowed_schemes=[
'cockroachdb',
'cockroachdb+psycopg2',
'cockroachdb+asyncpg',
],
),
]
AmqpDsn = Annotated[Url, UrlConstraints(allowed_schemes=['amqp', 'amqps'])]
RedisDsn = Annotated[
Url,
UrlConstraints(allowed_schemes=['redis', 'rediss'], default_host='localhost', default_port=6379, default_path='/0'),
]
MongoDsn = Annotated[MultiHostUrl, UrlConstraints(allowed_schemes=['mongodb', 'mongodb+srv'], default_port=27017)]
KafkaDsn = Annotated[Url, UrlConstraints(allowed_schemes=['kafka'], default_host='localhost', default_port=9092)]
def import_email_validator() -> None:
@@ -563,7 +102,7 @@ def import_email_validator() -> None:
if TYPE_CHECKING:
EmailStr = typing_extensions.Annotated[str, ...]
EmailStr = Annotated[str, ...]
else:
class EmailStr:
+1 -1
View File
@@ -51,7 +51,7 @@ classifiers = [
requires-python = '>=3.7'
dependencies = [
'typing-extensions>=4.1.0',
'pydantic-core>=0.6.0',
'pydantic-core>=0.7.1',
'annotated-types>=0.4.0',
]
optional-dependencies = { email = ['email-validator>=1.0.3'] }
+2 -2
View File
@@ -55,7 +55,7 @@ pre-commit==2.20.0
# via -r requirements/linting.in
pycodestyle==2.9.1
# via flake8
pydantic-core==0.6.0
pydantic-core==0.7.1
# via -r requirements/linting.in
pyflakes==2.5.0
# via flake8
@@ -63,7 +63,7 @@ pyupgrade==2.37.3
# via -r requirements/linting.in
pyyaml==6.0
# via pre-commit
ruff==0.0.95
ruff==0.0.105
# via -r requirements/linting.in
sortedcontainers==2.4.0
# via hypothesis
+1 -1
View File
@@ -12,7 +12,7 @@ email-validator==1.2.1
# via pydantic (pyproject.toml)
idna==3.3
# via email-validator
pydantic-core==0.6.0
pydantic-core==0.7.1
# via pydantic (pyproject.toml)
typing-extensions==4.3.0
# via
+1 -1
View File
@@ -6,7 +6,7 @@
#
annotated-types==0.4.0
# via pydantic (pyproject.toml)
pydantic-core==0.6.0
pydantic-core==0.7.1
# via pydantic (pyproject.toml)
typing-extensions==4.3.0
# via
+12 -5
View File
@@ -9,7 +9,8 @@ from pathlib import Path, PurePath
from typing import Any, Dict, ForwardRef, Generic, List, Optional, Type, TypeVar
from uuid import UUID
from typing_extensions import TypedDict
from pydantic_core import Url
from typing_extensions import Annotated, TypedDict
from pydantic import (
UUID1,
@@ -35,8 +36,8 @@ from pydantic import (
StrictFloat,
StrictInt,
StrictStr,
UrlConstraints,
root_validator,
stricturl,
validate_arguments,
validator,
)
@@ -238,9 +239,15 @@ validated.my_dir_path_str.absolute()
validated.my_json['hello'].capitalize()
validated.my_json_list[0].capitalize()
stricturl(allowed_schemes={'http'})
stricturl(allowed_schemes=frozenset({'http'}))
stricturl(allowed_schemes=('s3', 's3n', 's3a'))
class UrlModel(BaseModel):
x: Annotated[Url, UrlConstraints(allowed_schemes=['http'])] = Field(None)
y: Annotated[Url, UrlConstraints(allowed_schemes=['http'])] = Field(None)
z: Annotated[Url, UrlConstraints(allowed_schemes=['s3', 's3n', 's3a'])] = Field(None)
url_model = UrlModel(x='http://example.com')
assert url_model.x.host == 'example.com'
class SomeDict(TypedDict):
+156 -303
View File
@@ -1,9 +1,9 @@
import pytest
from pydantic_core import PydanticCustomError
from pydantic_core import PydanticCustomError, Url
from typing_extensions import Annotated
from pydantic import (
AmqpDsn,
AnyHttpUrl,
AnyUrl,
BaseModel,
CockroachDsn,
@@ -14,8 +14,9 @@ from pydantic import (
NameEmail,
PostgresDsn,
RedisDsn,
Strict,
UrlConstraints,
ValidationError,
stricturl,
)
from pydantic.networks import validate_email
@@ -70,7 +71,7 @@ except ImportError:
'http://www.cwi.nl:80/%7Eguido/Python.html',
'https://www.python.org/путь',
'http://андрей@example.com',
AnyUrl('https://example.com', scheme='https', host='example.com'),
# AnyUrl('https://example.com', scheme='https', host='example.com'),
'https://exam_ple.com/',
'http://twitter.com/@handle/',
'http://11.11.11.11.example.com/action',
@@ -94,56 +95,35 @@ def test_any_url_success(value):
@pytest.mark.parametrize(
'value,err_kind,err_msg,err_ctx',
'value,err_type,err_msg',
[
('http:///example.com/', 'url.host', 'URL host invalid', None),
('https:///example.com/', 'url.host', 'URL host invalid', None),
('http://.example.com:8000/foo', 'url.host', 'URL host invalid', None),
('https://example.org\\', 'url.host', 'URL host invalid', None),
('https://exampl$e.org', 'url.host', 'URL host invalid', None),
('http://??', 'url.host', 'URL host invalid', None),
('http://.', 'url.host', 'URL host invalid', None),
('http://..', 'url.host', 'URL host invalid', None),
('http:///', 'url_parsing', 'Input should be a valid URL, empty host'),
('http://??', 'url_parsing', 'Input should be a valid URL, empty host'),
(
'https://example.org more',
'url.extra',
"URL invalid, extra characters found after valid URL: ' more'",
{'extra': "' more'"},
'url_parsing',
'Input should be a valid URL, invalid domain character',
),
('$https://example.org', 'url.scheme', 'invalid or missing URL scheme', None),
('../icons/logo.gif', 'url.scheme', 'invalid or missing URL scheme', None),
('abc', 'url.scheme', 'invalid or missing URL scheme', None),
('..', 'url.scheme', 'invalid or missing URL scheme', None),
('/', 'url.scheme', 'invalid or missing URL scheme', None),
('+http://example.com/', 'url.scheme', 'invalid or missing URL scheme', None),
('ht*tp://example.com/', 'url.scheme', 'invalid or missing URL scheme', None),
(' ', 'string_too_short', 'String should have at least 1 characters', {'min_length': 1}),
('', 'string_too_short', 'String should have at least 1 characters', {'min_length': 1}),
(None, 'string_type', 'Input should be a valid string', None),
('$https://example.org', 'url_parsing', 'Input should be a valid URL, relative URL without a base'),
('../icons/logo.gif', 'url_parsing', 'Input should be a valid URL, relative URL without a base'),
('abc', 'url_parsing', 'Input should be a valid URL, relative URL without a base'),
('..', 'url_parsing', 'Input should be a valid URL, relative URL without a base'),
('/', 'url_parsing', 'Input should be a valid URL, relative URL without a base'),
('+http://example.com/', 'url_parsing', 'Input should be a valid URL, relative URL without a base'),
('ht*tp://example.com/', 'url_parsing', 'Input should be a valid URL, relative URL without a base'),
(' ', 'url_parsing', 'Input should be a valid URL, relative URL without a base'),
('', 'url_parsing', 'Input should be a valid URL, relative URL without a base'),
(None, 'url_type', 'URL input should be a string or URL'),
(
'http://2001:db8::ff00:42:8329',
'url.extra',
"URL invalid, extra characters found after valid URL: ':db8::ff00:42:8329'",
{'extra': "':db8::ff00:42:8329'"},
'url_parsing',
'Input should be a valid URL, invalid port number',
),
('http://[192.168.1.1]:8329', 'url.host', 'URL host invalid', None),
('http://example.com:99999', 'url.port', 'URL port invalid, port cannot exceed 65535', None),
(
'http://example##',
'url.extra',
"URL invalid, extra characters found after valid URL: '#'",
{'extra': "'#'"},
),
(
'http://example/##',
'url.extra',
"URL invalid, extra characters found after valid URL: '#'",
{'extra': "'#'"},
),
('file:///foo/bar', 'url.host', 'URL host invalid', None),
('http://[192.168.1.1]:8329', 'url_parsing', 'Input should be a valid URL, invalid IPv6 address'),
('http://example.com:99999', 'url_parsing', 'Input should be a valid URL, invalid port number'),
],
)
def test_any_url_invalid(value, err_kind, err_msg, err_ctx):
def test_any_url_invalid(value, err_type, err_msg):
class Model(BaseModel):
v: AnyUrl
@@ -152,9 +132,7 @@ def test_any_url_invalid(value, err_kind, err_msg, err_ctx):
assert len(exc_info.value.errors()) == 1, exc_info.value.errors()
error = exc_info.value.errors()[0]
# debug(error)
assert error['type'] == err_kind, value
assert error['msg'] == err_msg, value
assert error.get('ctx') == err_ctx, value
assert {'type': error['type'], 'msg': error['msg']} == {'type': err_type, 'msg': err_msg}
def validate_url(s):
@@ -166,30 +144,22 @@ def validate_url(s):
def test_any_url_parts():
url = validate_url('http://example.org')
assert str(url) == 'http://example.org'
assert repr(url) == "AnyUrl('http://example.org', scheme='http', host='example.org', tld='org', host_type='domain')"
assert str(url) == 'http://example.org/'
assert repr(url) == "Url('http://example.org/')"
assert url.scheme == 'http'
assert url.host == 'example.org'
assert url.tld == 'org'
assert url.host_type == 'domain'
assert url.port is None
assert url == AnyUrl('http://example.org', scheme='https', host='example.org')
assert url.port == 80
def test_url_repr():
url = validate_url('http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit')
assert str(url) == 'http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit'
assert repr(url) == (
"AnyUrl('http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit', "
"scheme='http', user='user', password='password', host='example.org', tld='org', host_type='domain', "
"port='1234', path='/the/path/', query='query=here', fragment='fragment=is;this=bit')"
)
assert repr(url) == "Url('http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit')"
assert url.scheme == 'http'
assert url.user == 'user'
assert url.username == 'user'
assert url.password == 'password'
assert url.host == 'example.org'
assert url.host_type == 'domain'
assert url.port == '1234'
assert url.port == 1234
assert url.path == '/the/path/'
assert url.query == 'query=here'
assert url.fragment == 'fragment=is;this=bit'
@@ -199,9 +169,8 @@ def test_ipv4_port():
url = validate_url('ftp://123.45.67.8:8329/')
assert url.scheme == 'ftp'
assert url.host == '123.45.67.8'
assert url.host_type == 'ipv4'
assert url.port == '8329'
assert url.user is None
assert url.port == 8329
assert url.username is None
assert url.password is None
@@ -209,9 +178,8 @@ def test_ipv4_no_port():
url = validate_url('ftp://123.45.67.8')
assert url.scheme == 'ftp'
assert url.host == '123.45.67.8'
assert url.host_type == 'ipv4'
assert url.port is None
assert url.user is None
assert url.port == 21
assert url.username is None
assert url.password is None
@@ -219,36 +187,32 @@ def test_ipv6_port():
url = validate_url('wss://[2001:db8::ff00:42]:8329')
assert url.scheme == 'wss'
assert url.host == '[2001:db8::ff00:42]'
assert url.host_type == 'ipv6'
assert url.port == '8329'
assert url.port == 8329
def test_int_domain():
url = validate_url('https://£££.org')
assert url.host == 'xn--9aaa.org'
assert url.host_type == 'int_domain'
assert str(url) == 'https://xn--9aaa.org'
assert str(url) == 'https://xn--9aaa.org/'
def test_co_uk():
url = validate_url('http://example.co.uk')
assert str(url) == 'http://example.co.uk'
assert str(url) == 'http://example.co.uk/'
assert url.scheme == 'http'
assert url.host == 'example.co.uk'
assert url.tld == 'uk' # wrong but no better solution
assert url.host_type == 'domain'
def test_user_no_password():
url = validate_url('http://user:@example.org')
assert url.user == 'user'
assert url.password == ''
assert url.username == 'user'
assert url.password is None
assert url.host == 'example.org'
def test_user_info_no_user():
url = validate_url('http://:password@example.org')
assert url.user == ''
assert url.username is None
assert url.password == 'password'
assert url.host == 'example.org'
@@ -257,7 +221,7 @@ def test_at_in_path():
url = validate_url('https://twitter.com/@handle')
assert url.scheme == 'https'
assert url.host == 'twitter.com'
assert url.user is None
assert url.username is None
assert url.password is None
assert url.path == '/@handle'
@@ -272,52 +236,46 @@ def test_fragment_without_query():
@pytest.mark.parametrize(
'value',
'value,expected',
[
'http://example.org',
'http://example.org/foobar',
'http://example.org.',
'http://example.org./foobar',
'HTTP://EXAMPLE.ORG',
'https://example.org',
'https://example.org?a=1&b=2',
'https://example.org#a=3;b=3',
'https://foo_bar.example.com/',
'https://exam_ple.com/', # should perhaps fail? I think it's contrary to the RFC but chrome allows it
'https://example.xn--p1ai',
'https://example.xn--vermgensberatung-pwb',
'https://example.xn--zfr164b',
('http://example.org', 'http://example.org/'),
('http://example.org/foobar', 'http://example.org/foobar'),
('http://example.org.', 'http://example.org./'),
('http://example.org./foobar', 'http://example.org./foobar'),
('HTTP://EXAMPLE.ORG', 'http://example.org/'),
('https://example.org', 'https://example.org/'),
('https://example.org?a=1&b=2', 'https://example.org/?a=1&b=2'),
('https://example.org#a=3;b=3', 'https://example.org/#a=3;b=3'),
('https://foo_bar.example.com/', 'https://foo_bar.example.com/'),
('https://exam_ple.com/', 'https://exam_ple.com/'),
('https://example.xn--p1ai', 'https://example.xn--p1ai/'),
('https://example.xn--vermgensberatung-pwb', 'https://example.xn--vermgensberatung-pwb/'),
('https://example.xn--zfr164b', 'https://example.xn--zfr164b/'),
],
)
def test_http_url_success(value):
def test_http_url_success(value, expected):
class Model(BaseModel):
v: HttpUrl
assert Model(v=value).v == value
assert str(Model(v=value).v) == expected
@pytest.mark.parametrize(
'value,err_kind,err_msg,err_ctx',
'value,err_type,err_msg',
[
(
'ftp://example.com/',
'url.scheme',
'URL scheme not permitted',
{'allowed_schemes': 'http, https'},
'url_scheme',
"URL scheme should be 'http' or 'https'",
),
('http://foobar/', 'url.host', 'URL host invalid, top level domain required', None),
('http://localhost/', 'url.host', 'URL host invalid, top level domain required', None),
('https://example.123', 'url.host', 'URL host invalid, top level domain required', None),
('https://example.ab123', 'url.host', 'URL host invalid, top level domain required', None),
(
'x' * 2084,
'string_too_long',
'String should have at most 2083 characters',
{'max_length': 2083},
'url_too_long',
'URL should have at most 2083 characters',
),
],
)
def test_http_url_invalid(value, err_kind, err_msg, err_ctx):
def test_http_url_invalid(value, err_type, err_msg):
class Model(BaseModel):
v: HttpUrl
@@ -325,100 +283,63 @@ def test_http_url_invalid(value, err_kind, err_msg, err_ctx):
Model(v=value)
assert len(exc_info.value.errors()) == 1, exc_info.value.errors()
error = exc_info.value.errors()[0]
assert error['type'] == err_kind, value
assert error['msg'] == err_msg, value
assert error.get('ctx') == err_ctx, value
assert {'type': error['type'], 'msg': error['msg']} == {'type': err_type, 'msg': err_msg}
@pytest.mark.parametrize(
'input,output',
[
(' https://www.example.com \n', 'https://www.example.com'),
(b'https://www.example.com', 'https://www.example.com'),
(' https://www.example.com \n', 'https://www.example.com/'),
(b'https://www.example.com', 'https://www.example.com/'),
# https://www.xudongz.com/blog/2017/idn-phishing/ accepted but converted
('https://www.аррӏе.com/', 'https://www.xn--80ak6aa92e.com/'),
('https://exampl£e.org', 'https://xn--example-gia.org'),
('https://example.珠宝', 'https://example.xn--pbt977c'),
('https://example.vermögensberatung', 'https://example.xn--vermgensberatung-pwb'),
('https://example.рф', 'https://example.xn--p1ai'),
('https://exampl£e.珠宝', 'https://xn--example-gia.xn--pbt977c'),
('https://exampl£e.org', 'https://xn--example-gia.org/'),
('https://example.珠宝', 'https://example.xn--pbt977c/'),
('https://example.vermögensberatung', 'https://example.xn--vermgensberatung-pwb/'),
('https://example.рф', 'https://example.xn--p1ai/'),
('https://exampl£e.珠宝', 'https://xn--example-gia.xn--pbt977c/'),
],
)
def test_coerse_url(input, output):
def test_coerce_url(input, output):
class Model(BaseModel):
v: HttpUrl
assert Model(v=input).v == output
assert str(Model(v=input).v) == output
@pytest.mark.parametrize(
'input,output',
'value,expected',
[
(' https://www.example.com \n', 'com'),
(b'https://www.example.com', 'com'),
('https://www.example.com?param=value', 'com'),
('https://example.珠宝', 'xn--pbt977c'),
('https://exampl£e.珠宝', 'xn--pbt977c'),
('https://example.vermögensberatung', 'xn--vermgensberatung-pwb'),
('https://example.рф', 'xn--p1ai'),
('https://example.рф?param=value', 'xn--p1ai'),
('file:///foo/bar', 'file:///foo/bar'),
('file://localhost/foo/bar', 'file:///foo/bar'),
('file:////localhost/foo/bar', 'file:///localhost/foo/bar'),
],
)
def test_parses_tld(input, output):
class Model(BaseModel):
v: HttpUrl
assert Model(v=input).v.tld == output
@pytest.mark.parametrize(
'value',
['file:///foo/bar', 'file://localhost/foo/bar', 'file:////localhost/foo/bar'],
)
def test_file_url_success(value):
def test_file_url_success(value, expected):
class Model(BaseModel):
v: FileUrl
assert Model(v=value).v == value
def test_get_default_parts():
class MyConnectionString(AnyUrl):
@staticmethod
def get_default_parts(parts):
# get default parts allows to generate custom conn strings to services
return {
'user': 'admin',
'password': '123',
}
class C(BaseModel):
connection: MyConnectionString
c = C(connection='protocol://service:8080')
assert c.connection == 'protocol://admin:123@service:8080'
assert c.connection.user == 'admin'
assert c.connection.password == '123'
assert str(Model(v=value).v) == expected
@pytest.mark.parametrize(
'url,port',
'url,expected_port, expected_str',
[
('https://www.example.com', '443'),
('https://www.example.com:443', '443'),
('https://www.example.com:8089', '8089'),
('http://www.example.com', '80'),
('http://www.example.com:80', '80'),
('http://www.example.com:8080', '8080'),
('https://www.example.com/', 443, 'https://www.example.com/'),
('https://www.example.com:443/', 443, 'https://www.example.com/'),
('https://www.example.com:8089/', 8089, 'https://www.example.com:8089/'),
('http://www.example.com/', 80, 'http://www.example.com/'),
('http://www.example.com:80/', 80, 'http://www.example.com/'),
('http://www.example.com:8080/', 8080, 'http://www.example.com:8080/'),
],
)
def test_http_urls_default_port(url, port):
def test_http_urls_default_port(url, expected_port, expected_str):
class Model(BaseModel):
v: HttpUrl
m = Model(v=url)
assert m.v.port == port
assert m.v == url
assert m.v.port == expected_port
assert str(m.v) == expected_str
@pytest.mark.parametrize(
@@ -434,7 +355,7 @@ def test_postgres_dsns(dsn):
class Model(BaseModel):
a: PostgresDsn
assert Model(a=dsn).a == dsn
assert str(Model(a=dsn).a) == dsn
@pytest.mark.parametrize(
@@ -443,56 +364,50 @@ def test_postgres_dsns(dsn):
(
'postgres://user:pass@host1.db.net:4321,/foo/bar:5432/app',
{
'type': 'url.host',
'type': 'url_parsing',
'loc': ('a',),
'msg': 'URL host invalid',
'msg': 'Input should be a valid URL, empty host',
'input': 'postgres://user:pass@host1.db.net:4321,/foo/bar:5432/app',
},
),
(
'postgres://user:pass@host1.db.net,/app',
{
'type': 'url.host',
'type': 'url_parsing',
'loc': ('a',),
'msg': 'URL host invalid',
'msg': 'Input should be a valid URL, empty host',
'input': 'postgres://user:pass@host1.db.net,/app',
},
),
(
'postgres://user:pass@/foo/bar:5432,host1.db.net:4321/app',
{
'type': 'url.host',
'type': 'url_parsing',
'loc': ('a',),
'msg': 'URL host invalid',
'msg': 'Input should be a valid URL, empty host',
'input': 'postgres://user:pass@/foo/bar:5432,host1.db.net:4321/app',
},
),
(
'postgres://localhost:5432/app',
{
'type': 'url.userinfo',
'loc': ('a',),
'msg': 'userinfo required in URL but missing',
'input': 'postgres://localhost:5432/app',
},
),
(
'postgres://user@/foo/bar:5432/app',
{
'type': 'url.host',
'type': 'url_parsing',
'loc': ('a',),
'msg': 'URL host invalid',
'msg': 'Input should be a valid URL, empty host',
'input': 'postgres://user@/foo/bar:5432/app',
},
),
(
'http://example.org',
{
'type': 'url.scheme',
'type': 'url_scheme',
'loc': ('a',),
'msg': 'URL scheme not permitted',
'msg': (
"URL scheme should be 'postgres', 'postgresql', 'postgresql+asyncpg', 'postgresql+pg8000', "
"'postgresql+psycopg', 'postgresql+psycopg2', 'postgresql+psycopg2cffi', "
"'postgresql+py-postgresql' or 'postgresql+pygresql'"
),
'input': 'http://example.org',
'ctx': {'allowed_schemes': ', '.join(sorted(PostgresDsn.allowed_schemes))},
},
),
),
@@ -504,6 +419,7 @@ def test_postgres_dsns_validation_error(dsn, error_message):
with pytest.raises(ValidationError) as exc_info:
Model(a=dsn)
error = exc_info.value.errors()[0]
error.pop('ctx', None)
assert error == error_message
@@ -512,56 +428,40 @@ def test_multihost_postgres_dsns():
a: PostgresDsn
any_multihost_url = Model(a='postgres://user:pass@host1.db.net:4321,host2.db.net:6432/app').a
assert any_multihost_url == 'postgres://user:pass@host1.db.net:4321,host2.db.net:6432/app'
assert str(any_multihost_url) == 'postgres://user:pass@host1.db.net:4321,host2.db.net:6432/app'
assert any_multihost_url.scheme == 'postgres'
assert any_multihost_url.host is None
assert any_multihost_url.host_type is None
assert any_multihost_url.tld is None
assert any_multihost_url.port is None
assert any_multihost_url.path == '/app'
assert any_multihost_url.hosts == [
{'host': 'host1.db.net', 'port': '4321', 'tld': 'net', 'host_type': 'domain', 'rebuild': False},
{'host': 'host2.db.net', 'port': '6432', 'tld': 'net', 'host_type': 'domain', 'rebuild': False},
# insert_assert(any_multihost_url.hosts())
assert any_multihost_url.hosts() == [
{'username': 'user', 'password': 'pass', 'host': 'host1.db.net', 'port': 4321},
{'username': None, 'password': None, 'host': 'host2.db.net', 'port': 6432},
]
any_multihost_url = Model(a='postgres://user:pass@host.db.net:4321/app').a
assert any_multihost_url.scheme == 'postgres'
assert any_multihost_url == 'postgres://user:pass@host.db.net:4321/app'
assert any_multihost_url.host == 'host.db.net'
assert any_multihost_url.host_type == 'domain'
assert any_multihost_url.tld == 'net'
assert any_multihost_url.port == '4321'
assert str(any_multihost_url) == 'postgres://user:pass@host.db.net:4321/app'
assert any_multihost_url.path == '/app'
assert any_multihost_url.hosts is None
# insert_assert(any_multihost_url.hosts())
assert any_multihost_url.hosts() == [{'username': 'user', 'password': 'pass', 'host': 'host.db.net', 'port': 4321}]
def test_cockroach_dsns():
class Model(BaseModel):
a: CockroachDsn
assert Model(a='cockroachdb://user:pass@localhost:5432/app').a == 'cockroachdb://user:pass@localhost:5432/app'
assert str(Model(a='cockroachdb://user:pass@localhost:5432/app').a) == 'cockroachdb://user:pass@localhost:5432/app'
assert (
Model(a='cockroachdb+psycopg2://user:pass@localhost:5432/app').a
str(Model(a='cockroachdb+psycopg2://user:pass@localhost:5432/app').a)
== 'cockroachdb+psycopg2://user:pass@localhost:5432/app'
)
assert (
Model(a='cockroachdb+asyncpg://user:pass@localhost:5432/app').a
str(Model(a='cockroachdb+asyncpg://user:pass@localhost:5432/app').a)
== 'cockroachdb+asyncpg://user:pass@localhost:5432/app'
)
with pytest.raises(ValidationError) as exc_info:
Model(a='http://example.org')
assert exc_info.value.errors()[0]['type'] == 'url.scheme'
with pytest.raises(ValidationError) as exc_info:
Model(a='cockroachdb://localhost:5432/app')
# error = exc_info.value.errors()[0]
# insert_assert(error)
#
# with pytest.raises(ValidationError) as exc_info:
# Model(a='cockroachdb://user@/foo/bar:5432/app')
# error = exc_info.value.errors()[0]
# insert_assert(error)
assert exc_info.value.errors()[0]['type'] == 'url_scheme'
def test_amqp_dsns():
@@ -569,21 +469,21 @@ def test_amqp_dsns():
a: AmqpDsn
m = Model(a='amqp://user:pass@localhost:1234/app')
assert m.a == 'amqp://user:pass@localhost:1234/app'
assert m.a.user == 'user'
assert str(m.a) == 'amqp://user:pass@localhost:1234/app'
assert m.a.username == 'user'
assert m.a.password == 'pass'
m = Model(a='amqps://user:pass@localhost:5432//')
assert m.a == 'amqps://user:pass@localhost:5432//'
assert str(m.a) == 'amqps://user:pass@localhost:5432//'
with pytest.raises(ValidationError) as exc_info:
Model(a='http://example.org')
assert exc_info.value.errors()[0]['type'] == 'url.scheme'
assert exc_info.value.errors()[0]['type'] == 'url_scheme'
# Password is not required for AMQP protocol
m = Model(a='amqp://localhost:1234/app')
assert m.a == 'amqp://localhost:1234/app'
assert m.a.user is None
assert str(m.a) == 'amqp://localhost:1234/app'
assert m.a.username is None
assert m.a.password is None
# Only schema is required for AMQP protocol.
@@ -600,24 +500,24 @@ def test_redis_dsns():
a: RedisDsn
m = Model(a='redis://user:pass@localhost:1234/app')
assert m.a == 'redis://user:pass@localhost:1234/app'
assert m.a.user == 'user'
assert str(m.a) == 'redis://user:pass@localhost:1234/app'
assert m.a.username == 'user'
assert m.a.password == 'pass'
m = Model(a='rediss://user:pass@localhost:1234/app')
assert m.a == 'rediss://user:pass@localhost:1234/app'
assert str(m.a) == 'rediss://user:pass@localhost:1234/app'
m = Model(a='rediss://:pass@localhost:1234')
assert m.a == 'rediss://:pass@localhost:1234/0'
assert str(m.a) == 'rediss://:pass@localhost:1234/0'
with pytest.raises(ValidationError) as exc_info:
Model(a='http://example.org')
assert exc_info.value.errors()[0]['type'] == 'url.scheme'
assert exc_info.value.errors()[0]['type'] == 'url_scheme'
# Password is not required for Redis protocol
m = Model(a='redis://localhost:1234/app')
assert m.a == 'redis://localhost:1234/app'
assert m.a.user is None
assert str(m.a) == 'redis://localhost:1234/app'
assert m.a.username is None
assert m.a.password is None
# Only schema is required for Redis protocol. Otherwise it will be set to default
@@ -625,7 +525,7 @@ def test_redis_dsns():
m = Model(a='rediss://')
assert m.a.scheme == 'rediss'
assert m.a.host == 'localhost'
assert m.a.port == '6379'
assert m.a.port == 6379
assert m.a.path == '/0'
@@ -635,25 +535,25 @@ def test_mongodb_dsns():
# TODO: Need to unit tests about "Replica Set", "Sharded cluster" and other deployment modes of MongoDB
m = Model(a='mongodb://user:pass@localhost:1234/app')
assert m.a == 'mongodb://user:pass@localhost:1234/app'
assert m.a.user == 'user'
assert m.a.password == 'pass'
assert str(m.a) == 'mongodb://user:pass@localhost:1234/app'
# insert_assert(m.a.hosts())
assert m.a.hosts() == [{'username': 'user', 'password': 'pass', 'host': 'localhost', 'port': 1234}]
with pytest.raises(ValidationError) as exc_info:
Model(a='http://example.org')
assert exc_info.value.errors()[0]['type'] == 'url.scheme'
assert exc_info.value.errors()[0]['type'] == 'url_scheme'
# Password is not required for MongoDB protocol
m = Model(a='mongodb://localhost:1234/app')
assert m.a == 'mongodb://localhost:1234/app'
assert m.a.user is None
assert m.a.password is None
assert str(m.a) == 'mongodb://localhost:1234/app'
# insert_assert(m.a.hosts())
assert m.a.hosts() == [{'username': None, 'password': None, 'host': 'localhost', 'port': 1234}]
# Only schema and host is required for MongoDB protocol
m = Model(a='mongodb://localhost')
assert m.a.scheme == 'mongodb'
assert m.a.host == 'localhost'
assert m.a.port == '27017'
# insert_assert(m.a.hosts())
assert m.a.hosts() == [{'username': None, 'password': None, 'host': 'localhost', 'port': 27017}]
def test_kafka_dsns():
@@ -663,90 +563,43 @@ def test_kafka_dsns():
m = Model(a='kafka://')
assert m.a.scheme == 'kafka'
assert m.a.host == 'localhost'
assert m.a.port == '9092'
assert m.a == 'kafka://localhost:9092'
assert m.a.port == 9092
assert str(m.a) == 'kafka://localhost:9092'
m = Model(a='kafka://kafka1')
assert m.a == 'kafka://kafka1:9092'
assert str(m.a) == 'kafka://kafka1:9092'
with pytest.raises(ValidationError) as exc_info:
Model(a='http://example.org')
assert exc_info.value.errors()[0]['type'] == 'url.scheme'
assert exc_info.value.errors()[0]['type'] == 'url_scheme'
m = Model(a='kafka://kafka3:9093')
assert m.a.user is None
assert m.a.username is None
assert m.a.password is None
def test_custom_schemes():
class Model(BaseModel):
v: stricturl(strip_whitespace=False, allowed_schemes={'ws', 'wss'}) # noqa: F821
v: Annotated[Url, UrlConstraints(allowed_schemes=['ws', 'wss']), Strict()]
class Model2(BaseModel):
v: stricturl(host_required=False, allowed_schemes={'foo'}) # noqa: F821
v: Annotated[Url, UrlConstraints(host_required=False, allowed_schemes=['foo'])]
assert Model(v='ws://example.org').v == 'ws://example.org'
assert Model2(v='foo:///foo/bar').v == 'foo:///foo/bar'
assert str(Model(v='ws://example.org').v) == 'ws://example.org/'
assert str(Model2(v='foo:///foo/bar').v) == 'foo:///foo/bar'
with pytest.raises(ValidationError):
with pytest.raises(ValidationError, match=r"URL scheme should be 'ws' or 'wss' \[type=url_scheme,"):
Model(v='http://example.org')
with pytest.raises(ValidationError):
with pytest.raises(ValidationError, match='leading or trailing control or space character are ignored in URLs'):
Model(v='ws://example.org ')
with pytest.raises(ValidationError):
with pytest.raises(ValidationError, match=r'syntax rules, expected // \[type=url_syntax_violation,'):
Model(v='ws:///foo/bar')
@pytest.mark.parametrize(
'kwargs,expected',
[
(dict(scheme='ws', user='foo', host='example.net'), 'ws://foo@example.net'),
(dict(scheme='ws', user='foo', password='x', host='example.net'), 'ws://foo:x@example.net'),
(dict(scheme='ws', host='example.net', query='a=b', fragment='c=d'), 'ws://example.net?a=b#c=d'),
(dict(scheme='http', host='example.net', port='1234'), 'http://example.net:1234'),
],
)
def test_build_url(kwargs, expected):
assert AnyUrl(None, **kwargs) == expected
@pytest.mark.parametrize(
'kwargs,expected',
[
(dict(scheme='http', host='example.net'), 'http://example.net'),
(dict(scheme='https', host='example.net'), 'https://example.net'),
(dict(scheme='http', user='foo', host='example.net'), 'http://foo@example.net'),
(dict(scheme='https', user='foo', host='example.net'), 'https://foo@example.net'),
(dict(scheme='http', user='foo', host='example.net', port='123'), 'http://foo@example.net:123'),
(dict(scheme='https', user='foo', host='example.net', port='123'), 'https://foo@example.net:123'),
(dict(scheme='http', user='foo', password='x', host='example.net'), 'http://foo:x@example.net'),
(dict(scheme='http2', user='foo', password='x', host='example.net'), 'http2://foo:x@example.net'),
(dict(scheme='http', host='example.net', query='a=b', fragment='c=d'), 'http://example.net?a=b#c=d'),
(dict(scheme='http2', host='example.net', query='a=b', fragment='c=d'), 'http2://example.net?a=b#c=d'),
(dict(scheme='http', host='example.net', port='1234'), 'http://example.net:1234'),
(dict(scheme='https', host='example.net', port='1234'), 'https://example.net:1234'),
],
)
@pytest.mark.parametrize('klass', [AnyHttpUrl, HttpUrl])
def test_build_any_http_url(klass, kwargs, expected):
assert klass(None, **kwargs) == expected
@pytest.mark.parametrize(
'klass, kwargs,expected',
[
(AnyHttpUrl, dict(scheme='http', user='foo', host='example.net', port='80'), 'http://foo@example.net:80'),
(AnyHttpUrl, dict(scheme='https', user='foo', host='example.net', port='443'), 'https://foo@example.net:443'),
(HttpUrl, dict(scheme='http', user='foo', host='example.net', port='80'), 'http://foo@example.net'),
(HttpUrl, dict(scheme='https', user='foo', host='example.net', port='443'), 'https://foo@example.net'),
],
)
def test_build_http_url_port(klass, kwargs, expected):
assert klass(None, **kwargs) == expected
def test_son():
@pytest.mark.xfail(reason='URL to JSON not yet supported')
def test_json():
class Model(BaseModel):
v: HttpUrl
+5 -5
View File
@@ -36,7 +36,7 @@ from pydantic.color import Color
from pydantic.dataclasses import dataclass
from pydantic.fields import FieldInfo
from pydantic.generics import GenericModel
from pydantic.networks import AnyUrl, EmailStr, IPvAnyAddress, IPvAnyInterface, IPvAnyNetwork, NameEmail, stricturl
from pydantic.networks import AnyUrl, EmailStr, IPvAnyAddress, IPvAnyInterface, IPvAnyNetwork, NameEmail
from pydantic.schema import (
get_flat_models_from_model,
get_flat_models_from_models,
@@ -808,10 +808,10 @@ def test_str_constrained_types(field_type, expected_schema):
'field_type,expected_schema',
[
(AnyUrl, {'title': 'A', 'type': 'string', 'format': 'uri', 'minLength': 1, 'maxLength': 2**16}),
(
stricturl(min_length=5, max_length=10),
{'title': 'A', 'type': 'string', 'format': 'uri', 'minLength': 5, 'maxLength': 10},
),
# (
# stricturl(min_length=5, max_length=10),
# {'title': 'A', 'type': 'string', 'format': 'uri', 'minLength': 5, 'maxLength': 10},
# ),
],
)
def test_special_str_types(field_type, expected_schema):
+2 -2
View File
@@ -2822,7 +2822,7 @@ def test_json_not_str():
{
'type': 'json_type',
'loc': ('json_obj',),
'msg': 'JSON input should be str, bytes or bytearray',
'msg': 'JSON input should be string, bytes or bytearray',
'input': 12,
}
]
@@ -2880,7 +2880,7 @@ def test_json_required():
json_obj: Json
assert JsonRequired(json_obj='["x", "y", "z"]').dict() == {'json_obj': ['x', 'y', 'z']}
with pytest.raises(ValidationError, match=r'JSON input should be str, bytes or bytearray \[type=json_type,'):
with pytest.raises(ValidationError, match=r'JSON input should be string, bytes or bytearray \[type=json_type,'):
JsonRequired(json_obj=None)
with pytest.raises(ValidationError, match=r'Field required \[type=missing,'):
JsonRequired()