# Mirror of https://github.com/kennethreitz/requests.git
# Synced 2026-06-05 14:50:16 +00:00
# 926 lines, 28 KiB, Python
import copy
|
|
import filecmp
|
|
import os
|
|
import tarfile
|
|
import zipfile
|
|
from collections import deque
|
|
from io import BytesIO
|
|
|
|
import pytest
|
|
|
|
from requests import compat
|
|
from requests._internal_utils import unicode_is_ascii
|
|
from requests.cookies import RequestsCookieJar
|
|
from requests.structures import CaseInsensitiveDict
|
|
from requests.utils import (
|
|
_parse_content_type_header,
|
|
add_dict_to_cookiejar,
|
|
address_in_network,
|
|
dotted_netmask,
|
|
extract_zipped_paths,
|
|
get_auth_from_url,
|
|
get_encoding_from_headers,
|
|
get_encodings_from_content,
|
|
get_environ_proxies,
|
|
guess_filename,
|
|
guess_json_utf,
|
|
is_ipv4_address,
|
|
is_valid_cidr,
|
|
iter_slices,
|
|
parse_dict_header,
|
|
parse_header_links,
|
|
prepend_scheme_if_needed,
|
|
requote_uri,
|
|
select_proxy,
|
|
set_environ,
|
|
should_bypass_proxies,
|
|
super_len,
|
|
to_key_val_list,
|
|
to_native_string,
|
|
unquote_header_value,
|
|
unquote_unreserved,
|
|
urldefragauth,
|
|
)
|
|
|
|
from .compat import StringIO, cStringIO
|
|
|
|
|
|
class TestSuperLen:
    """Checks of ``super_len`` against streams, files, strings and ad-hoc objects."""

    @pytest.mark.parametrize(
        "stream, value",
        [
            (StringIO.StringIO, "Test"),
            (BytesIO, b"Test"),
            pytest.param(
                cStringIO,
                "Test",
                marks=pytest.mark.skipif("cStringIO is None"),
            ),
        ],
    )
    def test_io_streams(self, stream, value):
        """An empty stream measures 0; one built from ``value`` measures 4."""
        empty = stream()
        assert super_len(empty) == 0
        populated = stream(value)
        assert super_len(populated) == 4

    def test_super_len_correctly_calculates_len_of_partially_read_file(self):
        """A stream that was written to (and not rewound) is expected to report 0."""
        buffer = StringIO.StringIO()
        buffer.write("foobarbogus")
        assert super_len(buffer) == 0

    @pytest.mark.parametrize("error", [IOError, OSError])
    def test_super_len_handles_files_raising_weird_errors_in_tell(self, error):
        """An object with ``__len__`` whose ``tell()`` raises must yield 0."""

        class BoomFile:
            def __len__(self):
                return 5

            def tell(self):
                raise error()

        exploding = BoomFile()
        assert super_len(exploding) == 0

    @pytest.mark.parametrize("error", [IOError, OSError])
    def test_super_len_tell_ioerror(self, error):
        """An object without ``__len__`` whose ``tell()`` raises must yield 0."""

        class NoLenBoomFile:
            def tell(self):
                raise error()

            def seek(self, offset, whence):
                pass

        exploding = NoLenBoomFile()
        assert super_len(exploding) == 0

    def test_string(self):
        """A plain four-character string measures 4."""
        assert super_len("Test") == 4

    @pytest.mark.parametrize(
        "mode, warnings_num",
        [
            ("r", 1),
            ("rb", 0),
        ],
    )
    def test_file(self, tmpdir, mode, warnings_num, recwarn):
        """A real file measures 4; the number of recorded warnings depends on the mode."""
        target = tmpdir.join("test.txt")
        target.write("Test")
        with target.open(mode) as handle:
            assert super_len(handle) == 4
        assert len(recwarn) == warnings_num

    def test_tarfile_member(self, tmpdir):
        """A member extracted from a tar archive measures its content length."""
        payload = tmpdir.join("test.txt")
        payload.write("Test")

        archive_path = str(tmpdir.join("test.tar"))
        with tarfile.open(archive_path, "w") as archive:
            archive.add(str(payload), arcname="test.txt")

        with tarfile.open(archive_path) as archive:
            extracted = archive.extractfile("test.txt")
            assert super_len(extracted) == 4

    def test_super_len_with__len__(self):
        """An object exposing ``__len__`` (here a list) is measured through it."""
        items = [1, 2, 3, 4]
        assert super_len(items) == 4

    def test_super_len_with_no__len__(self):
        """An object that only carries a ``len`` attribute is measured through it."""

        class LenFile:
            def __init__(self):
                self.len = 5

        assert super_len(LenFile()) == 5

    def test_super_len_with_tell(self):
        """The reported length shrinks by the number of characters already read."""
        stream = StringIO.StringIO("12345")
        assert super_len(stream) == 5
        stream.read(2)
        assert super_len(stream) == 3

    def test_super_len_with_fileno(self):
        """For a binary file handle the result equals the number of bytes read back."""
        with open(__file__, "rb") as handle:
            measured = super_len(handle)
            contents = handle.read()
        assert measured == len(contents)

    def test_super_len_with_no_matches(self):
        """An object offering no length information at all defaults to 0."""
        assert super_len(object()) == 0
