new URL parsing (#755)

* new URL parsing, fix #603, fix #541

* AnyUrl parts and more tests

* more coverage and db DSNs

* remove DSN methods

* tests for urlstr

* remove debug

* make AnyStr a subtype of str

* fix with cython

* rearranging networking code

* allowing international domains, cleanup

* support international domains

* better URL builder

* allow underscores in subdomains and domains

* tests for json and schema, max length

* urlstr > stricturl

* updating docs

* tweak docs examples

* tweak docs
This commit is contained in:
Samuel Colvin
2019-09-02 11:37:33 +01:00
committed by GitHub
parent 956f45962e
commit 79017111aa
26 changed files with 1044 additions and 835 deletions
+1 -1
View File
@@ -9,7 +9,7 @@ install:
SKIP_CYTHON=1 pip install -e .
.PHONY: build-cython-trace
build-cython-trace: clean
build-cython-trace:
python setup.py build_ext --force --inplace --define CYTHON_TRACE
.PHONY: build-cython
+1
View File
@@ -0,0 +1 @@
**Breaking Change:** complete rewrite of ``URL`` parsing logic
+2 -2
View File
@@ -1,9 +1,9 @@
from pydantic import UrlStr
from pydantic import AnyUrl
from pydantic.dataclasses import dataclass
@dataclass
class NavbarButton:
href: UrlStr
href: AnyUrl
@dataclass
class Navbar:
+30 -19
View File
@@ -1,12 +1,36 @@
import uuid
from decimal import Decimal
from ipaddress import IPv4Address, IPv6Address, IPv4Interface, IPv6Interface, IPv4Network, IPv6Network
from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
from pathlib import Path
from uuid import UUID
from pydantic import (DSN, UUID1, UUID3, UUID4, UUID5, BaseModel, DirectoryPath, EmailStr, FilePath, NameEmail,
NegativeFloat, NegativeInt, PositiveFloat, PositiveInt, PyObject, UrlStr, conbytes, condecimal,
confloat, conint, conlist, constr, IPvAnyAddress, IPvAnyInterface, IPvAnyNetwork, SecretStr, SecretBytes)
from pydantic import (
UUID1,
UUID3,
UUID4,
UUID5,
BaseModel,
DirectoryPath,
EmailStr,
FilePath,
IPvAnyAddress,
IPvAnyInterface,
IPvAnyNetwork,
NameEmail,
NegativeFloat,
NegativeInt,
PositiveFloat,
PositiveInt,
PyObject,
SecretBytes,
SecretStr,
conbytes,
condecimal,
confloat,
conint,
conlist,
constr,
)
class Model(BaseModel):
@@ -39,19 +63,9 @@ class Model(BaseModel):
email_address: EmailStr = None
email_and_name: NameEmail = None
url: UrlStr = None
password: SecretStr = None
password_bytes: SecretBytes = None
db_name = 'foobar'
db_user = 'postgres'
db_password: str = None
db_host = 'localhost'
db_port = '5432'
db_driver = 'postgres'
db_query: dict = None
dsn: DSN = None
decimal: Decimal = None
decimal_positive: condecimal(gt=0) = None
decimal_negative: condecimal(lt=0) = None
@@ -72,6 +86,7 @@ class Model(BaseModel):
ip_v4_interface: IPv4Interface = None
ip_v6_interface: IPv6Interface = None
m = Model(
cos_function='math.cos',
path_to_something='/home',
@@ -94,10 +109,8 @@ m = Model(
short_list=[1, 2],
email_address='Samuel Colvin <s@muelcolvin.com >',
email_and_name='Samuel Colvin <s@muelcolvin.com >',
url='http://example.com',
password='password',
password_bytes=b'password2',
dsn='postgres://postgres@localhost:5432/foobar',
decimal=Decimal('42.24'),
decimal_positive=Decimal('21.12'),
decimal_negative=Decimal('-21.12'),
@@ -116,7 +129,7 @@ m = Model(
ip_v6_network=IPv6Network('ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128'),
ip_vany_interface=IPv4Interface('192.168.0.0/24'),
ip_v4_interface=IPv4Interface('192.168.0.0/24'),
ip_v6_interface=IPv6Interface('ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128')
ip_v6_interface=IPv6Interface('ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/128'),
)
print(m.dict())
"""
@@ -143,11 +156,9 @@ print(m.dict())
'email_address': 's@muelcolvin.com',
'email_and_name': <NameEmail("Samuel Colvin <s@muelcolvin.com>")>,
'is_really_a_bool': True,
'url': 'http://example.com',
'password': SecretStr('**********'),
'password_bytes': SecretBytes(b'**********'),
...
'dsn': 'postgres://postgres@localhost:5432/foobar',
'decimal': Decimal('42.24'),
'decimal_positive': Decimal('21.12'),
'decimal_negative': Decimal('-21.12'),
+42
View File
@@ -0,0 +1,42 @@
from pydantic import BaseModel, HttpUrl, PostgresDsn, ValidationError, validator
class MyModel(BaseModel):
url: HttpUrl
m = MyModel(url='http://www.example.com')
# the repr() method for a url will display all properties of the url
print(repr(m.url))
#> <HttpUrl('http://www.example.com' scheme='http' host='www.example.com'
#> tld='com' host_type='domain')>
print(m.url.scheme)
#> http
print(m.url.host)
#> www.example.com
print(m.url.host_type)
#> domain
print(m.url.port)
#> None
class MyDatabaseModel(BaseModel):
db: PostgresDsn
@validator('db')
def check_db_name(cls, v):
assert v.path and len(v.path) > 1, 'database must be provided'
return v
m = MyDatabaseModel(db='postgres://user:pass@localhost:5432/foobar')
print(m.db)
#> postgres://user:pass@localhost:5432/foobar
try:
MyDatabaseModel(db='postgres://user:pass@localhost:5432')
except ValidationError as e:
print(e)
"""
1 validation error for MyDatabaseModel
db
database must be provided (type=assertion_error)
"""
+16
View File
@@ -0,0 +1,16 @@
from pydantic import BaseModel, HttpUrl
class MyModel(BaseModel):
url: HttpUrl
m1 = MyModel(url='http://puny£code.com')
print(m1.url)
#> http://xn--punycode-eja.com
print(m1.url.host_type)
#> int_domain
m2 = MyModel(url='https://www.аррӏе.com/')
print(m2.url)
#> https://www.xn--80ak6aa92e.com/
print(m2.url.host_type)
#> int_domain
+29
View File
@@ -0,0 +1,29 @@
from pydantic import BaseModel, HttpUrl, ValidationError
class MyModel(BaseModel):
url: HttpUrl
m = MyModel(url='http://www.example.com')
print(m.url)
#> http://www.example.com
try:
MyModel(url='ftp://invalid.url')
except ValidationError as e:
print(e)
"""
1 validation error for MyModel
url
URL scheme not permitted (type=value_error.url.scheme;
allowed_schemes={'http', 'https'})
"""
try:
MyModel(url='not a url')
except ValidationError as e:
print(e)
"""
1 validation error for MyModel
url
invalid or missing URL scheme (type=value_error.url.scheme)
"""
+89 -2
View File
@@ -600,7 +600,6 @@ Exotic Types
(This script is complete, it should run "as is")
Booleans
~~~~~~~~
@@ -627,7 +626,6 @@ Here is a script demonstrating some of these behaviors:
(This script is complete, it should run "as is")
Callable
~~~~~~~~
@@ -643,6 +641,95 @@ Fields can also be of type ``Callable``:
callable, no validation of arguments, their types or the return
type is performed.
URLs
....
For URI/URL validation the following types are available:
- ``AnyUrl``: any scheme allowed, TLD not required
- ``AnyHttpUrl``: schema ``http`` or ``https``, TLD not required
- ``HttpUrl``: schema ``http`` or ``https``, TLD required, max length 2083
- ``PostgresDsn``: schema ``postgres`` or ``postgresql``, userinfo required, TLD not required
- ``RedisDsn``: schema ``redis``, userinfo required, tld not required
- ``stricturl``, method with the following keyword arguments:
- ``strip_whitespace: bool = True``
- ``min_length: int = 1``
- ``max_length: int = 2 ** 16``
- ``tld_required: bool = True``
- ``allowed_schemes: Optional[Set[str]] = None``
If you require custom types they can be created in a similar way to the application specific types defined above.
The above types (which all inherit from ``AnyUrl``) will attempt to give descriptive errors when invalid URLs are
provided:
.. literalinclude:: examples/urls.py
(This script is complete, it should run "as is")
URL Properties
~~~~~~~~~~~~~~
Assuming an input URL of ``http://samuel:pass@example.com:8000/the/path/?query=here#fragment=is;this=bit``,
the above types export the following properties:
- ``scheme``: always set - the url schema e.g. ``http`` above
- ``host``: always set - the url host e.g. ``example.com`` above
- ``host_type``: always set - describes the type of host, either:
- ``domain``: e.g. for ``example.com``,
- ``int_domain``: international domain, see :ref:`below <int_domains>`, e.g. for ``exampl£e.org``,
- ``ipv4``: an IP V4 address, e.g. for ``127.0.0.1``, or
- ``ipv6``: an IP V6 address, e.g. for ``2001:db8::ff00:42``
- ``user``: optional - the username if included e.g. ``samuel`` above
- ``password``: optional - the password if included e.g. ``pass`` above
- ``tld``: optional - the top level domain e.g. ``com`` above,
**Note: this will be wrong for any two level domain e.g. "co.uk".** You'll need to implement your own list of TLDs
if you require full TLD validation
- ``port``: optional - the port e.g. ``8000`` above
- ``path``: optional - the path e.g. ``/the/path/`` above
- ``query``: optional - the URL query (aka GET arguments or "search string") e.g. ``query=here`` above
- ``fragment``: optional - the fragment e.g. ``fragment=is;this=bit`` above
If further validation is required, these properties can be used by validators to enforce specific behaviour:
.. literalinclude:: examples/url_properties.py
(This script is complete, it should run "as is")
.. _int_domains:
International Domains
~~~~~~~~~~~~~~~~~~~~~
"International domains" (e.g. a URL where the host includes non-ascii characters) will be encode via
`punycode <https://en.wikipedia.org/wiki/Punycode>`_ (see
`this article <https://www.xudongz.com/blog/2017/idn-phishing/>`_ for a good description of why this is important):
.. literalinclude:: examples/url_punycode.py
(This script is complete, it should run "as is")
Underscores in Hostnames
~~~~~~~~~~~~~~~~~~~~~~~~
.. note::
In *pydantic* underscores are allowed in all parts of a domain except the tld.
Technically this might be wrong - in theory the hostname cannot have underscores but subdomains can.
To explain this; consider the following two cases:
- ``exam_ple.co.uk`` hostname is ``exam_ple``, should not be allowed as there's an underscore in there
- ``foo_bar.example.com`` hostname is ``example`` should be allowed since the underscore is in the subdomain
Without having an exhaustive list of TLDs it would be impossible to differentiate between these two. Therefore
underscores are allowed, you could do further validation in a validator if you wanted.
Also, chrome currently accepts ``http://exam_ple.com`` as a URL, so we're in good (or at least big) company.
Color Type
..........
+1 -1
View File
@@ -136,7 +136,7 @@ table = [
''
],
[
'UrlStr',
'AnyUrl',
'string',
'{"format": "uri"}',
'JSON Schema Validation',
+1
View File
@@ -6,6 +6,7 @@ from .error_wrappers import ValidationError
from .errors import *
from .fields import Required
from .main import *
from .networks import *
from .parse import Protocol
from .schema import Schema
from .types import *
+33 -14
View File
@@ -1,6 +1,6 @@
from decimal import Decimal
from pathlib import Path
from typing import Any, Union
from typing import Any, Set, Union
from .typing import AnyType, display_as_type
@@ -67,26 +67,45 @@ class DictError(PydanticTypeError):
msg_template = 'value is not a valid dict'
class DSNDriverIsEmptyError(PydanticValueError):
code = 'dsn.driver_is_empty'
msg_template = '"driver" field may not be empty'
class EmailError(PydanticValueError):
msg_template = 'value is not a valid email address'
class UrlSchemeError(PydanticValueError):
class UrlError(PydanticValueError):
code = 'url'
class UrlSchemeError(UrlError):
code = 'url.scheme'
msg_template = 'url scheme "{scheme}" is not allowed'
def __init__(self, *, scheme: str) -> None:
super().__init__(scheme=scheme)
msg_template = 'invalid or missing URL scheme'
class UrlRegexError(PydanticValueError):
code = 'url.regex'
msg_template = 'url string does not match regex'
class UrlSchemePermittedError(UrlError):
code = 'url.scheme'
msg_template = 'URL scheme not permitted'
def __init__(self, allowed_schemes: Set[str]):
super().__init__(allowed_schemes=allowed_schemes)
class UrlUserInfoError(UrlError):
code = 'url.userinfo'
msg_template = 'userinfo required in URL but missing'
class UrlHostError(UrlError):
code = 'url.host'
msg_template = 'URL host invalid'
class UrlHostTldError(UrlError):
code = 'url.host'
msg_template = 'URL host invalid, top level domain required'
class UrlExtraError(UrlError):
code = 'url.extra'
msg_template = 'URL invalid, extra characters found after valid URL: {extra!r}'
class EnumError(PydanticTypeError):
+2 -11
View File
@@ -22,17 +22,8 @@ from . import errors as errors_
from .class_validators import Validator, make_generic_validator
from .error_wrappers import ErrorWrapper
from .types import Json, JsonWrapper
from .typing import (
AnyCallable,
AnyType,
Callable,
ForwardRef,
display_as_type,
is_literal_type,
lenient_issubclass,
literal_values,
)
from .utils import sequence_like
from .typing import AnyCallable, AnyType, Callable, ForwardRef, display_as_type, is_literal_type, literal_values
from .utils import lenient_issubclass, sequence_like
from .validators import NoneType, constant_validator, dict_validator, find_validators
try:
+383
View File
@@ -0,0 +1,383 @@
import re
from ipaddress import (
IPv4Address,
IPv4Interface,
IPv4Network,
IPv6Address,
IPv6Interface,
IPv6Network,
_BaseAddress,
_BaseNetwork,
)
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Set, Tuple, Type, Union, cast, no_type_check
from . import errors
from .utils import change_exception
from .validators import constr_length_validator, not_none_validator, str_validator
if TYPE_CHECKING: # pragma: no cover
from .fields import Field
from .main import BaseConfig # noqa: F401
from .typing import AnyCallable
CallableGenerator = Generator[AnyCallable, None, None]
try:
import email_validator
except ImportError:
email_validator = None
NetworkType = Union[str, bytes, int, Tuple[Union[str, bytes, int], Union[str, int]]]
__all__ = [
'AnyUrl',
'AnyHttpUrl',
'HttpUrl',
'stricturl',
'EmailStr',
'NameEmail',
'IPvAnyAddress',
'IPvAnyInterface',
'IPvAnyNetwork',
'PostgresDsn',
'RedisDsn',
'validate_email',
]
host_part_names = ('domain', 'ipv4', 'ipv6')
url_regex = re.compile(
r'(?:(?P<scheme>[a-z0-9]+?)://)?' # scheme
r'(?:(?P<user>\S+)(?P<password>:\S*)?@)?' # user info
r'(?:'
r'(?P<ipv4>(?:\d{1,3}\.){3}\d{1,3})|' # ipv4
r'(?P<ipv6>\[[A-F0-9]*:[A-F0-9:]+\])|' # ipv6
r'(?P<domain>[^\s/:?#]+)' # domain, validation occurs later
r')?'
r'(?::(?P<port>\d+))?' # port
r'(?P<path>/[^\s?]*)?' # path
r'(?:\?(?P<query>[^\s#]+))?' # query
r'(?:#(?P<fragment>\S+))?', # fragment
re.IGNORECASE,
)
_ascii_chunk = r'[_0-9a-z](?:[-_0-9a-z]{0,61}[_0-9a-z])?'
_domain_ending = r'(?P<tld>\.[a-z]{2,63})?\.?'
ascii_domain_regex = re.compile(fr'(?:{_ascii_chunk}\.)*?{_ascii_chunk}{_domain_ending}', re.IGNORECASE)
_int_chunk = r'[_0-9a-\U00040000](?:[-_0-9a-\U00040000]{0,61}[_0-9a-\U00040000])?'
int_domain_regex = re.compile(fr'(?:{_int_chunk}\.)*?{_int_chunk}{_domain_ending}', re.IGNORECASE)
class AnyUrl(str):
strip_whitespace = True
min_length = 1
max_length = 2 ** 16
allowed_schemes: Optional[Set[str]] = None
tld_required: bool = False
user_required: bool = False
__slots__ = ('scheme', 'user', 'password', 'host', 'tld', 'host_type', 'port', 'path', 'query', 'fragment')
@no_type_check
def __new__(cls, url: Optional[str], **kwargs) -> object:
return str.__new__(cls, cls.build(**kwargs) if url is None else url)
def __init__(
self,
url: str,
*,
scheme: str,
user: Optional[str] = None,
password: Optional[str] = None,
host: str,
tld: Optional[str] = None,
host_type: str = 'domain',
port: Optional[str] = None,
path: Optional[str] = None,
query: Optional[str] = None,
fragment: Optional[str] = None,
) -> None:
str.__init__(url)
self.scheme = scheme
self.user = user
self.password = password
self.host = host
self.tld = tld
self.host_type = host_type
self.port = port
self.path = path
self.query = query
self.fragment = fragment
@classmethod
def build(
cls,
*,
scheme: str,
user: Optional[str] = None,
password: Optional[str] = None,
host: str,
port: Optional[str] = None,
path: Optional[str] = None,
query: Optional[str] = None,
fragment: Optional[str] = None,
**kwargs: str,
) -> str:
url = scheme + '://'
if user:
url += user
if password:
url += ':' + password
url += '@'
url += host
if port:
url += ':' + port
if path:
url += path
if query:
url += '?' + query
if fragment:
url += '#' + fragment
return url
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
yield not_none_validator
yield cls.validate
@classmethod
def validate(cls, value: Any, field: 'Field', config: 'BaseConfig') -> 'AnyUrl':
if type(value) == cls:
return value
value = str_validator(value)
if cls.strip_whitespace:
value = value.strip()
url: str = cast(str, constr_length_validator(value, field, config))
m = url_regex.match(url)
# the regex should always match, if it doesn't please report with details of the URL tried
assert m, 'URL regex failed unexpectedly'
parts = m.groupdict()
scheme = parts['scheme']
if scheme is None:
raise errors.UrlSchemeError()
if cls.allowed_schemes and scheme.lower() not in cls.allowed_schemes:
raise errors.UrlSchemePermittedError(cls.allowed_schemes)
user = parts['user']
if cls.user_required and user is None:
raise errors.UrlUserInfoError()
host, tld, host_type, rebuild = cls.validate_host(parts)
if m.end() != len(url):
raise errors.UrlExtraError(extra=url[m.end() :])
return cls(
None if rebuild else url,
scheme=scheme,
user=user,
password=parts['password'],
host=host,
tld=tld,
host_type=host_type,
port=parts['port'],
path=parts['path'],
query=parts['query'],
fragment=parts['fragment'],
)
@classmethod
def validate_host(cls, parts: Dict[str, str]) -> Tuple[str, Optional[str], str, bool]:
host, tld, host_type, rebuild = None, None, None, False
for f in ('domain', 'ipv4', 'ipv6'):
host = parts[f]
if host:
host_type = f
break
if host is None:
raise errors.UrlHostError()
elif host_type == 'domain':
d = ascii_domain_regex.fullmatch(host)
if d is None:
d = int_domain_regex.fullmatch(host)
if not d:
raise errors.UrlHostError()
host_type = 'int_domain'
rebuild = True
host = host.encode('idna').decode('ascii')
tld = d.group('tld')
if tld is not None:
tld = tld[1:]
elif cls.tld_required:
raise errors.UrlHostTldError()
return host, tld, host_type, rebuild # type: ignore
def __repr__(self) -> str:
extra = ' '.join(f'{n}={getattr(self, n)!r}' for n in self.__slots__ if getattr(self, n) is not None)
return f'<{type(self).__name__}({super().__repr__()} {extra})>'
class AnyHttpUrl(AnyUrl):
allowed_schemes = {'http', 'https'}
class HttpUrl(AnyUrl):
allowed_schemes = {'http', 'https'}
tld_required = True
# https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers
max_length = 2083
class PostgresDsn(AnyUrl):
allowed_schemes = {'postgres', 'postgresql'}
user_required = True
class RedisDsn(AnyUrl):
allowed_schemes = {'redis'}
user_required = True
def stricturl(
*,
strip_whitespace: bool = True,
min_length: int = 1,
max_length: int = 2 ** 16,
tld_required: bool = True,
allowed_schemes: Optional[Set[str]] = None,
) -> Type[AnyUrl]:
# use kwargs then define conf in a dict to aid with IDE type hinting
namespace = dict(
strip_whitespace=strip_whitespace,
min_length=min_length,
max_length=max_length,
tld_required=tld_required,
allowed_schemes=allowed_schemes,
)
return type('UrlValue', (AnyUrl,), namespace)
class EmailStr(str):
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
# included here and below so the error happens straight away
if email_validator is None:
raise ImportError('email-validator is not installed, run `pip install pydantic[email]`')
yield str_validator
yield cls.validate
@classmethod
def validate(cls, value: str) -> str:
return validate_email(value)[1]
class NameEmail:
__slots__ = 'name', 'email'
def __init__(self, name: str, email: str):
self.name = name
self.email = email
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
if email_validator is None:
raise ImportError('email-validator is not installed, run `pip install pydantic[email]`')
yield str_validator
yield cls.validate
@classmethod
def validate(cls, value: str) -> 'NameEmail':
return cls(*validate_email(value))
def __str__(self) -> str:
return f'{self.name} <{self.email}>'
def __repr__(self) -> str:
return f'<NameEmail("{self}")>'
class IPvAnyAddress(_BaseAddress):
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
yield cls.validate
@classmethod
def validate(cls, value: Union[str, bytes, int]) -> Union[IPv4Address, IPv6Address]:
try:
return IPv4Address(value)
except ValueError:
pass
with change_exception(errors.IPvAnyAddressError, ValueError):
return IPv6Address(value)
class IPvAnyInterface(_BaseAddress):
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
yield cls.validate
@classmethod
def validate(cls, value: NetworkType) -> Union[IPv4Interface, IPv6Interface]:
try:
return IPv4Interface(value)
except ValueError:
pass
with change_exception(errors.IPvAnyInterfaceError, ValueError):
return IPv6Interface(value)
class IPvAnyNetwork(_BaseNetwork): # type: ignore
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
yield cls.validate
@classmethod
def validate(cls, value: NetworkType) -> Union[IPv4Network, IPv6Network]:
# Assume IP Network is defined with a default value for ``strict`` argument.
# Define your own class if you want to specify network address check strictness.
try:
return IPv4Network(value)
except ValueError:
pass
with change_exception(errors.IPvAnyNetworkError, ValueError):
return IPv6Network(value)
pretty_email_regex = re.compile(r'([\w ]*?) *<(.*)> *')
def validate_email(value: str) -> Tuple[str, str]:
"""
Brutally simple email address validation. Note unlike most email address validation
* raw ip address (literal) domain parts are not allowed.
* "John Doe <local_part@domain.com>" style "pretty" email addresses are processed
* the local part check is extremely basic. This raises the possibility of unicode spoofing, but no better
solution is really possible.
* spaces are striped from the beginning and end of addresses but no error is raised
See RFC 5322 but treat it with suspicion, there seems to exist no universally acknowledged test for a valid email!
"""
if email_validator is None:
raise ImportError('email-validator is not installed, run `pip install pydantic[email]`')
m = pretty_email_regex.fullmatch(value)
name: Optional[str] = None
if m:
name, value = m.groups()
email = value.strip()
try:
email_validator.validate_email(email, check_deliverability=False)
except email_validator.EmailNotValidError as e:
raise errors.EmailError() from e
return name or email[: email.index('@')], email.lower()
+5 -18
View File
@@ -13,8 +13,8 @@ from pydantic.color import Color
from .fields import SHAPE_LIST, SHAPE_MAPPING, SHAPE_SET, SHAPE_SINGLETON, SHAPE_TUPLE, Field
from .json import pydantic_encoder
from .networks import AnyUrl, EmailStr, IPvAnyAddress, IPvAnyInterface, IPvAnyNetwork, NameEmail
from .types import (
DSN,
UUID1,
UUID3,
UUID4,
@@ -25,31 +25,19 @@ from .types import (
ConstrainedList,
ConstrainedStr,
DirectoryPath,
EmailStr,
FilePath,
IPvAnyAddress,
IPvAnyInterface,
IPvAnyNetwork,
Json,
NameEmail,
SecretBytes,
SecretStr,
StrictBool,
UrlStr,
condecimal,
confloat,
conint,
conlist,
constr,
)
from .typing import (
is_callable_type,
is_literal_type,
is_new_type,
lenient_issubclass,
literal_values,
new_type_supertype,
)
from .typing import is_callable_type, is_literal_type, is_new_type, literal_values, new_type_supertype
from .utils import lenient_issubclass
if TYPE_CHECKING: # pragma: no cover
from .main import BaseModel # noqa: F401
@@ -667,8 +655,7 @@ validation_attribute_to_schema_keyword = {
# Order is important, subclasses of str must go before str, etc
field_class_to_schema_enum_enabled: Tuple[Tuple[Any, Dict[str, Any]], ...] = (
(EmailStr, {'type': 'string', 'format': 'email'}),
(UrlStr, {'type': 'string', 'format': 'uri'}),
(DSN, {'type': 'string', 'format': 'dsn'}),
(AnyUrl, {'type': 'string', 'format': 'uri'}),
(SecretStr, {'type': 'string', 'writeOnly': True}),
(str, {'type': 'string'}),
(SecretBytes, {'type': 'string', 'writeOnly': True}),
@@ -828,7 +815,7 @@ def get_annotation_from_schema(annotation: Any, schema: Schema) -> Type[Any]:
if isinstance(annotation, type):
attrs: Optional[Tuple[str, ...]] = None
constraint_func: Optional[Callable[..., type]] = None
if issubclass(annotation, str) and not issubclass(annotation, (EmailStr, DSN, UrlStr, ConstrainedStr)):
if issubclass(annotation, str) and not issubclass(annotation, (EmailStr, AnyUrl, ConstrainedStr)):
attrs = ('max_length', 'min_length', 'regex')
constraint_func = constr
elif lenient_issubclass(annotation, numeric_types) and not issubclass(
+4 -209
View File
@@ -1,26 +1,18 @@
import json
import re
from decimal import Decimal
from ipaddress import (
IPv4Address,
IPv4Interface,
IPv4Network,
IPv6Address,
IPv6Interface,
IPv6Network,
_BaseAddress,
_BaseNetwork,
)
from pathlib import Path
from types import new_class
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Pattern, Set, Tuple, Type, TypeVar, Union, cast
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Pattern, Type, TypeVar, Union, cast
from uuid import UUID
from . import errors
from .typing import AnyType
from .utils import change_exception, import_string, make_dsn, url_regex_generator, validate_email
from .utils import import_string
from .validators import (
bytes_validator,
constr_length_validator,
constr_strip_whitespace,
decimal_validator,
float_validator,
int_validator,
@@ -49,12 +41,7 @@ __all__ = [
'conlist',
'ConstrainedStr',
'constr',
'EmailStr',
'UrlStr',
'urlstr',
'NameEmail',
'PyObject',
'DSN',
'ConstrainedInt',
'conint',
'PositiveInt',
@@ -73,9 +60,6 @@ __all__ = [
'DirectoryPath',
'Json',
'JsonWrapper',
'IPvAnyAddress',
'IPvAnyInterface',
'IPvAnyNetwork',
'SecretStr',
'SecretBytes',
'StrictBool',
@@ -88,7 +72,6 @@ NoneStrBytes = Optional[StrBytes]
OptionalInt = Optional[int]
OptionalIntFloat = Union[OptionalInt, float]
OptionalIntFloatDecimal = Union[OptionalIntFloat, Decimal]
NetworkType = Union[str, bytes, int, Tuple[Union[str, bytes, int], Union[str, int]]]
if TYPE_CHECKING: # pragma: no cover
from .fields import Field
@@ -213,21 +196,6 @@ def constr(
return type('ConstrainedStrValue', (ConstrainedStr,), namespace)
class EmailStr(str):
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
# included here and below so the error happens straight away
if email_validator is None:
raise ImportError('email-validator is not installed, run `pip install pydantic[email]`')
yield str_validator
yield cls.validate
@classmethod
def validate(cls, value: str) -> str:
return validate_email(value)[1]
class StrictBool(int):
"""
StrictBool to allow for bools which are not type-coerced.
@@ -248,85 +216,6 @@ class StrictBool(int):
raise errors.StrictBoolError()
class UrlStr(str):
strip_whitespace = True
min_length = 1
max_length = 2 ** 16
schemes: Optional[Set[str]] = None
relative = False # whether to allow relative URLs
require_tld = True # whether to reject non-FQDN hostnames
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
yield not_none_validator
yield str_validator
yield constr_strip_whitespace
yield constr_length_validator
yield cls.validate
@classmethod
def validate(cls, value: str) -> str:
# Check first if the scheme is valid
schemes = cls.schemes or {'http', 'https', 'ftp', 'ftps'}
if '://' in value:
scheme = value.split('://')[0].lower()
if scheme not in schemes:
raise errors.UrlSchemeError(scheme=scheme)
regex = url_regex_generator(relative=cls.relative, require_tld=cls.require_tld)
if not regex.match(value):
raise errors.UrlRegexError()
return value
def urlstr(
*,
strip_whitespace: bool = True,
min_length: int = 1,
max_length: int = 2 ** 16,
relative: bool = False,
require_tld: bool = True,
schemes: Optional[Set[str]] = None,
) -> Type[str]:
# use kwargs then define conf in a dict to aid with IDE type hinting
namespace = dict(
strip_whitespace=strip_whitespace,
min_length=min_length,
max_length=max_length,
relative=relative,
require_tld=require_tld,
schemes=schemes,
)
return type('UrlStrValue', (UrlStr,), namespace)
class NameEmail:
__slots__ = 'name', 'email'
def __init__(self, name: str, email: str):
self.name = name
self.email = email
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
if email_validator is None:
raise ImportError('email-validator is not installed, run `pip install pydantic[email]`')
yield str_validator
yield cls.validate
@classmethod
def validate(cls, value: str) -> 'NameEmail':
return cls(*validate_email(value))
def __str__(self) -> str:
return f'{self.name} <{self.email}>'
def __repr__(self) -> str:
return f'<NameEmail("{self}")>'
class PyObject:
validate_always = True
@@ -351,28 +240,6 @@ class PyObject:
raise errors.PyObjectError(error_message=str(e))
class DSN(str):
prefix = 'db_'
fields = 'driver', 'user', 'password', 'host', 'port', 'name', 'query'
validate_always = True
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
yield str_validator
yield cls.validate
@classmethod
def validate(cls, value: str, values: Dict[str, Any]) -> str:
if value:
return value
kwargs = {f: values.get(cls.prefix + f) for f in cls.fields}
if kwargs['driver'] is None:
raise errors.DSNDriverIsEmptyError()
return make_dsn(**kwargs) # type: ignore
class ConstrainedNumberMeta(type):
def __new__(cls, name: str, bases: Any, dct: Dict[str, Any]) -> 'ConstrainedInt':
new_cls = cast('ConstrainedInt', type.__new__(cls, name, bases, dct))
@@ -585,56 +452,6 @@ class Json(metaclass=JsonMeta):
raise errors.JsonTypeError()
class IPvAnyAddress(_BaseAddress):
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
yield cls.validate
@classmethod
def validate(cls, value: Union[str, bytes, int]) -> Union[IPv4Address, IPv6Address]:
try:
return IPv4Address(value)
except ValueError:
pass
with change_exception(errors.IPvAnyAddressError, ValueError):
return IPv6Address(value)
class IPvAnyInterface(_BaseAddress):
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
yield cls.validate
@classmethod
def validate(cls, value: NetworkType) -> Union[IPv4Interface, IPv6Interface]:
try:
return IPv4Interface(value)
except ValueError:
pass
with change_exception(errors.IPvAnyInterfaceError, ValueError):
return IPv6Interface(value)
class IPvAnyNetwork(_BaseNetwork): # type: ignore
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
yield cls.validate
@classmethod
def validate(cls, value: NetworkType) -> Union[IPv4Network, IPv6Network]:
# Assume IP Network is defined with a default value for ``strict`` argument.
# Define your own class if you want to specify network address check strictness.
try:
return IPv4Network(value)
except ValueError:
pass
with change_exception(errors.IPvAnyNetworkError, ValueError):
return IPv6Network(value)
class SecretStr:
@classmethod
def __get_validators__(cls) -> 'CallableGenerator':
@@ -685,25 +502,3 @@ class SecretBytes:
def get_secret_value(self) -> bytes:
return self._secret_value
def constr_length_validator(v: 'StrBytes', field: 'Field', config: 'BaseConfig') -> 'StrBytes':
v_len = len(v)
min_length = field.type_.min_length or config.min_anystr_length # type: ignore
if min_length is not None and v_len < min_length:
raise errors.AnyStrMinLengthError(limit_value=min_length)
max_length = field.type_.max_length or config.max_anystr_length # type: ignore
if max_length is not None and v_len > max_length:
raise errors.AnyStrMaxLengthError(limit_value=max_length)
return v
def constr_strip_whitespace(v: 'StrBytes', field: 'Field', config: 'BaseConfig') -> 'StrBytes':
strip_whitespace = field.type_.strip_whitespace or config.anystr_strip_whitespace # type: ignore
if strip_whitespace:
v = v.strip()
return v
+1 -6
View File
@@ -71,7 +71,6 @@ __all__ = (
'AnyCallable',
'AnyType',
'display_as_type',
'lenient_issubclass',
'resolve_annotations',
'is_callable_type',
'is_literal_type',
@@ -100,7 +99,7 @@ def display_as_type(v: AnyType) -> str:
if not isinstance(v, typing_base) and not isinstance(v, type):
v = type(v)
if lenient_issubclass(v, Enum):
if isinstance(v, type) and issubclass(v, Enum):
if issubclass(v, int):
return 'int'
elif issubclass(v, str):
@@ -115,10 +114,6 @@ def display_as_type(v: AnyType) -> str:
return str(v)
def lenient_issubclass(cls: Any, class_or_tuple: Union[AnyType, Tuple[AnyType, ...]]) -> bool:
return isinstance(cls, type) and issubclass(cls, class_or_tuple)
def resolve_annotations(raw_annotations: Dict[str, AnyType], module_name: Optional[str]) -> Dict[str, AnyType]:
"""
Partially taken from typing.get_type_hints.
+6 -109
View File
@@ -1,97 +1,21 @@
import inspect
import re
from contextlib import contextmanager
from functools import lru_cache
from importlib import import_module
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Pattern, Set, Tuple, Type, Union, no_type_check
from typing import TYPE_CHECKING, Any, Generator, List, Optional, Set, Tuple, Type, Union, no_type_check
from . import errors
from .typing import AnyType
try:
import email_validator
from typing_extensions import Literal
except ImportError:
email_validator = None
Literal = None # type: ignore
if TYPE_CHECKING: # pragma: no cover
from .main import BaseModel # noqa: F401
from .typing import SetIntStr, DictIntStrAny, IntStr # noqa: F401
PRETTY_REGEX = re.compile(r'([\w ]*?) *<(.*)> *')
def validate_email(value: str) -> Tuple[str, str]:
"""
Brutally simple email address validation. Note unlike most email address validation
* raw ip address (literal) domain parts are not allowed.
* "John Doe <local_part@domain.com>" style "pretty" email addresses are processed
* the local part check is extremely basic. This raises the possibility of unicode spoofing, but no better
solution is really possible.
* spaces are striped from the beginning and end of addresses but no error is raised
See RFC 5322 but treat it with suspicion, there seems to exist no universally acknowledged test for a valid email!
"""
if email_validator is None:
raise ImportError('email-validator is not installed, run `pip install pydantic[email]`')
m = PRETTY_REGEX.fullmatch(value)
name: Optional[str] = None
if m:
name, value = m.groups()
email = value.strip()
try:
email_validator.validate_email(email, check_deliverability=False)
except email_validator.EmailNotValidError as e:
raise errors.EmailError() from e
return name or email[: email.index('@')], email.lower()
def _rfc_1738_quote(text: str) -> str:
return re.sub(r'[:@/]', lambda m: '%{:X}'.format(ord(m.group(0))), text)
def make_dsn(
*,
driver: str,
user: str = None,
password: str = None,
host: str = None,
port: str = None,
name: str = None,
query: Dict[str, Any] = None,
) -> str:
"""
Create a DSN from from connection settings.
Stolen approximately from sqlalchemy/engine/url.py:URL.
"""
s = driver + '://'
if user is not None:
s += _rfc_1738_quote(user)
if password is not None:
s += ':' + _rfc_1738_quote(password)
s += '@'
if host is not None:
if ':' in host:
s += '[{}]'.format(host)
else:
s += host
if port is not None:
s += ':{}'.format(int(port))
if name is not None:
s += '/' + name
query = query or {}
if query:
keys = list(query)
keys.sort()
s += '?' + '&'.join('{}={}'.format(k, query[k]) for k in keys)
return s
def import_string(dotted_path: str) -> Any:
"""
Stolen approximately from django. Import a dotted module path and return the attribute/class designated by the
@@ -152,35 +76,8 @@ def validate_field_name(bases: List[Type['BaseModel']], field_name: str) -> None
)
@lru_cache(maxsize=None)
def url_regex_generator(*, relative: bool, require_tld: bool) -> Pattern[str]:
"""
Url regex generator taken from Marshmallow library,
for details please follow library source code:
https://github.com/marshmallow-code/marshmallow/blob/298870ef6c089fb4d91efae9ca4168453ffe00d2/marshmallow/validate.py#L37
"""
return re.compile(
r''.join(
(
r'^',
r'(' if relative else r'',
r'(?:[a-z0-9\.\-\+]*)://', # scheme is validated separately
r'(?:[^:@]+?:[^:@]*?@|)', # basic auth
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+',
r'(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|', # domain...
r'localhost|', # localhost...
(
r'(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.?)|' if not require_tld else r''
), # allow dotless hostnames
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|', # ...or ipv4
r'\[[A-F0-9]*:[A-F0-9:]+\])', # ...or ipv6
r'(?::\d+)?', # optional port
r')?' if relative else r'', # host is optional, allow for relative URLs
r'(?:/?|[/?]\S+)$',
)
),
re.IGNORECASE,
)
def lenient_issubclass(cls: Any, class_or_tuple: Union[AnyType, Tuple[AnyType, ...]]) -> bool:
return isinstance(cls, type) and issubclass(cls, class_or_tuple)
def in_ipython() -> bool:
+22
View File
@@ -370,6 +370,28 @@ def make_literal_validator(type_: Any) -> Callable[[Any], Any]:
return literal_validator
def constr_length_validator(v: 'StrBytes', field: 'Field', config: 'BaseConfig') -> 'StrBytes':
v_len = len(v)
min_length = field.type_.min_length or config.min_anystr_length # type: ignore
if min_length is not None and v_len < min_length:
raise errors.AnyStrMinLengthError(limit_value=min_length)
max_length = field.type_.max_length or config.max_anystr_length # type: ignore
if max_length is not None and v_len > max_length:
raise errors.AnyStrMaxLengthError(limit_value=max_length)
return v
def constr_strip_whitespace(v: 'StrBytes', field: 'Field', config: 'BaseConfig') -> 'StrBytes':
strip_whitespace = field.type_.strip_whitespace or config.anystr_strip_whitespace # type: ignore
if strip_whitespace:
v = v.strip()
return v
T = TypeVar('T')
+1
View File
@@ -10,6 +10,7 @@ max-line-length = 120
max-complexity = 14
inline-quotes = '
multiline-quotes = """
ignore = E203, W503
[bdist_wheel]
python-tag = py36.py37.py38
+4 -4
View File
@@ -156,12 +156,12 @@ class Foo(BaseModel):
def test_forward_ref_dataclass(create_module):
module = create_module(
"""
from pydantic import UrlStr
from pydantic import AnyUrl
from pydantic.dataclasses import dataclass
@dataclass
class Dataclass:
url: UrlStr
url: AnyUrl
"""
)
m = module.Dataclass('http://example.com ')
@@ -173,12 +173,12 @@ def test_forward_ref_dataclass_with_future_annotations(create_module):
module = create_module(
"""
from __future__ import annotations
from pydantic import UrlStr
from pydantic import AnyUrl
from pydantic.dataclasses import dataclass
@dataclass
class Dataclass:
url: UrlStr
url: AnyUrl
"""
)
m = module.Dataclass('http://example.com ')
+364
View File
@@ -0,0 +1,364 @@
import pytest
from pydantic import AnyUrl, BaseModel, HttpUrl, PostgresDsn, RedisDsn, ValidationError, stricturl
from pydantic.networks import validate_email
try:
import email_validator
except ImportError:
email_validator = None
@pytest.mark.parametrize(
'value',
[
'http://example.org',
'http://test',
'http://localhost',
'https://example.org/whatever/next/',
'postgres://user:pass@localhost:5432/app',
'postgres://just-user@localhost:5432/app',
'https://example.org',
'http://localhost',
'http://localhost/',
'http://localhost:8000',
'http://localhost:8000/',
'https://foo_bar.example.com/',
'ftp://example.org',
'ftps://example.org',
'http://example.co.jp',
'http://www.example.com/a%C2%B1b',
'http://www.example.com/~username/',
'http://info.example.com?fred',
'http://info.example.com/?fred',
'http://xn--mgbh0fb.xn--kgbechtv/',
'http://example.com/blue/red%3Fand+green',
'http://www.example.com/?array%5Bkey%5D=value',
'http://xn--rsum-bpad.example.org/',
'http://123.45.67.8/',
'http://123.45.67.8:8329/',
'http://[2001:db8::ff00:42]:8329',
'http://[2001::1]:8329',
'http://[2001:db8::1]/',
'http://www.example.com:8000/foo',
'http://www.cwi.nl:80/%7Eguido/Python.html',
'https://www.python.org/путь',
'http://андрей@example.com',
AnyUrl('https://example.com', scheme='https', host='example.com'),
'https://exam_ple.com/',
],
)
def test_any_url_success(value):
class Model(BaseModel):
v: AnyUrl
assert Model(v=value).v, value
@pytest.mark.parametrize(
'value,err_type,err_msg,err_ctx',
[
('http:///example.com/', 'value_error.url.host', 'URL host invalid', None),
('https:///example.com/', 'value_error.url.host', 'URL host invalid', None),
('http://.example.com:8000/foo', 'value_error.url.host', 'URL host invalid', None),
('https://example.org\\', 'value_error.url.host', 'URL host invalid', None),
('https://exampl$e.org', 'value_error.url.host', 'URL host invalid', None),
('http://??', 'value_error.url.host', 'URL host invalid', None),
('http://.', 'value_error.url.host', 'URL host invalid', None),
('http://..', 'value_error.url.host', 'URL host invalid', None),
(
'https://example.org more',
'value_error.url.extra',
"URL invalid, extra characters found after valid URL: ' more'",
{'extra': ' more'},
),
('$https://example.org', 'value_error.url.scheme', 'invalid or missing URL scheme', None),
('../icons/logo.gif', 'value_error.url.scheme', 'invalid or missing URL scheme', None),
('abc', 'value_error.url.scheme', 'invalid or missing URL scheme', None),
('..', 'value_error.url.scheme', 'invalid or missing URL scheme', None),
('/', 'value_error.url.scheme', 'invalid or missing URL scheme', None),
(' ', 'value_error.any_str.min_length', 'ensure this value has at least 1 characters', {'limit_value': 1}),
('', 'value_error.any_str.min_length', 'ensure this value has at least 1 characters', {'limit_value': 1}),
(None, 'type_error.none.not_allowed', 'none is not an allowed value', None),
(
'http://2001:db8::ff00:42:8329',
'value_error.url.extra',
"URL invalid, extra characters found after valid URL: ':db8::ff00:42:8329'",
{'extra': ':db8::ff00:42:8329'},
),
('http://[192.168.1.1]:8329', 'value_error.url.host', 'URL host invalid', None),
],
)
def test_any_url_invalid(value, err_type, err_msg, err_ctx):
class Model(BaseModel):
v: AnyUrl
with pytest.raises(ValidationError) as exc_info:
Model(v=value)
assert len(exc_info.value.errors()) == 1, exc_info.value.errors()
error = exc_info.value.errors()[0]
# debug(error)
assert error['type'] == err_type, value
assert error['msg'] == err_msg, value
assert error.get('ctx') == err_ctx, value
def test_any_url_obj():
class Model(BaseModel):
v: AnyUrl
url = Model(v='http://example.org').v
assert str(url) == 'http://example.org'
assert repr(url) == ("<AnyUrl('http://example.org' scheme='http' host='example.org' tld='org' host_type='domain')>")
assert url.scheme == 'http'
assert url.host == 'example.org'
assert url.tld == 'org'
assert url.host_type == 'domain'
assert url.port is None
assert url == AnyUrl('http://example.org', scheme='https', host='example.org')
url2 = Model(v='http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit').v
assert str(url2) == 'http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit'
assert repr(url2) == (
"<AnyUrl('http://user:password@example.org:1234/the/path/?query=here#fragment=is;this=bit' "
"scheme='http' user='user:password' host='example.org' tld='org' host_type='domain' port='1234' "
"path='/the/path/' query='query=here' fragment='fragment=is;this=bit')>"
)
assert url2.scheme == 'http'
assert url2.user == 'user:password'
assert url2.host == 'example.org'
assert url.host_type == 'domain'
assert url2.port == '1234'
assert url2.path == '/the/path/'
assert url2.query == 'query=here'
assert url2.fragment == 'fragment=is;this=bit'
url3 = Model(v='ftp://123.45.67.8:8329/').v
assert url3.scheme == 'ftp'
assert url3.host == '123.45.67.8'
assert url3.host_type == 'ipv4'
assert url3.port == '8329'
url4 = Model(v='wss://[2001:db8::ff00:42]:8329').v
assert url4.scheme == 'wss'
assert url4.host == '[2001:db8::ff00:42]'
assert url4.host_type == 'ipv6'
assert url4.port == '8329'
url5 = Model(v='https://£££.org').v
assert url5.host == 'xn--9aaa.org'
assert url5.host_type == 'int_domain'
assert str(url5) == 'https://xn--9aaa.org'
url = Model(v='http://example.co.uk').v
assert str(url) == 'http://example.co.uk'
assert url.scheme == 'http'
assert url.host == 'example.co.uk'
assert url.tld == 'uk' # wrong but no better solution
assert url.host_type == 'domain'
@pytest.mark.parametrize(
'value',
[
'http://example.org',
'http://example.org/foobar',
'http://example.org.',
'http://example.org./foobar',
'HTTP://EXAMPLE.ORG',
'https://example.org',
'https://example.org?a=1&b=2',
'https://example.org#a=3;b=3',
'https://foo_bar.example.com/',
'https://exam_ple.com/', # should perhaps fail? I think it's contrary to the RFC but chrome allows it
],
)
def test_http_url_success(value):
class Model(BaseModel):
v: HttpUrl
assert Model(v=value).v == value, value
@pytest.mark.parametrize(
'value,err_type,err_msg,err_ctx',
[
(
'ftp://example.com/',
'value_error.url.scheme',
'URL scheme not permitted',
{'allowed_schemes': {'https', 'http'}},
),
('http://foobar/', 'value_error.url.host', 'URL host invalid, top level domain required', None),
('http://localhost/', 'value_error.url.host', 'URL host invalid, top level domain required', None),
(
'x' * 2084,
'value_error.any_str.max_length',
'ensure this value has at most 2083 characters',
{'limit_value': 2083},
),
],
)
def test_http_url_invalid(value, err_type, err_msg, err_ctx):
class Model(BaseModel):
v: HttpUrl
with pytest.raises(ValidationError) as exc_info:
Model(v=value)
assert len(exc_info.value.errors()) == 1, exc_info.value.errors()
error = exc_info.value.errors()[0]
assert error['type'] == err_type, value
assert error['msg'] == err_msg, value
assert error.get('ctx') == err_ctx, value
@pytest.mark.parametrize(
'input,output',
[
(' https://www.example.com \n', 'https://www.example.com'),
(b'https://www.example.com', 'https://www.example.com'),
# https://www.xudongz.com/blog/2017/idn-phishing/ accepted but converted
('https://www.аррӏе.com/', 'https://www.xn--80ak6aa92e.com/'),
('https://exampl£e.org', 'https://xn--example-gia.org'),
],
)
def test_coerse_url(input, output):
class Model(BaseModel):
v: HttpUrl
assert Model(v=input).v == output
def test_postgres_dsns():
class Model(BaseModel):
a: PostgresDsn
assert Model(a='postgres://user:pass@localhost:5432/app').a == 'postgres://user:pass@localhost:5432/app'
assert Model(a='postgresql://user:pass@localhost:5432/app').a == 'postgresql://user:pass@localhost:5432/app'
with pytest.raises(ValidationError) as exc_info:
Model(a='http://example.org')
assert exc_info.value.errors()[0]['type'] == 'value_error.url.scheme'
def test_redis_dsns():
class Model(BaseModel):
a: RedisDsn
assert Model(a='redis://user:pass@localhost:5432/app').a == 'redis://user:pass@localhost:5432/app'
with pytest.raises(ValidationError) as exc_info:
Model(a='http://example.org')
assert exc_info.value.errors()[0]['type'] == 'value_error.url.scheme'
with pytest.raises(ValidationError) as exc_info:
Model(a='redis://localhost:5432/app')
error = exc_info.value.errors()[0]
assert error == {'loc': ('a',), 'msg': 'userinfo required in URL but missing', 'type': 'value_error.url.userinfo'}
def test_custom_schemes():
class Model(BaseModel):
v: stricturl(strip_whitespace=False, allowed_schemes={'ws', 'wss'})
assert Model(v='ws://example.org').v == 'ws://example.org'
with pytest.raises(ValidationError):
Model(v='http://example.org')
with pytest.raises(ValidationError):
Model(v='ws://example.org ')
@pytest.mark.parametrize(
'kwargs,expected',
[
(dict(scheme='ws', user='foo', host='example.net'), 'ws://foo@example.net'),
(dict(scheme='ws', user='foo', password='x', host='example.net'), 'ws://foo:x@example.net'),
(dict(scheme='ws', host='example.net', query='a=b', fragment='c=d'), 'ws://example.net?a=b#c=d'),
(dict(scheme='http', host='example.net', port='1234'), 'http://example.net:1234'),
],
)
def test_build_url(kwargs, expected):
assert AnyUrl(None, **kwargs) == expected
def test_son():
class Model(BaseModel):
v: HttpUrl
m = Model(v='http://foo@example.net')
assert m.json() == '{"v": "http://foo@example.net"}'
assert m.schema() == {
'title': 'Model',
'type': 'object',
'properties': {'v': {'title': 'V', 'minLength': 1, 'maxLength': 2083, 'type': 'string', 'format': 'uri'}},
'required': ['v'],
}
@pytest.mark.skipif(not email_validator, reason='email_validator not installed')
@pytest.mark.parametrize(
'value,name,email',
[
('foobar@example.com', 'foobar', 'foobar@example.com'),
('s@muelcolvin.com', 's', 's@muelcolvin.com'),
('Samuel Colvin <s@muelcolvin.com>', 'Samuel Colvin', 's@muelcolvin.com'),
('foobar <foobar@example.com>', 'foobar', 'foobar@example.com'),
(' foo.bar@example.com', 'foo.bar', 'foo.bar@example.com'),
('foo.bar@example.com ', 'foo.bar', 'foo.bar@example.com'),
('foo BAR <foobar@example.com >', 'foo BAR', 'foobar@example.com'),
('FOO bar <foobar@example.com> ', 'FOO bar', 'foobar@example.com'),
('<FOOBAR@example.com> ', 'FOOBAR', 'foobar@example.com'),
('ñoñó@example.com', 'ñoñó', 'ñoñó@example.com'),
('我買@example.com', '我買', '我買@example.com'),
('甲斐黒川日本@example.com', '甲斐黒川日本', '甲斐黒川日本@example.com'),
(
'чебурашкаящик-с-апельсинами.рф@example.com',
'чебурашкаящик-с-апельсинами.рф',
'чебурашкаящик-с-апельсинами.рф@example.com',
),
('उदाहरण.परीक्ष@domain.with.idn.tld', 'उदाहरण.परीक्ष', 'उदाहरण.परीक्ष@domain.with.idn.tld'),
('foo.bar@example.com', 'foo.bar', 'foo.bar@example.com'),
('foo.bar@exam-ple.com ', 'foo.bar', 'foo.bar@exam-ple.com'),
('ιωάννης@εεττ.gr', 'ιωάννης', 'ιωάννης@εεττ.gr'),
],
)
def test_address_valid(value, name, email):
assert validate_email(value) == (name, email)
@pytest.mark.skipif(not email_validator, reason='email_validator not installed')
@pytest.mark.parametrize(
'value',
[
'f oo.bar@example.com ',
'foo.bar@exam\nple.com ',
'foobar',
'foobar <foobar@example.com',
'@example.com',
'foobar@.example.com',
'foobar@.com',
'foo bar@example.com',
'foo@bar@example.com',
'\n@example.com',
'\r@example.com',
'\f@example.com',
' @example.com',
'\u0020@example.com',
'\u001f@example.com',
'"@example.com',
'\"@example.com',
',@example.com',
'foobar <foobar<@example.com>',
],
)
def test_address_invalid(value):
with pytest.raises(ValueError):
validate_email(value)
@pytest.mark.skipif(email_validator, reason='email_validator is installed')
def test_email_validator_not_installed():
with pytest.raises(ImportError):
validate_email('s@muelcolvin.com')
+4 -17
View File
@@ -4,6 +4,7 @@ import tempfile
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from enum import Enum, IntEnum
from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network
from pathlib import Path
from typing import Any, Callable, Dict, List, NewType, Optional, Set, Tuple, Union
from uuid import UUID
@@ -12,6 +13,7 @@ import pytest
from pydantic import BaseModel, Schema, ValidationError, validator
from pydantic.color import Color
from pydantic.networks import AnyUrl, EmailStr, IPvAnyAddress, IPvAnyInterface, IPvAnyNetwork, NameEmail, stricturl
from pydantic.schema import (
get_flat_models_from_model,
get_flat_models_from_models,
@@ -20,7 +22,6 @@ from pydantic.schema import (
schema,
)
from pydantic.types import (
DSN,
UUID1,
UUID3,
UUID4,
@@ -31,19 +32,8 @@ from pydantic.types import (
ConstrainedInt,
ConstrainedStr,
DirectoryPath,
EmailStr,
FilePath,
IPv4Address,
IPv4Interface,
IPv4Network,
IPv6Address,
IPv6Interface,
IPv6Network,
IPvAnyAddress,
IPvAnyInterface,
IPvAnyNetwork,
Json,
NameEmail,
NegativeFloat,
NegativeInt,
NoneBytes,
@@ -57,13 +47,11 @@ from pydantic.types import (
StrBytes,
StrictBool,
StrictStr,
UrlStr,
conbytes,
condecimal,
confloat,
conint,
constr,
urlstr,
)
from pydantic.typing import Literal
@@ -541,12 +529,11 @@ def test_str_constrained_types(field_type, expected_schema):
@pytest.mark.parametrize(
'field_type,expected_schema',
[
(UrlStr, {'title': 'A', 'type': 'string', 'format': 'uri', 'minLength': 1, 'maxLength': 2 ** 16}),
(AnyUrl, {'title': 'A', 'type': 'string', 'format': 'uri', 'minLength': 1, 'maxLength': 2 ** 16}),
(
urlstr(min_length=5, max_length=10),
stricturl(min_length=5, max_length=10),
{'title': 'A', 'type': 'string', 'format': 'uri', 'minLength': 5, 'maxLength': 10},
),
(DSN, {'title': 'A', 'type': 'string', 'format': 'dsn'}),
],
)
def test_special_str_types(field_type, expected_schema):
+1 -37
View File
@@ -6,13 +6,12 @@ from datetime import date, datetime, time, timedelta
from decimal import Decimal
from enum import Enum, IntEnum
from pathlib import Path
from typing import Dict, FrozenSet, Iterator, List, MutableSet, NewType, Optional, Pattern, Sequence, Set, Tuple
from typing import Dict, FrozenSet, Iterator, List, MutableSet, NewType, Pattern, Sequence, Set, Tuple
from uuid import UUID
import pytest
from pydantic import (
DSN,
UUID1,
UUID3,
UUID4,
@@ -206,41 +205,6 @@ def test_constrained_str_too_long():
]
class DsnModel(BaseModel):
db_name: Optional[str] = 'foobar'
db_user: Optional[str] = 'postgres'
db_password: str = None
db_host: Optional[str] = 'localhost'
db_port: Optional[str] = '5432'
db_driver: str = 'postgres'
db_query: dict = None
dsn: DSN = None
def test_dsn_compute():
m = DsnModel()
assert m.dsn == 'postgres://postgres@localhost:5432/foobar'
def test_dsn_define():
m = DsnModel(dsn='postgres://postgres@localhost:5432/different')
assert m.dsn == 'postgres://postgres@localhost:5432/different'
def test_dsn_pw_host():
m = DsnModel(db_password='pword', db_host='before:after', db_query={'v': 1})
assert m.dsn == 'postgres://postgres:pword@[before:after]:5432/foobar?v=1'
def test_dsn_no_driver():
with pytest.raises(ValidationError) as exc_info:
DsnModel(db_driver=None)
assert exc_info.value.errors() == [
{'loc': ('db_driver',), 'msg': 'none is not an allowed value', 'type': 'type_error.none.not_allowed'},
{'loc': ('dsn',), 'msg': '"driver" field may not be empty', 'type': 'value_error.dsn.driver_is_empty'},
]
def test_module_import():
class PyObjectModel(BaseModel):
module: PyObject = 'os.path'
-303
View File
@@ -1,303 +0,0 @@
import pytest
from pydantic import BaseModel, ValidationError, urlstr
@pytest.mark.parametrize(
'value',
[
'http://example.org',
'https://example.org',
'ftp://example.org',
'ftps://example.org',
'http://example.co.jp',
'http://www.example.com/a%C2%B1b',
'http://www.example.com/~username/',
'http://info.example.com/?fred',
'http://xn--mgbh0fb.xn--kgbechtv/',
'http://example.com/blue/red%3Fand+green',
'http://www.example.com/?array%5Bkey%5D=value',
'http://xn--rsum-bpad.example.org/',
'http://123.45.67.8/',
'http://123.45.67.8:8329/',
'http://[2001:db8::ff00:42]:8329',
'http://[2001::1]:8329',
'http://www.example.com:8000/foo',
],
)
def test_url_str_absolute_success(value):
class Model(BaseModel):
v: urlstr(relative=False)
assert Model(v=value).v == value
@pytest.mark.parametrize(
'value,errors',
[
(
'http:///example.com/',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'https:///example.com/',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'https://example.org\\',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'ftp:///example.com/',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'ftps:///example.com/',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'http//example.org',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
('http:///', [{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}]),
(
'http:/example.org',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'foo://example.org',
[
{
'loc': ('v',),
'msg': 'url scheme "foo" is not allowed',
'type': 'value_error.url.scheme',
'ctx': {'scheme': 'foo'},
}
],
),
(
'../icons/logo.gif',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'http://2001:db8::ff00:42:8329',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'http://[192.168.1.1]:8329',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
('abc', [{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}]),
('..', [{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}]),
('/', [{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}]),
(
' ',
[
{
'loc': ('v',),
'msg': 'ensure this value has at least 1 characters',
'type': 'value_error.any_str.min_length',
'ctx': {'limit_value': 1},
}
],
),
(
'',
[
{
'loc': ('v',),
'msg': 'ensure this value has at least 1 characters',
'type': 'value_error.any_str.min_length',
'ctx': {'limit_value': 1},
}
],
),
(None, [{'loc': ('v',), 'msg': 'none is not an allowed value', 'type': 'type_error.none.not_allowed'}]),
],
)
def test_url_str_absolute_fails(value, errors):
class Model(BaseModel):
v: urlstr(relative=False)
with pytest.raises(ValidationError) as exc_info:
Model(v=value)
assert exc_info.value.errors() == errors
@pytest.mark.parametrize(
'value',
[
'http://example.org',
'http://123.45.67.8/',
'http://example.com/foo/bar/../baz',
'https://example.com/../icons/logo.gif',
'http://example.com/./icons/logo.gif',
'ftp://example.com/../../../../g',
'http://example.com/g?y/./x',
],
)
def test_url_str_relative_success(value):
class Model(BaseModel):
v: urlstr(relative=True)
assert Model(v=value).v == value
@pytest.mark.parametrize(
'value,errors',
[
(
'http//example.org',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'suppliers.html',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'../icons/logo.gif',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'\\icons/logo.gif',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
('../.../g', [{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}]),
('...', [{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}]),
('\\', [{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}]),
(
' ',
[
{
'loc': ('v',),
'msg': 'ensure this value has at least 1 characters',
'type': 'value_error.any_str.min_length',
'ctx': {'limit_value': 1},
}
],
),
(
'',
[
{
'loc': ('v',),
'msg': 'ensure this value has at least 1 characters',
'type': 'value_error.any_str.min_length',
'ctx': {'limit_value': 1},
}
],
),
(None, [{'loc': ('v',), 'msg': 'none is not an allowed value', 'type': 'type_error.none.not_allowed'}]),
],
)
def test_url_str_relative_fails(value, errors):
class Model(BaseModel):
v: urlstr(relative=True)
with pytest.raises(ValidationError) as exc_info:
Model(v=value)
assert exc_info.value.errors() == errors
@pytest.mark.parametrize(
'value',
[
'http://example.org',
'http://123.45.67.8/',
'http://example',
'http://example.',
'http://example:80',
'http://user.name:pass.word@example',
'http://example/foo/bar',
],
)
def test_url_str_dont_require_tld_success(value):
class Model(BaseModel):
v: urlstr(require_tld=False)
assert Model(v=value).v == value
@pytest.mark.parametrize(
'value,errors',
[
('http//example', [{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}]),
(
'http://.example.org',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'http:///foo/bar',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'http:// /foo/bar',
[{'loc': ('v',), 'msg': 'url string does not match regex', 'type': 'value_error.url.regex'}],
),
(
'',
[
{
'loc': ('v',),
'msg': 'ensure this value has at least 1 characters',
'type': 'value_error.any_str.min_length',
'ctx': {'limit_value': 1},
}
],
),
(None, [{'loc': ('v',), 'msg': 'none is not an allowed value', 'type': 'type_error.none.not_allowed'}]),
],
)
def test_url_str_dont_require_tld_fails(value, errors):
class Model(BaseModel):
v: urlstr(require_tld=False)
with pytest.raises(ValidationError) as exc_info:
Model(v=value)
assert exc_info.value.errors() == errors
def test_url_str_absolute_custom_scheme():
class Model(BaseModel):
v: urlstr(relative=False)
# By default, ws not allowed
url = 'ws://test.test'
with pytest.raises(ValidationError) as exc_info:
Model(v=url)
assert exc_info.value.errors() == [
{
'loc': ('v',),
'msg': 'url scheme "ws" is not allowed',
'type': 'value_error.url.scheme',
'ctx': {'scheme': 'ws'},
}
]
class Model(BaseModel):
v: urlstr(relative=False, schemes={'http', 'https', 'ws'})
assert Model(v=url).v == url
def test_url_str_relative_and_custom_schemes():
class Model(BaseModel):
v: urlstr(relative=True)
# By default, ws not allowed
url = 'ws://test.test'
with pytest.raises(ValidationError) as exc_info:
Model(v=url)
assert exc_info.value.errors() == [
{
'loc': ('v',),
'msg': 'url scheme "ws" is not allowed',
'type': 'value_error.url.scheme',
'ctx': {'scheme': 'ws'},
}
]
class Model(BaseModel):
v: urlstr(relative=True, schemes={'http', 'https', 'ws'})
assert Model(v=url).v == url
+2 -82
View File
@@ -4,88 +4,8 @@ from typing import NewType, Union
import pytest
from pydantic.typing import display_as_type, is_new_type, lenient_issubclass, new_type_supertype
from pydantic.utils import ValueItems, import_string, make_dsn, truncate, validate_email
try:
import email_validator
except ImportError:
email_validator = None
@pytest.mark.skipif(not email_validator, reason='email_validator not installed')
@pytest.mark.parametrize(
'value,name,email',
[
('foobar@example.com', 'foobar', 'foobar@example.com'),
('s@muelcolvin.com', 's', 's@muelcolvin.com'),
('Samuel Colvin <s@muelcolvin.com>', 'Samuel Colvin', 's@muelcolvin.com'),
('foobar <foobar@example.com>', 'foobar', 'foobar@example.com'),
(' foo.bar@example.com', 'foo.bar', 'foo.bar@example.com'),
('foo.bar@example.com ', 'foo.bar', 'foo.bar@example.com'),
('foo BAR <foobar@example.com >', 'foo BAR', 'foobar@example.com'),
('FOO bar <foobar@example.com> ', 'FOO bar', 'foobar@example.com'),
('<FOOBAR@example.com> ', 'FOOBAR', 'foobar@example.com'),
('ñoñó@example.com', 'ñoñó', 'ñoñó@example.com'),
('我買@example.com', '我買', '我買@example.com'),
('甲斐黒川日本@example.com', '甲斐黒川日本', '甲斐黒川日本@example.com'),
(
'чебурашкаящик-с-апельсинами.рф@example.com',
'чебурашкаящик-с-апельсинами.рф',
'чебурашкаящик-с-апельсинами.рф@example.com',
),
('उदाहरण.परीक्ष@domain.with.idn.tld', 'उदाहरण.परीक्ष', 'उदाहरण.परीक्ष@domain.with.idn.tld'),
('foo.bar@example.com', 'foo.bar', 'foo.bar@example.com'),
('foo.bar@exam-ple.com ', 'foo.bar', 'foo.bar@exam-ple.com'),
('ιωάννης@εεττ.gr', 'ιωάννης', 'ιωάννης@εεττ.gr'),
],
)
def test_address_valid(value, name, email):
assert validate_email(value) == (name, email)
@pytest.mark.skipif(not email_validator, reason='email_validator not installed')
@pytest.mark.parametrize(
'value',
[
'f oo.bar@example.com ',
'foo.bar@exam\nple.com ',
'foobar',
'foobar <foobar@example.com',
'@example.com',
'foobar@.example.com',
'foobar@.com',
'foo bar@example.com',
'foo@bar@example.com',
'\n@example.com',
'\r@example.com',
'\f@example.com',
' @example.com',
'\u0020@example.com',
'\u001f@example.com',
'"@example.com',
'\"@example.com',
',@example.com',
'foobar <foobar<@example.com>',
],
)
def test_address_invalid(value):
with pytest.raises(ValueError):
validate_email(value)
@pytest.mark.skipif(email_validator, reason='email_validator is installed')
def test_email_validator_not_installed():
with pytest.raises(ImportError):
validate_email('s@muelcolvin.com')
def test_empty_dsn():
assert make_dsn(driver='foobar') == 'foobar://'
def test_dsn_odd_user():
assert make_dsn(driver='foobar', user='foo@bar') == 'foobar://foo%40bar@'
from pydantic.typing import display_as_type, is_new_type, new_type_supertype
from pydantic.utils import ValueItems, import_string, lenient_issubclass, truncate
def test_import_module():