mirror of
https://github.com/kennethreitz/requests.git
synced 2026-06-05 22:50:18 +00:00
Added tests module.
This commit is contained in:
@@ -6,10 +6,10 @@ init:
|
||||
test:
|
||||
# This runs all of the tests. To run an individual test, run py.test with
|
||||
# the -k flag, like "py.test -k test_path_is_not_double_encoded"
|
||||
py.test test_requests.py
|
||||
py.test tests
|
||||
|
||||
coverage:
|
||||
py.test --verbose --cov-report term --cov=requests test_requests.py
|
||||
py.test --verbose --cov-report term --cov=requests tests
|
||||
|
||||
ci: init
|
||||
py.test --junitxml=junit.xml
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
# coding: utf-8
|
||||
@@ -0,0 +1,20 @@
|
||||
# coding: utf-8
|
||||
from requests.compat import is_py3
|
||||
|
||||
|
||||
try:
|
||||
import StringIO
|
||||
except ImportError:
|
||||
import io as StringIO
|
||||
|
||||
try:
|
||||
from cStringIO import StringIO as cStringIO
|
||||
except ImportError:
|
||||
cStringIO = None
|
||||
|
||||
if is_py3:
|
||||
def u(s):
|
||||
return s
|
||||
else:
|
||||
def u(s):
|
||||
return s.decode('unicode-escape')
|
||||
@@ -0,0 +1,23 @@
|
||||
# coding: utf-8
|
||||
import pytest
|
||||
from requests.compat import urljoin
|
||||
|
||||
|
||||
def prepare_url(value):
|
||||
# Issue #1483: Make sure the URL always has a trailing slash
|
||||
httpbin_url = value.url.rstrip('/') + '/'
|
||||
|
||||
def inner(*suffix):
|
||||
return urljoin(httpbin_url, '/'.join(suffix))
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def httpbin(httpbin):
|
||||
return prepare_url(httpbin)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def httpbin_secure(httpbin_secure):
|
||||
return prepare_url(httpbin_secure)
|
||||
@@ -16,7 +16,7 @@ import pytest
|
||||
from requests.adapters import HTTPAdapter
|
||||
from requests.auth import HTTPDigestAuth, _basic_auth_str
|
||||
from requests.compat import (
|
||||
Morsel, cookielib, getproxies, str, urljoin, urlparse, is_py3,
|
||||
Morsel, cookielib, getproxies, str, urlparse,
|
||||
builtin_str, OrderedDict)
|
||||
from requests.cookies import cookiejar_from_dict, morsel_to_cookie
|
||||
from requests.exceptions import (
|
||||
@@ -28,45 +28,7 @@ from requests.sessions import SessionRedirectMixin
|
||||
from requests.models import urlencode
|
||||
from requests.hooks import default_hooks
|
||||
|
||||
try:
|
||||
import StringIO
|
||||
except ImportError:
|
||||
import io as StringIO
|
||||
|
||||
try:
|
||||
from multiprocessing.pool import ThreadPool
|
||||
except ImportError:
|
||||
ThreadPool = None
|
||||
|
||||
if is_py3:
|
||||
def u(s):
|
||||
return s
|
||||
else:
|
||||
def u(s):
|
||||
return s.decode('unicode-escape')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def httpbin(httpbin):
|
||||
# Issue #1483: Make sure the URL always has a trailing slash
|
||||
httpbin_url = httpbin.url.rstrip('/') + '/'
|
||||
|
||||
def inner(*suffix):
|
||||
return urljoin(httpbin_url, '/'.join(suffix))
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def httpsbin_url(httpbin_secure):
|
||||
# Issue #1483: Make sure the URL always has a trailing slash
|
||||
httpbin_url = httpbin_secure.url.rstrip('/') + '/'
|
||||
|
||||
def inner(*suffix):
|
||||
return urljoin(httpbin_url, '/'.join(suffix))
|
||||
|
||||
return inner
|
||||
|
||||
from .compat import StringIO, u
|
||||
|
||||
# Requests to this URL should always fail with a connection timeout (nothing
|
||||
# listening on that port)
|
||||
@@ -344,8 +306,8 @@ class TestRequests:
|
||||
r = s.get(url)
|
||||
assert r.status_code == 200
|
||||
|
||||
@pytest.mark.parametrize('url, exception',
|
||||
(
|
||||
@pytest.mark.parametrize(
|
||||
'url, exception', (
|
||||
# Connecting to an unknown domain should raise a ConnectionError
|
||||
('http://doesnotexist.google.com', ConnectionError),
|
||||
# Connecting to an invalid port should raise a ConnectionError
|
||||
@@ -536,8 +498,8 @@ class TestRequests:
|
||||
headers={str('Content-Type'): 'application/octet-stream'},
|
||||
data='\xff') # compat.str is unicode.
|
||||
|
||||
def test_pyopenssl_redirect(self, httpsbin_url, httpbin_ca_bundle):
|
||||
requests.get(httpsbin_url('status', '301'), verify=httpbin_ca_bundle)
|
||||
def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle):
|
||||
requests.get(httpbin_secure('status', '301'), verify=httpbin_ca_bundle)
|
||||
|
||||
def test_urlencoded_get_query_multivalued_param(self, httpbin):
|
||||
|
||||
@@ -578,13 +540,13 @@ class TestRequests:
|
||||
assert b'name="b\'stuff\'"' not in prep.body
|
||||
|
||||
def test_unicode_method_name(self, httpbin):
|
||||
files = {'file': open('test_requests.py', 'rb')}
|
||||
files = {'file': open(__file__, 'rb')}
|
||||
r = requests.request(
|
||||
method=u('POST'), url=httpbin('post'), files=files)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_unicode_method_name_with_request_object(self, httpbin):
|
||||
files = {'file': open('test_requests.py', 'rb')}
|
||||
files = {'file': open(__file__, 'rb')}
|
||||
s = requests.Session()
|
||||
req = requests.Request(u("POST"), httpbin('post'), files=files)
|
||||
prep = s.prepare_request(req)
|
||||
@@ -851,26 +813,6 @@ class TestRequests:
|
||||
assert r.request.url == pr.request.url
|
||||
assert r.request.headers == pr.request.headers
|
||||
|
||||
def test_get_auth_from_url(self):
|
||||
url = 'http://user:pass@complex.url.com/path?query=yes'
|
||||
assert ('user', 'pass') == requests.utils.get_auth_from_url(url)
|
||||
|
||||
def test_get_auth_from_url_encoded_spaces(self):
|
||||
url = 'http://user:pass%20pass@complex.url.com/path?query=yes'
|
||||
assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)
|
||||
|
||||
def test_get_auth_from_url_not_encoded_spaces(self):
|
||||
url = 'http://user:pass pass@complex.url.com/path?query=yes'
|
||||
assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)
|
||||
|
||||
def test_get_auth_from_url_percent_chars(self):
|
||||
url = 'http://user%25user:pass@complex.url.com/path?query=yes'
|
||||
assert ('user%user', 'pass') == requests.utils.get_auth_from_url(url)
|
||||
|
||||
def test_get_auth_from_url_encoded_hashes(self):
|
||||
url = 'http://user:pass%23pass@complex.url.com/path?query=yes'
|
||||
assert ('user', 'pass#pass') == requests.utils.get_auth_from_url(url)
|
||||
|
||||
def test_cannot_send_unprepared_requests(self, httpbin):
|
||||
r = requests.Request(url=httpbin())
|
||||
with pytest.raises(ValueError):
|
||||
@@ -986,23 +928,10 @@ class TestRequests:
|
||||
assert 'unicode' in p.headers.keys()
|
||||
assert 'byte' in p.headers.keys()
|
||||
|
||||
def test_can_send_nonstring_objects_with_files(self, httpbin):
|
||||
data = {'a': 0.0}
|
||||
files = {'b': 'foo'}
|
||||
r = requests.Request('POST', httpbin('post'), data=data, files=files)
|
||||
p = r.prepare()
|
||||
|
||||
assert 'multipart/form-data' in p.headers['Content-Type']
|
||||
|
||||
def test_can_send_bytes_bytearray_objects_with_files(self, httpbin):
|
||||
# Test bytes:
|
||||
@pytest.mark.parametrize('files', ('foo', b'foo', bytearray(b'foo')))
|
||||
def test_can_send_objects_with_files(self, httpbin, files):
|
||||
data = {'a': 'this is a string'}
|
||||
files = {'b': b'foo'}
|
||||
r = requests.Request('POST', httpbin('post'), data=data, files=files)
|
||||
p = r.prepare()
|
||||
assert 'multipart/form-data' in p.headers['Content-Type']
|
||||
# Test bytearrays:
|
||||
files = {'b': bytearray(b'foo')}
|
||||
files = {'b': files}
|
||||
r = requests.Request('POST', httpbin('post'), data=data, files=files)
|
||||
p = r.prepare()
|
||||
assert 'multipart/form-data' in p.headers['Content-Type']
|
||||
@@ -1145,43 +1074,10 @@ class TestRequests:
|
||||
assert len(list(r.iter_lines())) == 3
|
||||
|
||||
|
||||
class TestContentEncodingDetection:
|
||||
|
||||
def test_none(self):
|
||||
encodings = requests.utils.get_encodings_from_content('')
|
||||
assert not len(encodings)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'content', (
|
||||
# HTML5 meta charset attribute
|
||||
'<meta charset="UTF-8">',
|
||||
# HTML4 pragma directive
|
||||
'<meta http-equiv="Content-type" content="text/html;charset=UTF-8">',
|
||||
# XHTML 1.x served with text/html MIME type
|
||||
'<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />',
|
||||
# XHTML 1.x served as XML
|
||||
'<?xml version="1.0" encoding="UTF-8"?>',
|
||||
)
|
||||
)
|
||||
def test_pragmas(self, content):
|
||||
encodings = requests.utils.get_encodings_from_content(content)
|
||||
assert len(encodings) == 1
|
||||
assert encodings[0] == 'UTF-8'
|
||||
|
||||
def test_precedence(self):
|
||||
content = '''
|
||||
<?xml version="1.0" encoding="XML"?>
|
||||
<meta charset="HTML5">
|
||||
<meta http-equiv="Content-type" content="text/html;charset=HTML4" />
|
||||
'''.strip()
|
||||
encodings = requests.utils.get_encodings_from_content(content)
|
||||
assert encodings == ['HTML5', 'HTML4', 'XML']
|
||||
|
||||
|
||||
class TestCaseInsensitiveDict:
|
||||
|
||||
@pytest.mark.parametrize('cid',
|
||||
(
|
||||
@pytest.mark.parametrize(
|
||||
'cid', (
|
||||
CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}),
|
||||
CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]),
|
||||
CaseInsensitiveDict(FOO='foo', BAr='bar'),
|
||||
@@ -1321,146 +1217,6 @@ class TestCaseInsensitiveDict:
|
||||
assert cid != cid_copy
|
||||
|
||||
|
||||
class TestUtils:
|
||||
|
||||
def test_super_len_io_streams(self):
|
||||
""" Ensures that we properly deal with different kinds of IO streams. """
|
||||
# uses StringIO or io.StringIO (see import above)
|
||||
from io import BytesIO
|
||||
from requests.utils import super_len
|
||||
|
||||
assert super_len(StringIO.StringIO()) == 0
|
||||
assert super_len(
|
||||
StringIO.StringIO('with so much drama in the LBC')) == 29
|
||||
|
||||
assert super_len(BytesIO()) == 0
|
||||
assert super_len(
|
||||
BytesIO(b"it's kinda hard bein' snoop d-o-double-g")) == 40
|
||||
|
||||
try:
|
||||
import cStringIO
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
assert super_len(
|
||||
cStringIO.StringIO('but some how, some way...')) == 25
|
||||
|
||||
def test_super_len_correctly_calculates_len_of_partially_read_file(self):
|
||||
"""Ensure that we handle partially consumed file like objects."""
|
||||
from requests.utils import super_len
|
||||
s = StringIO.StringIO()
|
||||
s.write('foobarbogus')
|
||||
assert super_len(s) == 0
|
||||
|
||||
def test_get_environ_proxies_ip_ranges(self):
|
||||
"""Ensures that IP addresses are correctly matches with ranges
|
||||
in no_proxy variable."""
|
||||
from requests.utils import get_environ_proxies
|
||||
os.environ['no_proxy'] = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1"
|
||||
assert get_environ_proxies('http://192.168.0.1:5000/') == {}
|
||||
assert get_environ_proxies('http://192.168.0.1/') == {}
|
||||
assert get_environ_proxies('http://172.16.1.1/') == {}
|
||||
assert get_environ_proxies('http://172.16.1.1:5000/') == {}
|
||||
assert get_environ_proxies('http://192.168.1.1:5000/') != {}
|
||||
assert get_environ_proxies('http://192.168.1.1/') != {}
|
||||
|
||||
def test_get_environ_proxies(self):
|
||||
"""Ensures that IP addresses are correctly matches with ranges
|
||||
in no_proxy variable."""
|
||||
from requests.utils import get_environ_proxies
|
||||
os.environ['no_proxy'] = "127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1"
|
||||
assert get_environ_proxies(
|
||||
'http://localhost.localdomain:5000/v1.0/') == {}
|
||||
assert get_environ_proxies('http://www.requests.com/') != {}
|
||||
|
||||
def test_select_proxies(self):
|
||||
"""Make sure we can select per-host proxies correctly."""
|
||||
from requests.utils import select_proxy
|
||||
proxies = {'http': 'http://http.proxy',
|
||||
'http://some.host': 'http://some.host.proxy'}
|
||||
assert select_proxy('hTTp://u:p@Some.Host/path', proxies) == 'http://some.host.proxy'
|
||||
assert select_proxy('hTTp://u:p@Other.Host/path', proxies) == 'http://http.proxy'
|
||||
assert select_proxy('hTTps://Other.Host', proxies) is None
|
||||
|
||||
def test_guess_filename_when_int(self):
|
||||
from requests.utils import guess_filename
|
||||
assert None is guess_filename(1)
|
||||
|
||||
def test_guess_filename_when_filename_is_an_int(self):
|
||||
from requests.utils import guess_filename
|
||||
fake = type('Fake', (object,), {'name': 1})()
|
||||
assert None is guess_filename(fake)
|
||||
|
||||
def test_guess_filename_with_file_like_obj(self):
|
||||
from requests.utils import guess_filename
|
||||
from requests import compat
|
||||
fake = type('Fake', (object,), {'name': b'value'})()
|
||||
guessed_name = guess_filename(fake)
|
||||
assert b'value' == guessed_name
|
||||
assert isinstance(guessed_name, compat.bytes)
|
||||
|
||||
def test_guess_filename_with_unicode_name(self):
|
||||
from requests.utils import guess_filename
|
||||
from requests import compat
|
||||
filename = b'value'.decode('utf-8')
|
||||
fake = type('Fake', (object,), {'name': filename})()
|
||||
guessed_name = guess_filename(fake)
|
||||
assert filename == guessed_name
|
||||
assert isinstance(guessed_name, compat.str)
|
||||
|
||||
def test_is_ipv4_address(self):
|
||||
from requests.utils import is_ipv4_address
|
||||
assert is_ipv4_address('8.8.8.8')
|
||||
assert not is_ipv4_address('8.8.8.8.8')
|
||||
assert not is_ipv4_address('localhost.localdomain')
|
||||
|
||||
def test_is_valid_cidr(self):
|
||||
from requests.utils import is_valid_cidr
|
||||
assert not is_valid_cidr('8.8.8.8')
|
||||
assert is_valid_cidr('192.168.1.0/24')
|
||||
|
||||
def test_dotted_netmask(self):
|
||||
from requests.utils import dotted_netmask
|
||||
assert dotted_netmask(8) == '255.0.0.0'
|
||||
assert dotted_netmask(24) == '255.255.255.0'
|
||||
assert dotted_netmask(25) == '255.255.255.128'
|
||||
|
||||
def test_address_in_network(self):
|
||||
from requests.utils import address_in_network
|
||||
assert address_in_network('192.168.1.1', '192.168.1.0/24')
|
||||
assert not address_in_network('172.16.0.1', '192.168.1.0/24')
|
||||
|
||||
def test_get_auth_from_url(self):
|
||||
"""Ensures that username and password in well-encoded URI as per
|
||||
RFC 3986 are correctly extracted."""
|
||||
from requests.utils import get_auth_from_url
|
||||
from requests.compat import quote
|
||||
percent_encoding_test_chars = "%!*'();:@&=+$,/?#[] "
|
||||
url_address = "request.com/url.html#test"
|
||||
url = "http://" + quote(
|
||||
percent_encoding_test_chars, '') + ':' + quote(
|
||||
percent_encoding_test_chars, '') + '@' + url_address
|
||||
(username, password) = get_auth_from_url(url)
|
||||
assert username == percent_encoding_test_chars
|
||||
assert password == percent_encoding_test_chars
|
||||
|
||||
def test_requote_uri_with_unquoted_percents(self):
|
||||
"""Ensure we handle unquoted percent signs in redirects.
|
||||
|
||||
See: https://github.com/kennethreitz/requests/issues/2356
|
||||
"""
|
||||
from requests.utils import requote_uri
|
||||
bad_uri = 'http://example.com/fiz?buz=%ppicture'
|
||||
quoted = 'http://example.com/fiz?buz=%25ppicture'
|
||||
assert quoted == requote_uri(bad_uri)
|
||||
|
||||
def test_requote_uri_properly_requotes(self):
|
||||
"""Ensure requoting doesn't break expectations."""
|
||||
from requests.utils import requote_uri
|
||||
quoted = 'http://example.com/fiz?buz=%25ppicture'
|
||||
assert quoted == requote_uri(quoted)
|
||||
|
||||
|
||||
class TestMorselToCookieExpires:
|
||||
"""Tests for morsel_to_cookie when morsel contains expires."""
|
||||
|
||||
@@ -1472,8 +1228,8 @@ class TestMorselToCookieExpires:
|
||||
cookie = morsel_to_cookie(morsel)
|
||||
assert cookie.expires == 1
|
||||
|
||||
@pytest.mark.parametrize('value, exception',
|
||||
(
|
||||
@pytest.mark.parametrize(
|
||||
'value, exception', (
|
||||
(100, TypeError),
|
||||
('woops', ValueError),
|
||||
)
|
||||
@@ -1516,20 +1272,23 @@ class TestMorselToCookieMaxAge:
|
||||
|
||||
|
||||
class TestTimeout:
|
||||
|
||||
def test_stream_timeout(self, httpbin):
|
||||
try:
|
||||
requests.get(httpbin('delay/10'), timeout=2.0)
|
||||
except requests.exceptions.Timeout as e:
|
||||
assert 'Read timed out' in e.args[0].args[0]
|
||||
|
||||
def test_invalid_timeout(self, httpbin):
|
||||
@pytest.mark.parametrize(
|
||||
'timeout, error_text', (
|
||||
((3, 4, 5), '(connect, read)'),
|
||||
('foo', 'must be an int or float'),
|
||||
)
|
||||
)
|
||||
def test_invalid_timeout(self, httpbin, timeout, error_text):
|
||||
with pytest.raises(ValueError) as e:
|
||||
requests.get(httpbin('get'), timeout=(3, 4, 5))
|
||||
assert '(connect, read)' in str(e)
|
||||
|
||||
with pytest.raises(ValueError) as e:
|
||||
requests.get(httpbin('get'), timeout="foo")
|
||||
assert 'must be an int or float' in str(e)
|
||||
requests.get(httpbin('get'), timeout=timeout)
|
||||
assert error_text in str(e)
|
||||
|
||||
def test_none_timeout(self, httpbin):
|
||||
""" Check that you can set None as a valid timeout value.
|
||||
@@ -1606,7 +1365,13 @@ class RedirectSession(SessionRedirectMixin):
|
||||
return string
|
||||
|
||||
|
||||
class TestRedirects:
|
||||
def test_requests_are_updated_each_time(httpbin):
|
||||
session = RedirectSession([303, 307])
|
||||
prep = requests.Request('POST', httpbin('post')).prepare()
|
||||
r0 = session.send(prep)
|
||||
assert r0.request.method == 'POST'
|
||||
assert session.calls[-1] == SendCall((r0.request,), {})
|
||||
redirect_generator = session.resolve_redirects(r0, prep)
|
||||
default_keyword_args = {
|
||||
'stream': False,
|
||||
'verify': True,
|
||||
@@ -1615,85 +1380,62 @@ class TestRedirects:
|
||||
'allow_redirects': False,
|
||||
'proxies': {},
|
||||
}
|
||||
|
||||
def test_requests_are_updated_each_time(self, httpbin):
|
||||
session = RedirectSession([303, 307])
|
||||
prep = requests.Request('POST', httpbin('post')).prepare()
|
||||
r0 = session.send(prep)
|
||||
assert r0.request.method == 'POST'
|
||||
assert session.calls[-1] == SendCall((r0.request,), {})
|
||||
redirect_generator = session.resolve_redirects(r0, prep)
|
||||
for response in redirect_generator:
|
||||
assert response.request.method == 'GET'
|
||||
send_call = SendCall((response.request,),
|
||||
TestRedirects.default_keyword_args)
|
||||
assert session.calls[-1] == send_call
|
||||
for response in redirect_generator:
|
||||
assert response.request.method == 'GET'
|
||||
send_call = SendCall((response.request,), default_keyword_args)
|
||||
assert session.calls[-1] == send_call
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def list_of_tuples():
|
||||
return [
|
||||
@pytest.mark.parametrize(
|
||||
'data', (
|
||||
(('a', 'b'), ('c', 'd')),
|
||||
(('c', 'd'), ('a', 'b')),
|
||||
(('a', 'b'), ('c', 'd'), ('e', 'f')),
|
||||
]
|
||||
|
||||
|
||||
def test_data_argument_accepts_tuples(list_of_tuples):
|
||||
)
|
||||
)
|
||||
def test_data_argument_accepts_tuples(data):
|
||||
"""Ensure that the data argument will accept tuples of strings
|
||||
and properly encode them.
|
||||
"""
|
||||
for data in list_of_tuples:
|
||||
p = PreparedRequest()
|
||||
p.prepare(
|
||||
method='GET',
|
||||
url='http://www.example.com',
|
||||
data=data,
|
||||
hooks=default_hooks()
|
||||
)
|
||||
assert p.body == urlencode(data)
|
||||
|
||||
|
||||
def assert_copy(p, p_copy):
|
||||
for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'):
|
||||
assert getattr(p, attr) == getattr(p_copy, attr)
|
||||
|
||||
|
||||
def test_prepared_request_empty_copy():
|
||||
p = PreparedRequest()
|
||||
assert_copy(p, p.copy())
|
||||
|
||||
|
||||
def test_prepared_request_no_cookies_copy():
|
||||
p = PreparedRequest()
|
||||
p.prepare(
|
||||
method='GET',
|
||||
url='http://www.example.com',
|
||||
data='foo=bar',
|
||||
data=data,
|
||||
hooks=default_hooks()
|
||||
)
|
||||
assert_copy(p, p.copy())
|
||||
assert p.body == urlencode(data)
|
||||
|
||||
|
||||
def test_prepared_request_complete_copy():
|
||||
p = PreparedRequest()
|
||||
p.prepare(
|
||||
method='GET',
|
||||
url='http://www.example.com',
|
||||
data='foo=bar',
|
||||
hooks=default_hooks(),
|
||||
cookies={'foo': 'bar'}
|
||||
@pytest.mark.parametrize(
|
||||
'kwargs', (
|
||||
None,
|
||||
{
|
||||
'method': 'GET',
|
||||
'url': 'http://www.example.com',
|
||||
'data': 'foo=bar',
|
||||
'hooks': default_hooks()
|
||||
},
|
||||
{
|
||||
'method': 'GET',
|
||||
'url': 'http://www.example.com',
|
||||
'data': 'foo=bar',
|
||||
'hooks': default_hooks(),
|
||||
'cookies': {'foo': 'bar'}
|
||||
},
|
||||
{
|
||||
'method': 'GET',
|
||||
'url': u('http://www.example.com/üniçø∂é')
|
||||
},
|
||||
)
|
||||
assert_copy(p, p.copy())
|
||||
|
||||
|
||||
def test_prepare_unicode_url():
|
||||
)
|
||||
def test_prepared_copy(kwargs):
|
||||
p = PreparedRequest()
|
||||
p.prepare(
|
||||
method='GET',
|
||||
url=u('http://www.example.com/üniçø∂é'),
|
||||
)
|
||||
assert_copy(p, p.copy())
|
||||
if kwargs:
|
||||
p.prepare(**kwargs)
|
||||
copy = p.copy()
|
||||
for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'):
|
||||
assert getattr(p, attr) == getattr(copy, attr)
|
||||
|
||||
|
||||
def test_urllib3_retries(httpbin):
|
||||
@@ -0,0 +1,242 @@
|
||||
# coding: utf-8
|
||||
import os
|
||||
from io import BytesIO
|
||||
|
||||
import pytest
|
||||
from requests import compat
|
||||
from requests.utils import (
|
||||
address_in_network, dotted_netmask,
|
||||
get_auth_from_url, get_encodings_from_content,
|
||||
get_environ_proxies, guess_filename,
|
||||
is_ipv4_address, is_valid_cidr, requote_uri,
|
||||
select_proxy, super_len)
|
||||
|
||||
from .compat import StringIO, cStringIO
|
||||
|
||||
|
||||
class TestSuperLen:
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'stream, value', (
|
||||
(StringIO.StringIO, 'Test'),
|
||||
(BytesIO, b'Test'),
|
||||
pytest.mark.skipif('cStringIO is None')(
|
||||
(cStringIO, 'Test')
|
||||
),
|
||||
))
|
||||
def test_io_streams(self, stream, value):
|
||||
""" Ensures that we properly deal with different kinds of IO streams. """
|
||||
assert super_len(stream()) == 0
|
||||
assert super_len(stream(value)) == 4
|
||||
|
||||
def test_super_len_correctly_calculates_len_of_partially_read_file(self):
|
||||
"""Ensure that we handle partially consumed file like objects."""
|
||||
s = StringIO.StringIO()
|
||||
s.write('foobarbogus')
|
||||
assert super_len(s) == 0
|
||||
|
||||
|
||||
class TestGetEnvironProxies:
|
||||
"""Ensures that IP addresses are correctly matches with ranges
|
||||
in no_proxy variable."""
|
||||
|
||||
@pytest.yield_fixture(scope='class', autouse=True, params=['no_proxy', 'NO_PROXY'])
|
||||
def no_proxy(self, request):
|
||||
os.environ[request.param] = '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1'
|
||||
yield
|
||||
del os.environ[request.param]
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'url', (
|
||||
'http://192.168.0.1:5000/',
|
||||
'http://192.168.0.1/',
|
||||
'http://172.16.1.1/',
|
||||
'http://172.16.1.1:5000/',
|
||||
'http://localhost.localdomain:5000/v1.0/',
|
||||
)
|
||||
)
|
||||
def test_bypass(self, url):
|
||||
assert get_environ_proxies(url) == {}
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'url', (
|
||||
'http://192.168.1.1:5000/',
|
||||
'http://192.168.1.1/',
|
||||
'http://www.requests.com/',
|
||||
)
|
||||
)
|
||||
def test_not_bypass(self, url):
|
||||
assert get_environ_proxies(url) != {}
|
||||
|
||||
|
||||
class TestIsIPv4Address:
|
||||
|
||||
def test_valid(self):
|
||||
assert is_ipv4_address('8.8.8.8')
|
||||
|
||||
@pytest.mark.parametrize('value', ('8.8.8.8.8', 'localhost.localdomain'))
|
||||
def test_invalid(self, value):
|
||||
assert not is_ipv4_address(value)
|
||||
|
||||
|
||||
class TestIsValidCIDR:
|
||||
|
||||
def test_valid(self):
|
||||
assert is_valid_cidr('192.168.1.0/24')
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'value', (
|
||||
'8.8.8.8',
|
||||
'192.168.1.0/a',
|
||||
'192.168.1.0/128',
|
||||
'192.168.1.0/-1',
|
||||
'192.168.1.999/24',
|
||||
)
|
||||
)
|
||||
def test_invalid(self, value):
|
||||
assert not is_valid_cidr(value)
|
||||
|
||||
|
||||
class TestAddressInNetwork:
|
||||
|
||||
def test_valid(self):
|
||||
assert address_in_network('192.168.1.1', '192.168.1.0/24')
|
||||
|
||||
def test_invalid(self):
|
||||
assert not address_in_network('172.16.0.1', '192.168.1.0/24')
|
||||
|
||||
|
||||
class TestGuessFilename:
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'value', (1, type('Fake', (object,), {'name': 1})()),
|
||||
)
|
||||
def test_guess_filename_invalid(self, value):
|
||||
assert guess_filename(value) is None
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'value, expected_type', (
|
||||
(b'value', compat.bytes),
|
||||
(b'value'.decode('utf-8'), compat.str)
|
||||
)
|
||||
)
|
||||
def test_guess_filename_valid(self, value, expected_type):
|
||||
obj = type('Fake', (object,), {'name': value})()
|
||||
result = guess_filename(obj)
|
||||
assert result == value
|
||||
assert isinstance(result, expected_type)
|
||||
|
||||
|
||||
class TestContentEncodingDetection:
|
||||
|
||||
def test_none(self):
|
||||
encodings = get_encodings_from_content('')
|
||||
assert not len(encodings)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'content', (
|
||||
# HTML5 meta charset attribute
|
||||
'<meta charset="UTF-8">',
|
||||
# HTML4 pragma directive
|
||||
'<meta http-equiv="Content-type" content="text/html;charset=UTF-8">',
|
||||
# XHTML 1.x served with text/html MIME type
|
||||
'<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />',
|
||||
# XHTML 1.x served as XML
|
||||
'<?xml version="1.0" encoding="UTF-8"?>',
|
||||
)
|
||||
)
|
||||
def test_pragmas(self, content):
|
||||
encodings = get_encodings_from_content(content)
|
||||
assert len(encodings) == 1
|
||||
assert encodings[0] == 'UTF-8'
|
||||
|
||||
def test_precedence(self):
|
||||
content = '''
|
||||
<?xml version="1.0" encoding="XML"?>
|
||||
<meta charset="HTML5">
|
||||
<meta http-equiv="Content-type" content="text/html;charset=HTML4" />
|
||||
'''.strip()
|
||||
assert get_encodings_from_content(content) == ['HTML5', 'HTML4', 'XML']
|
||||
|
||||
|
||||
USER = PASSWORD = "%!*'();:@&=+$,/?#[] "
|
||||
ENCODED_USER = compat.quote(USER, '')
|
||||
ENCODED_PASSWORD = compat.quote(PASSWORD, '')
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'url, auth', (
|
||||
(
|
||||
'http://' + ENCODED_USER + ':' + ENCODED_PASSWORD + '@' +
|
||||
'request.com/url.html#test',
|
||||
(USER, PASSWORD)
|
||||
),
|
||||
(
|
||||
'http://user:pass@complex.url.com/path?query=yes',
|
||||
('user', 'pass')
|
||||
),
|
||||
(
|
||||
'http://user:pass%20pass@complex.url.com/path?query=yes',
|
||||
('user', 'pass pass')
|
||||
),
|
||||
(
|
||||
'http://user:pass pass@complex.url.com/path?query=yes',
|
||||
('user', 'pass pass')
|
||||
),
|
||||
(
|
||||
'http://user%25user:pass@complex.url.com/path?query=yes',
|
||||
('user%user', 'pass')
|
||||
),
|
||||
(
|
||||
'http://user:pass%23pass@complex.url.com/path?query=yes',
|
||||
('user', 'pass#pass')
|
||||
),
|
||||
)
|
||||
)
|
||||
def test_get_auth_from_url(url, auth):
|
||||
assert get_auth_from_url(url) == auth
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'uri, expected', (
|
||||
(
|
||||
# Ensure requoting doesn't break expectations
|
||||
'http://example.com/fiz?buz=%25ppicture',
|
||||
'http://example.com/fiz?buz=%25ppicture',
|
||||
),
|
||||
(
|
||||
# Ensure we handle unquoted percent signs in redirects
|
||||
'http://example.com/fiz?buz=%ppicture',
|
||||
'http://example.com/fiz?buz=%25ppicture',
|
||||
),
|
||||
)
|
||||
)
|
||||
def test_requote_uri_with_unquoted_percents(uri, expected):
|
||||
"""See: https://github.com/kennethreitz/requests/issues/2356
|
||||
"""
|
||||
assert requote_uri(uri) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'mask, expected', (
|
||||
(8, '255.0.0.0'),
|
||||
(24, '255.255.255.0'),
|
||||
(25, '255.255.255.128'),
|
||||
)
|
||||
)
|
||||
def test_dotted_netmask(mask, expected):
|
||||
assert dotted_netmask(mask) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'url, expected', (
|
||||
('hTTp://u:p@Some.Host/path', 'http://some.host.proxy'),
|
||||
('hTTp://u:p@Other.Host/path', 'http://http.proxy'),
|
||||
('hTTps://Other.Host', None),
|
||||
)
|
||||
)
|
||||
def test_select_proxies(url, expected):
|
||||
"""Make sure we can select per-host proxies correctly."""
|
||||
proxies = {'http': 'http://http.proxy',
|
||||
'http://some.host': 'http://some.host.proxy'}
|
||||
assert select_proxy(url, proxies) == expected
|
||||
Reference in New Issue
Block a user