|
|
|
|
|
|
class TestToKeyValList:
    """Checks of ``to_key_val_list`` for accepted and rejected inputs."""

    @pytest.mark.parametrize(
        "value, expected",
        [
            ([("key", "val")], [("key", "val")]),
            ((("key", "val"),), [("key", "val")]),
            ({"key": "val"}, [("key", "val")]),
            (None, None),
        ],
    )
    def test_valid(self, value, expected):
        """Lists, tuples and dicts of pairs become a list of pairs; None stays None."""
        converted = to_key_val_list(value)
        assert converted == expected

    def test_invalid(self):
        """A bare string is rejected with ``ValueError``."""
        with pytest.raises(ValueError):
            to_key_val_list("string")
|
|
|
|
|
|
class TestUnquoteHeaderValue:
    """Checks of ``unquote_header_value`` with and without the filename flag."""

    @pytest.mark.parametrize(
        "value, expected",
        [
            (None, None),
            ("Test", "Test"),
            ('"Test"', "Test"),
            ('"Test\\\\"', "Test\\"),
            ('"\\\\Comp\\Res"', "\\Comp\\Res"),
        ],
    )
    def test_valid(self, value, expected):
        """Each input must unquote to the paired expected value."""
        unquoted = unquote_header_value(value)
        assert unquoted == expected

    def test_is_filename(self):
        """With the second argument set to True the leading double backslash is kept."""
        unquoted = unquote_header_value('"\\\\Comp\\Res"', True)
        assert unquoted == "\\\\Comp\\Res"
|
|
|
|
|
|
class TestGetEnvironProxies:
    """Ensures that IP addresses are correctly matched with ranges
    in the no_proxy variable.
    """

    @pytest.fixture(autouse=True, params=["no_proxy", "NO_PROXY"])
    def no_proxy(self, request, monkeypatch):
        """Set the bypass list under each spelling of the environment variable in turn."""
        bypass_list = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1"
        monkeypatch.setenv(request.param, bypass_list)

    @pytest.mark.parametrize(
        "url",
        [
            "http://192.168.0.1:5000/",
            "http://192.168.0.1/",
            "http://172.16.1.1/",
            "http://172.16.1.1:5000/",
            "http://localhost.localdomain:5000/v1.0/",
        ],
    )
    def test_bypass(self, url):
        """URLs covered by the environment bypass list get no proxies."""
        proxies = get_environ_proxies(url, no_proxy=None)
        assert proxies == {}

    @pytest.mark.parametrize(
        "url",
        [
            "http://192.168.1.1:5000/",
            "http://192.168.1.1/",
            "http://www.requests.com/",
        ],
    )
    def test_not_bypass(self, url):
        """URLs outside the environment bypass list must yield a non-empty mapping."""
        proxies = get_environ_proxies(url, no_proxy=None)
        assert proxies != {}

    @pytest.mark.parametrize(
        "url",
        [
            "http://192.168.1.1:5000/",
            "http://192.168.1.1/",
            "http://www.requests.com/",
        ],
    )
    def test_bypass_no_proxy_keyword(self, url):
        """URLs matched by the ``no_proxy`` argument get no proxies."""
        no_proxy = "192.168.1.1,requests.com"
        proxies = get_environ_proxies(url, no_proxy=no_proxy)
        assert proxies == {}

    @pytest.mark.parametrize(
        "url",
        [
            "http://192.168.0.1:5000/",
            "http://192.168.0.1/",
            "http://172.16.1.1/",
            "http://172.16.1.1:5000/",
            "http://localhost.localdomain:5000/v1.0/",
        ],
    )
    def test_not_bypass_no_proxy_keyword(self, url, monkeypatch):
        """The ``no_proxy`` argument takes precedence over the environment variable."""
        # These URLs are listed in the environment's no_proxy but not in the
        # explicit argument, so a proxy is still expected for them.
        monkeypatch.setenv("http_proxy", "http://proxy.example.com:3128/")
        no_proxy = "192.168.1.1,requests.com"
        proxies = get_environ_proxies(url, no_proxy=no_proxy)
        assert proxies != {}
|
|
|
|
|
|
class TestIsIPv4Address:
    """Checks of ``is_ipv4_address``."""

    def test_valid(self):
        """A dotted quad is accepted."""
        assert is_ipv4_address("8.8.8.8")

    @pytest.mark.parametrize("value", ["8.8.8.8.8", "localhost.localdomain"])
    def test_invalid(self, value):
        """A five-part address and a host name are both rejected."""
        accepted = is_ipv4_address(value)
        assert not accepted
|
|
|
|
|
|
class TestIsValidCIDR:
    """Checks of ``is_valid_cidr``."""

    def test_valid(self):
        """An address followed by a /24 mask is accepted."""
        assert is_valid_cidr("192.168.1.0/24")

    @pytest.mark.parametrize(
        "value",
        [
            "8.8.8.8",
            "192.168.1.0/a",
            "192.168.1.0/128",
            "192.168.1.0/-1",
            "192.168.1.999/24",
        ],
    )
    def test_invalid(self, value):
        """Each of the listed malformed inputs is rejected."""
        accepted = is_valid_cidr(value)
        assert not accepted
|
|
|
|
|
|
class TestAddressInNetwork:
    """Checks of ``address_in_network``."""

    def test_valid(self):
        """192.168.1.1 belongs to 192.168.1.0/24."""
        inside = address_in_network("192.168.1.1", "192.168.1.0/24")
        assert inside

    def test_invalid(self):
        """172.16.0.1 does not belong to 192.168.1.0/24."""
        inside = address_in_network("172.16.0.1", "192.168.1.0/24")
        assert not inside
|
|
|
|
|
|
class TestGuessFilename:
    """Checks of ``guess_filename``."""

    @pytest.mark.parametrize(
        "value",
        [1, type("Fake", (object,), {"name": 1})()],
    )
    def test_guess_filename_invalid(self, value):
        """An integer, or an object whose ``name`` is an integer, gives None."""
        guessed = guess_filename(value)
        assert guessed is None

    @pytest.mark.parametrize(
        "value, expected_type",
        [
            (b"value", compat.bytes),
            (b"value".decode("utf-8"), compat.str),
        ],
    )
    def test_guess_filename_valid(self, value, expected_type):
        """The ``name`` attribute is returned unchanged, keeping its type."""
        holder = type("Fake", (object,), {"name": value})()
        guessed = guess_filename(holder)
        assert guessed == value
        assert isinstance(guessed, expected_type)
|
|
|
|
|
|
class TestExtractZippedPaths:
    """Checks of ``extract_zipped_paths``."""

    @pytest.mark.parametrize(
        "path",
        [
            "/",
            __file__,
            pytest.__file__,
            "/etc/invalid/location",
        ],
    )
    def test_unzipped_paths_unchanged(self, path):
        """Each of the listed paths is handed back untouched."""
        assert path == extract_zipped_paths(path)

    def test_zipped_paths_extracted(self, tmpdir):
        """A path pointing inside a zip archive resolves to an extracted copy."""
        archive = tmpdir.join("test.zip")
        with zipfile.ZipFile(archive.strpath, "w") as bundle:
            bundle.write(__file__)

        # Drop the drive and leading separators so the member name can be
        # joined onto the archive path.
        _, member = os.path.splitdrive(__file__)
        path_in_archive = os.path.join(archive.strpath, member.lstrip(r"\/"))
        resolved = extract_zipped_paths(path_in_archive)

        assert resolved != path_in_archive
        assert os.path.exists(resolved)
        assert filecmp.cmp(resolved, __file__)

    def test_invalid_unc_path(self):
        """A UNC path is handed back untouched."""
        unc = r"\\localhost\invalid\location"
        assert extract_zipped_paths(unc) == unc
|
|
|
|
|
|
class TestContentEncodingDetection:
    """Checks of ``get_encodings_from_content``."""

    def test_none(self):
        """Empty content yields no encodings."""
        found = get_encodings_from_content("")
        assert not len(found)

    @pytest.mark.parametrize(
        "content",
        [
            # HTML5 meta charset attribute
            '<meta charset="UTF-8">',
            # HTML4 pragma directive
            '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">',
            # XHTML 1.x served with text/html MIME type
            '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />',
            # XHTML 1.x served as XML
            '<?xml version="1.0" encoding="UTF-8"?>',
        ],
    )
    def test_pragmas(self, content):
        """Each declaration style yields exactly one encoding, ``UTF-8``."""
        found = get_encodings_from_content(content)
        assert len(found) == 1
        assert found[0] == "UTF-8"

    def test_precedence(self):
        """Results are ordered HTML5 charset, then HTML4 pragma, then XML declaration."""
        content = """
        <?xml version="1.0" encoding="XML"?>
        <meta charset="HTML5">
        <meta http-equiv="Content-type" content="text/html;charset=HTML4" />
        """.strip()
        found = get_encodings_from_content(content)
        assert found == ["HTML5", "HTML4", "XML"]
|
|
|
|
|
|
class TestGuessJSONUTF:
    """Checks of ``guess_json_utf``."""

    @pytest.mark.parametrize(
        "encoding",
        [
            "utf-32",
            "utf-8-sig",
            "utf-16",
            "utf-8",
            "utf-16-be",
            "utf-16-le",
            "utf-32-be",
            "utf-32-le",
        ],
    )
    def test_encoded(self, encoding):
        """``{}`` encoded with a given codec is identified as that codec."""
        payload = "{}".encode(encoding)
        assert guess_json_utf(payload) == encoding

    def test_bad_utf_like_encoding(self):
        """Four NUL bytes give no guess."""
        guessed = guess_json_utf(b"\x00\x00\x00\x00")
        assert guessed is None

    @pytest.mark.parametrize(
        ("encoding", "expected"),
        [
            ("utf-16-be", "utf-16"),
            ("utf-16-le", "utf-16"),
            ("utf-32-be", "utf-32"),
            ("utf-32-le", "utf-32"),
        ],
    )
    def test_guess_by_bom(self, encoding, expected):
        """With a leading U+FEFF the endian-neutral codec name is reported."""
        payload = "\ufeff{}".encode(encoding)
        assert guess_json_utf(payload) == expected
|
|
|
|
|
|
# Credentials made of characters that need percent-encoding in a URL, plus
# their encoded forms (no character is treated as safe).
USER = "%!*'();:@&=+$,/?#[] "
PASSWORD = USER
ENCODED_USER = compat.quote(USER, "")
ENCODED_PASSWORD = compat.quote(PASSWORD, "")
|
|
|
|
|
|
@pytest.mark.parametrize(
    "url, auth",
    [
        (
            f"http://{ENCODED_USER}:{ENCODED_PASSWORD}@request.com/url.html#test",
            (USER, PASSWORD),
        ),
        ("http://user:pass@complex.url.com/path?query=yes", ("user", "pass")),
        (
            "http://user:pass%20pass@complex.url.com/path?query=yes",
            ("user", "pass pass"),
        ),
        ("http://user:pass pass@complex.url.com/path?query=yes", ("user", "pass pass")),
        (
            "http://user%25user:pass@complex.url.com/path?query=yes",
            ("user%user", "pass"),
        ),
        (
            "http://user:pass%23pass@complex.url.com/path?query=yes",
            ("user", "pass#pass"),
        ),
        ("http://complex.url.com/path?query=yes", ("", "")),
    ],
)
def test_get_auth_from_url(url, auth):
    """The (user, password) pair is pulled out of the URL and percent-decoded."""
    extracted = get_auth_from_url(url)
    assert extracted == auth
|
|
|
|
|
|
@pytest.mark.parametrize(
    "uri, expected",
    [
        # Ensure requoting doesn't break expectations
        (
            "http://example.com/fiz?buz=%25ppicture",
            "http://example.com/fiz?buz=%25ppicture",
        ),
        # Ensure we handle unquoted percent signs in redirects
        (
            "http://example.com/fiz?buz=%ppicture",
            "http://example.com/fiz?buz=%25ppicture",
        ),
    ],
)
def test_requote_uri_with_unquoted_percents(uri, expected):
    """See: https://github.com/psf/requests/issues/2356"""
    requoted = requote_uri(uri)
    assert requoted == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "uri, expected",
    [
        # Illegal bytes: the malformed escape is left as-is
        (
            "http://example.com/?a=%--",
            "http://example.com/?a=%--",
        ),
        # Reserved characters (original label); here %30 is decoded to "0"
        (
            "http://example.com/?a=%300",
            "http://example.com/?a=00",
        ),
    ],
)
def test_unquote_unreserved(uri, expected):
    """``unquote_unreserved`` must map each URI to its paired expected form."""
    unquoted = unquote_unreserved(uri)
    assert unquoted == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "mask, expected",
    [
        (8, "255.0.0.0"),
        (24, "255.255.255.0"),
        (25, "255.255.255.128"),
    ],
)
def test_dotted_netmask(mask, expected):
    """A prefix length is rendered as its dotted-quad netmask."""
    rendered = dotted_netmask(mask)
    assert rendered == expected
|
|
|
|
|
|
# Proxy tables used by the proxy-selection test below.

# Scheme-wide and host-specific entries for plain HTTP.
http_proxies = {
    "http": "http://http.proxy",
    "http://some.host": "http://some.host.proxy",
}

# Catch-all entries, both scheme-wide and host-specific.
all_proxies = {
    "all": "socks5://http.proxy",
    "all://some.host": "socks5://some.host.proxy",
}

# The HTTP entries plus the scheme-wide catch-all (same key order as before).
mixed_proxies = {**http_proxies, "all": all_proxies["all"]}
|
|
|
|
|
|
@pytest.mark.parametrize(
    "url, expected, proxies",
    [
        ("hTTp://u:p@Some.Host/path", "http://some.host.proxy", http_proxies),
        ("hTTp://u:p@Other.Host/path", "http://http.proxy", http_proxies),
        ("hTTp:///path", "http://http.proxy", http_proxies),
        ("hTTps://Other.Host", None, http_proxies),
        ("file:///etc/motd", None, http_proxies),
        ("hTTp://u:p@Some.Host/path", "socks5://some.host.proxy", all_proxies),
        ("hTTp://u:p@Other.Host/path", "socks5://http.proxy", all_proxies),
        ("hTTp:///path", "socks5://http.proxy", all_proxies),
        ("hTTps://Other.Host", "socks5://http.proxy", all_proxies),
        ("http://u:p@other.host/path", "http://http.proxy", mixed_proxies),
        ("http://u:p@some.host/path", "http://some.host.proxy", mixed_proxies),
        ("https://u:p@other.host/path", "socks5://http.proxy", mixed_proxies),
        ("https://u:p@some.host/path", "socks5://http.proxy", mixed_proxies),
        ("https://", "socks5://http.proxy", mixed_proxies),
        # XXX: unsure whether this is reasonable behavior
        ("file:///etc/motd", "socks5://http.proxy", all_proxies),
    ],
)
def test_select_proxies(url, expected, proxies):
    """Per-host, per-scheme and catch-all proxy entries are picked correctly."""
    chosen = select_proxy(url, proxies)
    assert chosen == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "value, expected",
    [
        ('foo="is a fish", bar="as well"', {"foo": "is a fish", "bar": "as well"}),
        ("key_without_value", {"key_without_value": None}),
    ],
)
def test_parse_dict_header(value, expected):
    """Quoted pairs become dict items; a key with no value maps to None."""
    parsed = parse_dict_header(value)
    assert parsed == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "value, expected",
    [
        ("application/xml", ("application/xml", {})),
        (
            "application/json ; charset=utf-8",
            ("application/json", {"charset": "utf-8"}),
        ),
        # Parameter names come back lower-cased.
        (
            "application/json ; Charset=utf-8",
            ("application/json", {"charset": "utf-8"}),
        ),
        ("text/plain", ("text/plain", {})),
        # The next four cases vary only in how boundary2 is quoted; all of
        # them are expected to parse to the same parameters.
        (
            "multipart/form-data; boundary = something ; boundary2='something_else' ; no_equals ",
            (
                "multipart/form-data",
                {
                    "boundary": "something",
                    "boundary2": "something_else",
                    "no_equals": True,
                },
            ),
        ),
        (
            'multipart/form-data; boundary = something ; boundary2="something_else" ; no_equals ',
            (
                "multipart/form-data",
                {
                    "boundary": "something",
                    "boundary2": "something_else",
                    "no_equals": True,
                },
            ),
        ),
        (
            "multipart/form-data; boundary = something ; 'boundary2=something_else' ; no_equals ",
            (
                "multipart/form-data",
                {
                    "boundary": "something",
                    "boundary2": "something_else",
                    "no_equals": True,
                },
            ),
        ),
        (
            'multipart/form-data; boundary = something ; "boundary2=something_else" ; no_equals ',
            (
                "multipart/form-data",
                {
                    "boundary": "something",
                    "boundary2": "something_else",
                    "no_equals": True,
                },
            ),
        ),
        # Empty parameter slots are ignored.
        ("application/json ; ; ", ("application/json", {})),
    ],
)
def test__parse_content_type_header(value, expected):
    """A header splits into (content type, parameter dict) as listed above."""
    parsed = _parse_content_type_header(value)
    assert parsed == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "value, expected",
    [
        (CaseInsensitiveDict(), None),
        (
            CaseInsensitiveDict({"content-type": "application/json; charset=utf-8"}),
            "utf-8",
        ),
        (CaseInsensitiveDict({"content-type": "text/plain"}), "ISO-8859-1"),
    ],
)
def test_get_encoding_from_headers(value, expected):
    """No header gives None, an explicit charset wins, text/plain gives ISO-8859-1."""
    encoding = get_encoding_from_headers(value)
    assert encoding == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "value, length",
    [
        ("", 0),
        ("T", 1),
        ("Test", 4),
        ("Cont", 0),
        ("Other", -5),
        ("Content", None),
    ],
)
def test_iter_slices(value, length):
    """Count the slices produced by ``iter_slices`` for each value/length pair."""
    # A missing or non-positive length on non-empty input means the whole
    # value is expected to come back as a single slice.
    single_slice = length is None or (length <= 0 and len(value) > 0)
    if single_slice:
        slices = list(iter_slices(value, length))
        assert len(slices) == 1
    else:
        # Slicing one character at a time gives as many slices as `length`.
        slices = list(iter_slices(value, 1))
        assert len(slices) == length
|
|
|
|
|
|
@pytest.mark.parametrize(
    "value, expected",
    [
        (
            '<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
            [{"url": "http:/.../front.jpeg", "rel": "front", "type": "image/jpeg"}],
        ),
        ("<http:/.../front.jpeg>", [{"url": "http:/.../front.jpeg"}]),
        ("<http:/.../front.jpeg>;", [{"url": "http:/.../front.jpeg"}]),
        (
            '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;',
            [
                {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
                {"url": "http://.../back.jpeg"},
            ],
        ),
        ("", []),
    ],
)
def test_parse_header_links(value, expected):
    """A Link header value parses into one dict per link; empty input gives []."""
    links = parse_header_links(value)
    assert links == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "value, expected",
    [
        ("example.com/path", "http://example.com/path"),
        ("//example.com/path", "http://example.com/path"),
        ("example.com:80", "http://example.com:80"),
        (
            "http://user:pass@example.com/path?query",
            "http://user:pass@example.com/path?query",
        ),
        ("http://user@example.com/path?query", "http://user@example.com/path?query"),
    ],
)
def test_prepend_scheme_if_needed(value, expected):
    """``http`` is added when the scheme is missing; complete URLs are unchanged."""
    result = prepend_scheme_if_needed(value, "http")
    assert result == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "value, expected",
    [
        ("T", "T"),
        (b"T", "T"),
        ("T", "T"),
    ],
)
def test_to_native_string(value, expected):
    """Both text and bytes input come back as the native string ``T``."""
    native = to_native_string(value)
    assert native == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "url, expected",
    [
        ("http://u:p@example.com/path?a=1#test", "http://example.com/path?a=1"),
        ("http://example.com/path", "http://example.com/path"),
        ("//u:p@example.com/path", "//example.com/path"),
        ("//example.com/path", "//example.com/path"),
        ("example.com/path", "//example.com/path"),
        ("scheme:u:p@example.com/path", "scheme://example.com/path"),
    ],
)
def test_urldefragauth(url, expected):
    """Credentials and the fragment are removed from the URL."""
    stripped = urldefragauth(url)
    assert stripped == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "url, expected",
    [
        ("http://192.168.0.1:5000/", True),
        ("http://192.168.0.1/", True),
        ("http://172.16.1.1/", True),
        ("http://172.16.1.1:5000/", True),
        ("http://localhost.localdomain:5000/v1.0/", True),
        ("http://google.com:6000/", True),
        ("http://172.16.1.12/", False),
        ("http://172.16.1.12:5000/", False),
        ("http://google.com:5000/v1.0/", False),
        ("file:///some/path/on/disk", True),
    ],
)
def test_should_bypass_proxies(url, expected, monkeypatch):
    """Check ``should_bypass_proxies`` against a bypass list taken from the
    environment (both spellings of the variable carry the same value).
    """
    bypass_list = (
        "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1, google.com:6000"
    )
    for variable in ("no_proxy", "NO_PROXY"):
        monkeypatch.setenv(variable, bypass_list)
    assert should_bypass_proxies(url, no_proxy=None) == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "url, expected",
    [
        ("http://172.16.1.1/", "172.16.1.1"),
        ("http://172.16.1.1:5000/", "172.16.1.1"),
        ("http://user:pass@172.16.1.1", "172.16.1.1"),
        ("http://user:pass@172.16.1.1:5000", "172.16.1.1"),
        ("http://hostname/", "hostname"),
        ("http://hostname:5000/", "hostname"),
        ("http://user:pass@hostname", "hostname"),
        ("http://user:pass@hostname:5000", "hostname"),
    ],
)
def test_should_bypass_proxies_pass_only_hostname(url, expected, mocker):
    """``proxy_bypass`` must receive the bare host name or IP, with neither
    the port number nor the credentials attached.
    """
    patched = mocker.patch("requests.utils.proxy_bypass")
    should_bypass_proxies(url, no_proxy=None)
    patched.assert_called_once_with(expected)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "cookiejar",
    [
        compat.cookielib.CookieJar(),
        RequestsCookieJar(),
    ],
)
def test_add_dict_to_cookiejar(cookiejar):
    """``add_dict_to_cookiejar`` must work for a plain ``CookieJar`` as well as
    for a ``RequestsCookieJar``.
    """
    cookiedict = {"test": "cookies", "good": "cookies"}
    jar = add_dict_to_cookiejar(cookiejar, cookiedict)
    # Read the jar back as a name -> value mapping for comparison.
    stored = dict((cookie.name, cookie.value) for cookie in jar)
    assert cookiedict == stored
|
|
|
|
|
|
@pytest.mark.parametrize(
    "value, expected",
    [
        ("test", True),
        ("æíöû", False),
        ("ジェーピーニック", False),
    ],
)
def test_unicode_is_ascii(value, expected):
    """Only the pure-ASCII string is reported as ASCII; the result is a real bool."""
    verdict = unicode_is_ascii(value)
    assert verdict is expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "url, expected",
    [
        ("http://192.168.0.1:5000/", True),
        ("http://192.168.0.1/", True),
        ("http://172.16.1.1/", True),
        ("http://172.16.1.1:5000/", True),
        ("http://localhost.localdomain:5000/v1.0/", True),
        ("http://172.16.1.12/", False),
        ("http://172.16.1.12:5000/", False),
        ("http://google.com:5000/v1.0/", False),
    ],
)
def test_should_bypass_proxies_no_proxy(url, expected, monkeypatch):
    """Check ``should_bypass_proxies`` when the bypass list is supplied
    through the ``no_proxy`` argument.
    """
    bypass_list = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1"
    decision = should_bypass_proxies(url, no_proxy=bypass_list)
    assert decision == expected
|
|
|
|
|
|
@pytest.mark.skipif(os.name != "nt", reason="Test only on Windows")
@pytest.mark.parametrize(
    "url, expected, override",
    [
        ("http://192.168.0.1:5000/", True, None),
        ("http://192.168.0.1/", True, None),
        ("http://172.16.1.1/", True, None),
        ("http://172.16.1.1:5000/", True, None),
        ("http://localhost.localdomain:5000/v1.0/", True, None),
        ("http://172.16.1.22/", False, None),
        ("http://172.16.1.22:5000/", False, None),
        ("http://google.com:5000/v1.0/", False, None),
        ("http://mylocalhostname:5000/v1.0/", True, "<local>"),
        ("http://192.168.0.1/", False, ""),
    ],
)
def test_should_bypass_proxies_win_registry(url, expected, override, monkeypatch):
    """Check ``should_bypass_proxies`` against proxy settings served by a
    faked Windows registry.
    """
    if override is None:
        override = "192.168.*;127.0.0.1;localhost.localdomain;172.16.1.1"
    import winreg

    class RegHandle:
        def Close(self):
            pass

    ie_settings = RegHandle()
    proxy_enable_values = deque([1, "1"])

    def fake_open_key(key, subkey):
        return ie_settings

    def fake_query_value_ex(key, value_name):
        if key is not ie_settings:
            return None
        if value_name == "ProxyEnable":
            # this could be a string (REG_SZ) or a 32-bit number (REG_DWORD),
            # so successive queries alternate between the two forms
            proxy_enable_values.rotate()
            return [proxy_enable_values[0]]
        if value_name == "ProxyOverride":
            return [override]
        return None

    # Blank the proxy-related environment variables before the check.
    for variable in ("http_proxy", "https_proxy", "ftp_proxy", "no_proxy", "NO_PROXY"):
        monkeypatch.setenv(variable, "")
    monkeypatch.setattr(winreg, "OpenKey", fake_open_key)
    monkeypatch.setattr(winreg, "QueryValueEx", fake_query_value_ex)
    assert should_bypass_proxies(url, None) == expected
|
|
|
|
|
|
@pytest.mark.skipif(os.name != "nt", reason="Test only on Windows")
def test_should_bypass_proxies_win_registry_bad_values(monkeypatch):
    """Check ``should_bypass_proxies`` when the faked Windows registry holds
    an invalid ``ProxyEnable`` value.
    """
    import winreg

    class RegHandle:
        def Close(self):
            pass

    ie_settings = RegHandle()

    def fake_open_key(key, subkey):
        return ie_settings

    def fake_query_value_ex(key, value_name):
        if key is not ie_settings:
            return None
        if value_name == "ProxyEnable":
            # Invalid response; Should be an int or int-y value
            return [""]
        if value_name == "ProxyOverride":
            return ["192.168.*;127.0.0.1;localhost.localdomain;172.16.1.1"]
        return None

    # Blank the proxy-related environment variables before the check.
    for variable in ("http_proxy", "https_proxy", "no_proxy", "NO_PROXY"):
        monkeypatch.setenv(variable, "")
    monkeypatch.setattr(winreg, "OpenKey", fake_open_key)
    monkeypatch.setattr(winreg, "QueryValueEx", fake_query_value_ex)
    assert should_bypass_proxies("http://172.16.1.1/", None) is False
|
|
|
|
|
|
@pytest.mark.parametrize(
    "env_name, value",
    [
        ("no_proxy", "192.168.0.0/24,127.0.0.1,localhost.localdomain"),
        ("no_proxy", None),
        ("a_new_key", "192.168.0.0/24,127.0.0.1,localhost.localdomain"),
        ("a_new_key", None),
    ],
)
def test_set_environ(env_name, value):
    """``set_environ`` exposes the value inside the block and restores the
    environment afterwards.
    """
    snapshot = copy.deepcopy(os.environ)
    with set_environ(env_name, value):
        inside = os.environ.get(env_name)
        assert inside == value

    assert os.environ == snapshot
|
|
|
|
|
|
def test_set_environ_raises_exception():
    """An exception raised inside ``set_environ`` must propagate to the
    caller when the value parameter is None.
    """
    with pytest.raises(Exception) as raised:
        with set_environ("test1", None):
            raise Exception("Expected exception")

    message = str(raised.value)
    assert "Expected exception" in message
|