mirror of
https://github.com/kennethreitz/requests.git
synced 2026-06-05 14:50:16 +00:00
2813 lines
96 KiB
Python
2813 lines
96 KiB
Python
"""Tests for Requests."""
|
|
|
|
import collections
|
|
import contextlib
|
|
import io
|
|
import json
|
|
import os
|
|
import pickle
|
|
import re
|
|
import warnings
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
import urllib3
|
|
from urllib3.util import Timeout as Urllib3Timeout
|
|
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.auth import HTTPDigestAuth, _basic_auth_str
|
|
from requests.compat import (
|
|
JSONDecodeError,
|
|
Morsel,
|
|
MutableMapping,
|
|
builtin_str,
|
|
cookielib,
|
|
getproxies,
|
|
urlparse,
|
|
)
|
|
from requests.cookies import cookiejar_from_dict, morsel_to_cookie
|
|
from requests.exceptions import (
|
|
ChunkedEncodingError,
|
|
ConnectionError,
|
|
ConnectTimeout,
|
|
ContentDecodingError,
|
|
InvalidHeader,
|
|
InvalidProxyURL,
|
|
InvalidSchema,
|
|
InvalidURL,
|
|
MissingSchema,
|
|
ProxyError,
|
|
ReadTimeout,
|
|
RequestException,
|
|
RetryError,
|
|
)
|
|
from requests.exceptions import SSLError as RequestsSSLError
|
|
from requests.exceptions import Timeout, TooManyRedirects, UnrewindableBodyError
|
|
from requests.hooks import default_hooks
|
|
from requests.models import PreparedRequest, urlencode
|
|
from requests.sessions import SessionRedirectMixin
|
|
from requests.structures import CaseInsensitiveDict
|
|
|
|
from . import SNIMissingWarning
|
|
from .compat import StringIO
|
|
from .utils import override_environ
|
|
|
|
# Nothing listens at this address, so any request sent to it is expected
# to fail with a connection timeout.
TARPIT = "http://10.255.255.1"

# Used where waiting out the TARPIT timeout should be avoided.
INVALID_PROXY = "http://localhost:1"
|
|
|
|
# Probe for ssl.SSLContext; only the flag is kept, the imported name is dropped.
try:
    from ssl import SSLContext
except ImportError:
    HAS_MODERN_SSL = False
else:
    del SSLContext
    HAS_MODERN_SSL = True
|
|
|
|
# True when the requests package exposes a ``pyopenssl`` attribute.
HAS_PYOPENSSL = hasattr(requests, "pyopenssl")
|
|
|
|
|
|
class TestRequests:
|
|
digest_auth_algo = ("MD5", "SHA-256", "SHA-512")
|
|
|
|
def test_entry_points(self):
    """Touch the public entry points; a missing one raises AttributeError."""
    requests.session
    for verb in ("get", "head"):
        getattr(requests.session(), verb)
    for verb in ("get", "head", "put", "patch", "post"):
        getattr(requests, verb)
    # Not really an entry point, but people rely on it.
    from requests.packages.urllib3.poolmanager import PoolManager  # noqa:F401
|
|
|
|
@pytest.mark.parametrize(
    "exception, url",
    [
        (MissingSchema, "hiwpefhipowhefopw"),
        (InvalidSchema, "localhost:3128"),
        (InvalidSchema, "localhost.localdomain:3128/"),
        (InvalidSchema, "10.122.1.1:3128/"),
        (InvalidURL, "http://"),
        (InvalidURL, "http://*example.com"),
        (InvalidURL, "http://.example.com"),
    ],
)
def test_invalid_url(self, exception, url):
    """Each malformed URL must make ``requests.get`` raise its paired exception."""
    with pytest.raises(exception):
        requests.get(url)
|
|
|
|
def test_basic_building(self):
    """A Request populated via attributes prepares its URL and form body."""
    request = requests.Request()
    request.url = "http://kennethreitz.org/"
    request.data = {"life": "42"}

    prepared = request.prepare()
    assert prepared.url == request.url
    assert prepared.body == "life=42"
|
|
|
|
@pytest.mark.parametrize("method", ("GET", "HEAD"))
def test_no_content_length(self, httpbin, method):
    """Body-less GET/HEAD requests are prepared without a Content-Length."""
    endpoint = httpbin(method.lower())
    prepared = requests.Request(method, endpoint).prepare()
    assert "Content-Length" not in prepared.headers
|
|
|
|
@pytest.mark.parametrize("method", ("POST", "PUT", "PATCH", "OPTIONS"))
def test_no_body_content_length(self, httpbin, method):
    """Without a body, these methods are prepared with Content-Length "0"."""
    endpoint = httpbin(method.lower())
    prepared = requests.Request(method, endpoint).prepare()
    assert prepared.headers["Content-Length"] == "0"
|
|
|
|
@pytest.mark.parametrize("method", ("POST", "PUT", "PATCH", "OPTIONS"))
def test_empty_content_length(self, httpbin, method):
    """An empty-string body is prepared with Content-Length "0"."""
    endpoint = httpbin(method.lower())
    prepared = requests.Request(method, endpoint, data="").prepare()
    assert prepared.headers["Content-Length"] == "0"
|
|
|
|
def test_override_content_length(self, httpbin):
    """A caller-supplied Content-Length header survives preparation as-is."""
    custom = {"Content-Length": "not zero"}
    prepared = requests.Request("POST", httpbin("post"), headers=custom).prepare()
    assert "Content-Length" in prepared.headers
    assert prepared.headers["Content-Length"] == "not zero"
|
|
|
|
def test_path_is_not_double_encoded(self):
    """A space in the path is percent-encoded exactly once."""
    raw_url = "http://0.0.0.0/get/test case"
    prepared = requests.Request("GET", raw_url).prepare()
    assert prepared.path_url == "/get/test%20case"
|
|
|
|
@pytest.mark.parametrize(
    "url, expected",
    [
        (
            "http://example.com/path#fragment",
            "http://example.com/path?a=b#fragment",
        ),
        (
            "http://example.com/path?key=value#fragment",
            "http://example.com/path?key=value&a=b#fragment",
        ),
    ],
)
def test_params_are_added_before_fragment(self, url, expected):
    """Query params are merged into the URL ahead of its fragment."""
    prepared = requests.Request("GET", url, params={"a": "b"}).prepare()
    assert prepared.url == expected
|
|
|
|
def test_params_original_order_is_preserved_by_default(self):
    """Params from an OrderedDict appear in the URL in insertion order."""
    ordered_params = collections.OrderedDict(
        [("z", 1), ("a", 1), ("k", 1), ("d", 1)]
    )
    session = requests.Session()
    prepared = session.prepare_request(
        requests.Request("GET", "http://example.com/", params=ordered_params)
    )
    assert prepared.url == "http://example.com/?z=1&a=1&k=1&d=1"
|
|
|
|
def test_params_bytes_are_encoded(self):
    """A bytes query string is accepted and appended to the URL."""
    unprepared = requests.Request("GET", "http://example.com", params=b"test=foo")
    prepared = unprepared.prepare()
    assert prepared.url == "http://example.com/?test=foo"
|
|
|
|
def test_binary_put(self):
    """An encoded (bytes) PUT payload stays bytes after preparation."""
    payload = "ööö".encode()
    prepared = requests.Request("PUT", "http://example.com", data=payload).prepare()
    assert isinstance(prepared.body, bytes)
|
|
|
|
def test_whitespaces_are_removed_from_url(self):
    """Leading whitespace in the URL is stripped (issue #3696)."""
    padded_url = " http://example.com"
    prepared = requests.Request("GET", padded_url).prepare()
    assert prepared.url == "http://example.com/"
|
|
|
|
@pytest.mark.parametrize("scheme", ("http://", "HTTP://", "hTTp://", "HttP://"))
def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
    """The URL scheme is accepted regardless of letter case."""
    session = requests.Session()
    session.proxies = getproxies()
    pieces = urlparse(httpbin("get"))
    # Rebuild the httpbin URL with the scheme spelling under test.
    target = scheme + pieces.netloc + pieces.path
    prepared = requests.Request("GET", target).prepare()
    response = session.send(prepared)
    assert response.status_code == 200, f"failed for scheme {scheme}"
|
|
|
|
def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
    """A manually prepared GET sent through a Session returns 200."""
    request = requests.Request("GET", httpbin("get"))
    session = requests.Session()
    session.proxies = getproxies()

    response = session.send(request.prepare())
    assert response.status_code == 200
|
|
|
|
def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
    """GET follows a 302 and records the redirect in the history."""
    response = requests.get(httpbin("redirect", "1"))
    assert response.status_code == 200
    first_hop = response.history[0]
    assert first_hop.status_code == 302
    assert first_hop.is_redirect
|
|
|
|
def test_HTTP_307_ALLOW_REDIRECT_POST(self, httpbin):
    """POST follows a 307 and the body is re-sent to the new location."""
    redirect_params = {"url": "post", "status_code": 307}
    response = requests.post(
        httpbin("redirect-to"), data="test", params=redirect_params
    )
    assert response.status_code == 200
    first_hop = response.history[0]
    assert first_hop.status_code == 307
    assert first_hop.is_redirect
    assert response.json()["data"] == "test"
|
|
|
|
def test_HTTP_307_ALLOW_REDIRECT_POST_WITH_SEEKABLE(self, httpbin):
    """A seekable file-like body is re-sent in full after a 307 redirect."""
    payload = b"test"
    redirect_params = {"url": "post", "status_code": 307}
    response = requests.post(
        httpbin("redirect-to"), data=io.BytesIO(payload), params=redirect_params
    )
    assert response.status_code == 200
    first_hop = response.history[0]
    assert first_hop.status_code == 307
    assert first_hop.is_redirect
    assert response.json()["data"] == payload.decode("utf-8")
|
|
|
|
def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin):
    """A 50-hop redirect chain raises TooManyRedirects after 30 hops."""
    try:
        requests.get(httpbin("relative-redirect", "50"))
    except TooManyRedirects as exc:
        raised = exc
    else:
        pytest.fail("Expected redirect to raise TooManyRedirects but it did not")

    # 50 hops requested, 30 followed: the chain stops with 20 still to go.
    last_url = httpbin("relative-redirect", "20")
    assert raised.request.url == last_url
    assert raised.response.url == last_url
    assert len(raised.response.history) == 30
|
|
|
|
def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
    """A session-level ``max_redirects`` of 5 is honoured."""
    session = requests.session()
    session.max_redirects = 5
    try:
        session.get(httpbin("relative-redirect", "50"))
    except TooManyRedirects as exc:
        raised = exc
    else:
        pytest.fail(
            "Expected custom max number of redirects to be respected but was not"
        )

    # 50 hops requested, 5 followed: the chain stops with 45 still to go.
    last_url = httpbin("relative-redirect", "45")
    assert raised.request.url == last_url
    assert raised.response.url == last_url
    assert len(raised.response.history) == 5
|
|
|
|
def test_http_301_changes_post_to_get(self, httpbin):
    """After a 301, a POST is re-issued as GET."""
    response = requests.post(httpbin("status", "301"))
    assert response.status_code == 200
    assert response.request.method == "GET"
    first_hop = response.history[0]
    assert first_hop.status_code == 301
    assert first_hop.is_redirect
|
|
|
|
def test_http_301_doesnt_change_head_to_get(self, httpbin):
    """After a 301, a HEAD request stays HEAD.

    The leftover debug ``print(r.content)`` was dropped so the test matches
    its 302/303 siblings and no longer writes to stdout.
    """
    r = requests.head(httpbin("status", "301"), allow_redirects=True)
    assert r.status_code == 200
    assert r.request.method == "HEAD"
    assert r.history[0].status_code == 301
    assert r.history[0].is_redirect
|
|
|
|
def test_http_302_changes_post_to_get(self, httpbin):
    """After a 302, a POST is re-issued as GET."""
    response = requests.post(httpbin("status", "302"))
    assert response.status_code == 200
    assert response.request.method == "GET"
    first_hop = response.history[0]
    assert first_hop.status_code == 302
    assert first_hop.is_redirect
|
|
|
|
def test_http_302_doesnt_change_head_to_get(self, httpbin):
    """After a 302, a HEAD request stays HEAD."""
    response = requests.head(httpbin("status", "302"), allow_redirects=True)
    assert response.status_code == 200
    assert response.request.method == "HEAD"
    first_hop = response.history[0]
    assert first_hop.status_code == 302
    assert first_hop.is_redirect
|
|
|
|
def test_http_303_changes_post_to_get(self, httpbin):
    """After a 303, a POST is re-issued as GET."""
    response = requests.post(httpbin("status", "303"))
    assert response.status_code == 200
    assert response.request.method == "GET"
    first_hop = response.history[0]
    assert first_hop.status_code == 303
    assert first_hop.is_redirect
|
|
|
|
def test_http_303_doesnt_change_head_to_get(self, httpbin):
    """After a 303, a HEAD request stays HEAD."""
    response = requests.head(httpbin("status", "303"), allow_redirects=True)
    assert response.status_code == 200
    assert response.request.method == "HEAD"
    first_hop = response.history[0]
    assert first_hop.status_code == 303
    assert first_hop.is_redirect
|
|
|
|
def test_header_and_body_removal_on_redirect(self, httpbin):
    """A 302 after POST drops the body and its Content-* headers."""
    session = requests.Session()
    prepared = session.prepare_request(
        requests.Request("POST", httpbin("post"), data={"test": "data"})
    )
    response = session.send(prepared)

    # Mimic a redirect response
    response.status_code = 302
    response.headers["location"] = "get"

    # Run request through resolve_redirects
    followed = next(session.resolve_redirects(response, prepared))
    assert followed.request.body is None
    for name in ("Content-Length", "Content-Type"):
        assert name not in followed.request.headers
|
|
|
|
def test_transfer_enc_removal_on_redirect(self, httpbin):
    """A 302 after a generator-body POST drops Transfer-Encoding and the body."""
    session = requests.Session()
    generator_body = (b"x" for _ in range(1))
    prepared = session.prepare_request(
        requests.Request("POST", httpbin("post"), data=generator_body)
    )
    assert "Transfer-Encoding" in prepared.headers

    # Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33
    response = requests.Response()
    response.raw = io.BytesIO(b"the content")
    response.request = prepared
    response.raw.release_conn = lambda *args: args

    # Mimic a redirect response
    response.status_code = 302
    response.headers["location"] = httpbin("get")

    # Run request through resolve_redirect
    followed = next(session.resolve_redirects(response, prepared))
    assert followed.request.body is None
    for name in ("Transfer-Encoding", "Content-Type"):
        assert name not in followed.request.headers
|
|
|
|
def test_fragment_maintained_on_redirect(self, httpbin):
    """The URL fragment is carried over to the redirect target."""
    fragment = "#view=edit&token=hunter2"
    start_url = httpbin("redirect-to?url=get") + fragment
    response = requests.get(start_url)

    assert len(response.history) > 0
    assert response.history[0].request.url == start_url
    assert response.url == httpbin("get") + fragment
|
|
|
|
def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
    """A custom User-agent header is echoed back in a 200 response."""
    custom_headers = {"User-agent": "Mozilla/5.0"}
    response = requests.get(httpbin("user-agent"), headers=custom_headers)

    assert custom_headers["User-agent"] in response.text
    assert response.status_code == 200
|
|
|
|
def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
    """Params in both the URL and the ``params`` argument still yield 200."""
    custom_headers = {"User-agent": "Mozilla/5.0"}
    target = httpbin("get") + "?test=true"
    response = requests.get(target, params={"q": "test"}, headers=custom_headers)
    assert response.status_code == 200
|
|
|
|
def test_set_cookie_on_301(self, httpbin):
    """A cookie set by the cookies/set endpoint lands in the session jar."""
    session = requests.session()
    session.get(httpbin("cookies/set?foo=bar"))
    assert session.cookies["foo"] == "bar"
|
|
|
|
def test_cookie_sent_on_redirect(self, httpbin):
    """Session cookies are sent along when following a redirect."""
    session = requests.session()
    session.get(httpbin("cookies/set?foo=bar"))
    # redirects to httpbin('get')
    response = session.get(httpbin("redirect/1"))
    echoed_headers = response.json()["headers"]
    assert "Cookie" in echoed_headers
|
|
|
|
def test_cookie_removed_on_expire(self, httpbin):
    """A Set-Cookie with an expiry in the past removes the cookie."""
    session = requests.session()
    session.get(httpbin("cookies/set?foo=bar"))
    assert session.cookies["foo"] == "bar"

    expiring = {"Set-Cookie": "foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT"}
    session.get(httpbin("response-headers"), params=expiring)
    assert "foo" not in session.cookies
|
|
|
|
def test_cookie_quote_wrapped(self, httpbin):
    """A quote-wrapped cookie value keeps its surrounding quotes."""
    session = requests.session()
    session.get(httpbin('cookies/set?foo="bar:baz"'))
    stored = session.cookies["foo"]
    assert stored == '"bar:baz"'
|
|
|
|
def test_cookie_persists_via_api(self, httpbin):
    """Per-request cookies are sent on both the first hop and the redirect."""
    session = requests.session()
    response = session.get(httpbin("redirect/1"), cookies={"foo": "bar"})
    assert "foo" in response.request.headers["Cookie"]
    assert "foo" in response.history[0].request.headers["Cookie"]
|
|
|
|
def test_request_cookie_overrides_session_cookie(self, httpbin):
    """A per-request cookie wins over the session cookie of the same name."""
    session = requests.session()
    session.cookies["foo"] = "bar"
    response = session.get(httpbin("cookies"), cookies={"foo": "baz"})
    echoed = response.json()["cookies"]
    assert echoed["foo"] == "baz"
    # Session cookie should not be modified
    assert session.cookies["foo"] == "bar"
|
|
|
|
def test_request_cookies_not_persisted(self, httpbin):
    """Cookies passed with a single request never leak into the session."""
    session = requests.session()
    one_off_cookies = {"foo": "baz"}
    session.get(httpbin("cookies"), cookies=one_off_cookies)
    # Sending a request with cookies should not add cookies to the session
    assert not session.cookies
|
|
|
|
def test_generic_cookiejar_works(self, httpbin):
    """A plain ``cookielib.CookieJar`` can serve as the session jar."""
    jar = cookielib.CookieJar()
    cookiejar_from_dict({"foo": "bar"}, jar)
    session = requests.session()
    session.cookies = jar
    response = session.get(httpbin("cookies"))
    # Make sure the cookie was sent
    assert response.json()["cookies"]["foo"] == "bar"
    # Make sure the session cj is still the custom one
    assert session.cookies is jar
|
|
|
|
def test_param_cookiejar_works(self, httpbin):
    """A plain ``cookielib.CookieJar`` can be passed as ``cookies=``."""
    jar = cookielib.CookieJar()
    cookiejar_from_dict({"foo": "bar"}, jar)
    session = requests.session()
    response = session.get(httpbin("cookies"), cookies=jar)
    # Make sure the cookie was sent
    assert response.json()["cookies"]["foo"] == "bar"
|
|
|
|
def test_cookielib_cookiejar_on_redirect(self, httpbin):
    """Tests resolve_redirect doesn't fail when merging cookies
    with non-RequestsCookieJar cookiejar.

    See GH #3579
    """
    plain_jar = cookiejar_from_dict({"foo": "bar"}, cookielib.CookieJar())
    session = requests.Session()
    session.cookies = cookiejar_from_dict({"cookie": "tasty"})

    # Prepare request without using Session
    prepared = requests.Request("GET", httpbin("headers"), cookies=plain_jar).prepare()

    # Send request and simulate redirect
    response = session.send(prepared)
    response.status_code = 302
    response.headers["location"] = httpbin("get")
    response = next(session.resolve_redirects(response, prepared))

    # Verify CookieJar isn't being converted to RequestsCookieJar
    assert isinstance(prepared._cookies, cookielib.CookieJar)
    assert isinstance(response.request._cookies, cookielib.CookieJar)
    assert not isinstance(
        response.request._cookies, requests.cookies.RequestsCookieJar
    )

    # Both the request jar and the session jar must be represented.
    merged = {cookie.name: cookie.value for cookie in response.request._cookies}
    assert merged["foo"] == "bar"
    assert merged["cookie"] == "tasty"
|
|
|
|
def test_requests_in_history_are_not_overridden(self, httpbin):
    """Each history entry keeps the request that actually produced it."""
    final = requests.get(httpbin("redirect/3"))
    response_urls = []
    request_urls = []
    for hop in final.history:
        response_urls.append(hop.url)
        request_urls.append(hop.request.url)
    assert response_urls == request_urls
|
|
|
|
def test_history_is_always_a_list(self, httpbin):
    """Show that even with redirects, Response.history is always a list."""
    direct = requests.get(httpbin("get"))
    assert isinstance(direct.history, list)

    redirected = requests.get(httpbin("redirect/1"))
    assert isinstance(redirected.history, list)
    assert not isinstance(redirected.history, tuple)
|
|
|
|
def test_headers_on_session_with_None_are_not_sent(self, httpbin):
    """Do not send headers in Session.headers with None values."""
    session = requests.Session()
    session.headers["Accept-Encoding"] = None
    prepared = session.prepare_request(requests.Request("GET", httpbin("get")))
    assert "Accept-Encoding" not in prepared.headers
|
|
|
|
def test_headers_preserve_order(self, httpbin):
    """Preserve order when headers provided as OrderedDict."""
    session = requests.Session()
    session.headers = collections.OrderedDict()
    session.headers["Accept-Encoding"] = "identity"
    session.headers["First"] = "1"
    session.headers["Second"] = "2"

    request_headers = collections.OrderedDict([("Third", "3"), ("Fourth", "4")])
    request_headers["Fifth"] = "5"
    request_headers["Second"] = "222"

    prepared = session.prepare_request(
        requests.Request("GET", httpbin("get"), headers=request_headers)
    )
    items = list(prepared.headers.items())

    # Session headers come first; "Second" keeps its session position but
    # takes the request-level value.
    expected_prefix = (
        ("Accept-Encoding", "identity"),
        ("First", "1"),
        ("Second", "222"),
        ("Third", "3"),
        ("Fourth", "4"),
        ("Fifth", "5"),
    )
    for position, pair in enumerate(expected_prefix):
        assert items[position] == pair
|
|
|
|
@pytest.mark.parametrize("key", ("User-agent", "user-agent"))
def test_user_agent_transfers(self, httpbin, key):
    """The User-agent header is sent whatever the case of its name."""
    agent = "Mozilla/5.0 (github.com/psf/requests)"
    response = requests.get(httpbin("user-agent"), headers={key: agent})
    assert agent in response.text
|
|
|
|
def test_HTTP_200_OK_HEAD(self, httpbin):
    """HEAD on the get endpoint returns 200."""
    response = requests.head(httpbin("get"))
    assert response.status_code == 200
|
|
|
|
def test_HTTP_200_OK_PUT(self, httpbin):
    """PUT on the put endpoint returns 200."""
    response = requests.put(httpbin("put"))
    assert response.status_code == 200
|
|
|
|
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
    """A (user, pass) tuple authenticates per request and per session."""
    credentials = ("user", "pass")
    protected = httpbin("basic-auth", "user", "pass")

    assert requests.get(protected, auth=credentials).status_code == 200

    # Without credentials the endpoint refuses the request.
    assert requests.get(protected).status_code == 401

    session = requests.session()
    session.auth = credentials
    assert session.get(protected).status_code == 200
|
|
|
|
@pytest.mark.parametrize(
    "username, password",
    [
        ("user", "pass"),
        ("имя".encode(), "пароль".encode()),
        (42, 42),
        (None, None),
    ],
)
def test_set_basicauth(self, httpbin, username, password):
    """Tuple auth yields the header ``_basic_auth_str`` builds for the pair."""
    request = requests.Request("GET", httpbin("get"), auth=(username, password))
    prepared = request.prepare()

    expected = _basic_auth_str(username, password)
    assert prepared.headers["Authorization"] == expected
|
|
|
|
def test_basicauth_encodes_byte_strings(self):
    """Ensure b'test' formats as the byte string "test" rather
    than the unicode string "b'test'" in Python 3.
    """
    byte_credentials = (b"\xc5\xafsername", b"test\xc6\xb6")
    prepared = requests.Request(
        "GET", "http://localhost", auth=byte_credentials
    ).prepare()

    assert prepared.headers["Authorization"] == "Basic xa9zZXJuYW1lOnRlc3TGtg=="
|
|
|
|
@pytest.mark.parametrize(
    "url, exception",
    [
        # Connecting to an unknown domain should raise a ConnectionError
        ("http://doesnotexist.google.com", ConnectionError),
        # Connecting to an invalid port should raise a ConnectionError
        ("http://localhost:1", ConnectionError),
        # Inputing a URL that cannot be parsed should raise an InvalidURL error
        ("http://fe80::5054:ff:fe5a:fc0", InvalidURL),
    ],
)
def test_errors(self, url, exception):
    """Unreachable or unparsable URLs raise their paired exception."""
    with pytest.raises(exception):
        requests.get(url, timeout=1)
|
|
|
|
def test_proxy_error(self):
    """Any proxy-related failure surfaces as ProxyError."""
    # any proxy related error (address resolution, no route to host, etc) should result in a ProxyError
    broken_proxies = {"http": "non-resolvable-address"}
    with pytest.raises(ProxyError):
        requests.get("http://localhost:1", proxies=broken_proxies)
|
|
|
|
def test_proxy_error_on_bad_url(self, httpbin, httpbin_secure):
    """Malformed proxy URLs raise InvalidProxyURL."""
    # Each case pairs the URL factory to call with the broken proxy mapping.
    cases = (
        (httpbin_secure, {"https": "http:/badproxyurl:3128"}),
        (httpbin, {"http": "http://:8080"}),
        (httpbin_secure, {"https": "https://"}),
        (httpbin, {"http": "http:///example.com:8080"}),
    )
    for make_url, bad_proxies in cases:
        with pytest.raises(InvalidProxyURL):
            requests.get(make_url(), proxies=bad_proxies)
|
|
|
|
def test_respect_proxy_env_on_send_self_prepared_request(self, httpbin):
    """``Session.send`` of a self-prepared request honours ``http_proxy``."""
    with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
        session = requests.Session()
        prepared = requests.Request("GET", httpbin()).prepare()
        session.send(prepared)
|
|
|
|
def test_respect_proxy_env_on_send_session_prepared_request(self, httpbin):
    """``Session.send`` of a session-prepared request honours ``http_proxy``."""
    with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
        session = requests.Session()
        unprepared = requests.Request("GET", httpbin())
        session.send(session.prepare_request(unprepared))
|
|
|
|
def test_respect_proxy_env_on_send_with_redirects(self, httpbin):
    """``Session.send`` honours ``http_proxy`` for a redirecting URL.

    The leftover debug ``print(url)`` was dropped; it contributed nothing
    to the assertion and only wrote to stdout.
    """
    with override_environ(http_proxy=INVALID_PROXY):
        with pytest.raises(ProxyError):
            session = requests.Session()
            url = httpbin("redirect/1")
            request = requests.Request("GET", url)
            session.send(request.prepare())
|
|
|
|
def test_respect_proxy_env_on_get(self, httpbin):
    """``Session.get`` honours the ``http_proxy`` environment variable."""
    with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
        requests.Session().get(httpbin())
|
|
|
|
def test_respect_proxy_env_on_request(self, httpbin):
    """``Session.request`` honours the ``http_proxy`` environment variable."""
    with override_environ(http_proxy=INVALID_PROXY), pytest.raises(ProxyError):
        requests.Session().request(method="GET", url=httpbin())
|
|
|
|
def test_proxy_authorization_preserved_on_request(self, httpbin):
    """A session-level Proxy-Authorization header reaches the server."""
    token = "Bearer XXX"
    session = requests.Session()
    session.headers.update({"Proxy-Authorization": token})

    response = session.request(method="GET", url=httpbin("get"))
    echoed_headers = response.json().get("headers", {})

    assert echoed_headers.get("Proxy-Authorization") == token
|
|
|
|
@pytest.mark.parametrize(
    "url,has_proxy_auth",
    [
        ("http://example.com", True),
        ("https://example.com", False),
    ],
)
def test_proxy_authorization_not_appended_to_https_request(
    self, url, has_proxy_auth
):
    """``rebuild_proxies`` adds Proxy-Authorization for http but not https."""
    session = requests.Session()
    proxy_map = {
        "http": "http://test:pass@localhost:8080",
        "https": "http://test:pass@localhost:8090",
    }
    prepared = requests.Request("GET", url).prepare()
    session.rebuild_proxies(prepared, proxy_map)

    header_present = "Proxy-Authorization" in prepared.headers
    assert header_present is has_proxy_auth
|
|
|
|
def test_basicauth_with_netrc(self, httpbin):
    """netrc credentials are used by default and overridden by explicit auth.

    ``requests.sessions.get_netrc_auth`` is replaced through
    ``mock.patch.object`` rather than a hand-rolled try/finally swap, so
    the original function is restored even if an assertion fails.
    """
    auth = ("user", "pass")
    wrong_auth = ("wronguser", "wrongpass")
    url = httpbin("basic-auth", "user", "pass")

    def get_netrc_auth_mock(url):
        return auth

    with mock.patch.object(requests.sessions, "get_netrc_auth", get_netrc_auth_mock):
        # Should use netrc and work.
        r = requests.get(url)
        assert r.status_code == 200

        # Given auth should override and fail.
        r = requests.get(url, auth=wrong_auth)
        assert r.status_code == 401

        s = requests.session()

        # Should use netrc and work.
        r = s.get(url)
        assert r.status_code == 200

        # Given auth should override and fail.
        s.auth = wrong_auth
        r = s.get(url)
        assert r.status_code == 401
|
|
|
|
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
    """Digest auth succeeds per request and per session for every algorithm.

    The leftover debug ``print`` of the WWW-Authenticate header was dropped;
    the 401 assertion already covers the unauthenticated case.
    """
    for authtype in self.digest_auth_algo:
        auth = HTTPDigestAuth("user", "pass")
        url = httpbin("digest-auth", "auth", "user", "pass", authtype, "never")

        r = requests.get(url, auth=auth)
        assert r.status_code == 200

        # Without credentials the endpoint must refuse the request.
        r = requests.get(url)
        assert r.status_code == 401

        s = requests.session()
        s.auth = HTTPDigestAuth("user", "pass")
        r = s.get(url)
        assert r.status_code == 200
|
|
|
|
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
    """The unauthenticated digest response carries the ``fake`` cookie."""
    for algorithm in self.digest_auth_algo:
        endpoint = httpbin("digest-auth", "auth", "user", "pass", algorithm)
        digest = HTTPDigestAuth("user", "pass")

        anonymous = requests.get(endpoint)
        assert anonymous.cookies["fake"] == "fake_value"

        authenticated = requests.get(endpoint, auth=digest)
        assert authenticated.status_code == 200
|
|
|
|
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
    """After a digest-authenticated request the session jar holds ``fake``."""
    for algorithm in self.digest_auth_algo:
        endpoint = httpbin("digest-auth", "auth", "user", "pass", algorithm)
        digest = HTTPDigestAuth("user", "pass")
        session = requests.Session()
        session.get(endpoint, auth=digest)
        assert session.cookies["fake"] == "fake_value"
|
|
|
|
def test_DIGEST_STREAM(self, httpbin):
    """With digest auth, ``raw`` still has data only when ``stream=True``."""
    for algorithm in self.digest_auth_algo:
        digest = HTTPDigestAuth("user", "pass")
        endpoint = httpbin("digest-auth", "auth", "user", "pass", algorithm)

        streamed = requests.get(endpoint, auth=digest, stream=True)
        assert streamed.raw.read() != b""

        buffered = requests.get(endpoint, auth=digest, stream=False)
        assert buffered.raw.read() == b""
|
|
|
|
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
    """A wrong digest password, or none at all, yields 401 in every mode."""
    for algorithm in self.digest_auth_algo:
        bad_digest = HTTPDigestAuth("user", "wrongpass")
        endpoint = httpbin("digest-auth", "auth", "user", "pass", algorithm)

        assert requests.get(endpoint, auth=bad_digest).status_code == 401

        assert requests.get(endpoint).status_code == 401

        session = requests.session()
        session.auth = bad_digest
        assert session.get(endpoint).status_code == 401
|
|
|
|
def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
    """The Authorization header sent for digest auth contains a quoted ``auth``."""
    for algorithm in self.digest_auth_algo:
        digest = HTTPDigestAuth("user", "pass")
        endpoint = httpbin("digest-auth", "auth", "user", "pass", algorithm)

        response = requests.get(endpoint, auth=digest)
        sent_header = response.request.headers["Authorization"]
        assert '"auth"' in sent_header
|
|
|
|
def test_POSTBIN_GET_POST_FILES(self, httpbin):
    """POST works bare, with form data, with a file and with a raw string."""
    endpoint = httpbin("post")
    requests.post(endpoint).raise_for_status()

    form_post = requests.post(endpoint, data={"some": "data"})
    assert form_post.status_code == 200

    with open("requirements-dev.txt") as handle:
        file_post = requests.post(endpoint, files={"some": handle})
    assert file_post.status_code == 200

    raw_post = requests.post(endpoint, data='[{"some": "json"}]')
    assert raw_post.status_code == 200

    # A list of bare strings is not a valid ``files`` value.
    with pytest.raises(ValueError):
        requests.post(endpoint, files=["bad file data"])
|
|
|
|
def test_invalid_files_input(self, httpbin):
    """A ``None`` file entry is skipped while the integer entry is encoded."""
    odd_files = {"random-file-1": None, "random-file-2": 1}
    response = requests.post(httpbin("post"), files=odd_files)
    encoded_body = response.request.body
    assert b'name="random-file-1"' not in encoded_body
    assert b'name="random-file-2"' in encoded_body
|
|
|
|
def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin):
    """A file-like body without ``__iter__`` is sent from its current position."""

    class TestStream:
        """Minimal stream exposing read/tell/seek/__len__ but no __iter__."""

        def __init__(self, data):
            self.data = data.encode()
            self.length = len(self.data)
            self.index = 0

        def __len__(self):
            return self.length

        def read(self, size=None):
            start = self.index
            if size:
                self.index += size
                return self.data[start : start + size]
            # No size (or zero): hand back everything that is left.
            self.index = self.length
            return self.data[start:]

        def tell(self):
            return self.index

        def seek(self, offset, where=0):
            # ``where`` follows the usual 0/1/2 whence values; any other
            # value leaves the position untouched.
            bases = {0: 0, 1: self.index, 2: self.length}
            if where in bases:
                self.index = bases[where] + offset

    fresh = TestStream("test")
    whole_post = requests.post(httpbin("post"), data=fresh)
    assert whole_post.status_code == 200
    assert whole_post.json()["data"] == "test"

    seeked = TestStream("test")
    seeked.seek(2)
    tail_post = requests.post(httpbin("post"), data=seeked)
    assert tail_post.status_code == 200
    assert tail_post.json()["data"] == "st"
|
|
|
|
def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
    """POST works with form data and a file supplied together."""
    endpoint = httpbin("post")
    requests.post(endpoint).raise_for_status()

    form_post = requests.post(endpoint, data={"some": "data"})
    assert form_post.status_code == 200

    with open("requirements-dev.txt") as handle:
        combined_post = requests.post(
            endpoint, data={"some": "data"}, files={"some": handle}
        )
    assert combined_post.status_code == 200

    raw_post = requests.post(endpoint, data='[{"some": "json"}]')
    assert raw_post.status_code == 200

    # A list of bare strings is not a valid ``files`` value.
    with pytest.raises(ValueError):
        requests.post(endpoint, files=["bad file data"])
|
|
|
|
def test_post_with_custom_mapping(self, httpbin):
    """A user-defined MutableMapping is form-encoded like a dict."""

    class CustomMapping(MutableMapping):
        """Thin MutableMapping that delegates everything to an inner dict."""

        def __init__(self, *args, **kwargs):
            self.data = dict(*args, **kwargs)

        def __getitem__(self, key):
            return self.data[key]

        def __setitem__(self, key, value):
            self.data[key] = value

        def __delitem__(self, key):
            del self.data[key]

        def __iter__(self):
            return iter(self.data)

        def __len__(self):
            return len(self.data)

    payload = CustomMapping({"some": "data"})
    response = requests.post(httpbin("post"), data=payload)
    echoed_form = response.json().get("form")
    assert echoed_form == {"some": "data"}
|
|
|
|
def test_conflicting_post_params(self, httpbin):
    """A string body combined with ``files`` is rejected with ValueError."""
    endpoint = httpbin("post")
    with open("requirements-dev.txt") as handle, pytest.raises(ValueError):
        requests.post(endpoint, data='[{"some": "data"}]', files={"some": handle})
|
|
|
|
def test_request_ok_set(self, httpbin):
    """``Response.ok`` is falsy for a 404."""
    response = requests.get(httpbin("status", "404"))
    assert not response.ok
|
|
|
|
def test_status_raising(self, httpbin):
    """``raise_for_status`` raises on a 404, and a 500 response is not ``ok``."""
    not_found = requests.get(httpbin("status", "404"))
    with pytest.raises(requests.exceptions.HTTPError):
        not_found.raise_for_status()

    server_error = requests.get(httpbin("status", "500"))
    assert not server_error.ok
|
|
|
|
def test_decompress_gzip(self, httpbin):
    """The body of the gzip endpoint is decodable as ASCII."""
    body = requests.get(httpbin("gzip")).content
    body.decode("ascii")
|
|
|
|
@pytest.mark.parametrize(
    "url, params",
    (
        ("/get", {"foo": "føø"}),
        # This case used to be listed twice with identical values; the
        # redundant copy only repeated the same request.
        ("/get", {"føø": "føø"}),
        ("/get", {"foo": "foo"}),
        ("ø", {"foo": "foo"}),
    ),
)
def test_unicode_get(self, httpbin, url, params):
    """GET requests with non-ASCII paths or params complete without raising."""
    requests.get(httpbin(url), params=params)
|
|
|
|
def test_unicode_header_name(self, httpbin):
    """PUT of a non-ASCII str body with str header names does not raise."""
    octet_headers = {"Content-Type": "application/octet-stream"}
    # compat.str is unicode.
    requests.put(httpbin("put"), headers=octet_headers, data="\xff")
|
|
|
|
def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle):
    """An HTTPS 301 is followed without raising, verified against the test CA."""
    redirecting_url = httpbin_secure("status", "301")
    requests.get(redirecting_url, verify=httpbin_ca_bundle)
|
|
|
|
def test_invalid_ca_certificate_path(self, httpbin_secure):
    """A non-existent ``verify`` path raises IOError with a precise message."""
    INVALID_PATH = "/garbage"
    with pytest.raises(IOError) as e:
        requests.get(httpbin_secure(), verify=INVALID_PATH)
    expected_message = (
        "Could not find a suitable TLS CA certificate bundle, "
        f"invalid path: {INVALID_PATH}"
    )
    assert str(e.value) == expected_message
|
|
|
|
def test_invalid_ssl_certificate_files(self, httpbin_secure):
    """Non-existent client cert or key paths raise IOError with precise messages."""
    INVALID_PATH = "/garbage"

    with pytest.raises(IOError) as e:
        requests.get(httpbin_secure(), cert=INVALID_PATH)
    cert_message = (
        f"Could not find the TLS certificate file, invalid path: {INVALID_PATH}"
    )
    assert str(e.value) == cert_message

    # "." is passed as the certificate half, so the key path is the one reported.
    with pytest.raises(IOError) as e:
        requests.get(httpbin_secure(), cert=(".", INVALID_PATH))
    key_message = f"Could not find the TLS key file, invalid path: {INVALID_PATH}"
    assert str(e.value) == key_message
|
|
|
|
@pytest.mark.parametrize(
    "env, expected",
    (
        ({}, True),
        ({"REQUESTS_CA_BUNDLE": "/some/path"}, "/some/path"),
        ({"REQUESTS_CA_BUNDLE": ""}, True),
        ({"CURL_CA_BUNDLE": "/some/path"}, "/some/path"),
        ({"CURL_CA_BUNDLE": ""}, True),
        ({"REQUESTS_CA_BUNDLE": "", "CURL_CA_BUNDLE": ""}, True),
        (
            {
                "REQUESTS_CA_BUNDLE": "/some/path",
                "CURL_CA_BUNDLE": "/curl/path",
            },
            "/some/path",
        ),
        (
            {
                "REQUESTS_CA_BUNDLE": "",
                "CURL_CA_BUNDLE": "/curl/path",
            },
            "/curl/path",
        ),
    ),
)
def test_env_cert_bundles(self, httpbin, env, expected):
    """The merged ``verify`` setting follows the CA-bundle environment variables."""
    session = requests.Session()
    # Swap os.environ for the parametrized mapping while settings are merged.
    with mock.patch("os.environ", env):
        merged = session.merge_environment_settings(
            url=httpbin("get"),
            proxies={},
            stream=False,
            verify=True,
            cert=None,
        )
    assert merged["verify"] == expected
|
|
|
|
def test_http_with_certificate(self, httpbin):
    """A plain-HTTP GET that passes ``cert="."`` still returns 200."""
    response = requests.get(httpbin(), cert=".")
    assert response.status_code == 200
|
|
|
|
@pytest.mark.skipif(
    SNIMissingWarning is None,
    reason="urllib3 2.0 removed that warning and errors out instead",
)
def test_https_warnings(self, nosan_server):
    """Check which warning categories ``requests.get`` emits over HTTPS.

    The expected categories depend on ``HAS_MODERN_SSL`` / ``HAS_PYOPENSSL``.
    ``ResourceWarning`` records are ignored.
    """
    _host, port, ca_bundle = nosan_server
    if HAS_MODERN_SSL or HAS_PYOPENSSL:
        warnings_expected = ("SubjectAltNameWarning",)
    else:
        warnings_expected = (
            "SNIMissingWarning",
            "InsecurePlatformWarning",
            "SubjectAltNameWarning",
        )

    # ``pytest.warns(None)`` is deprecated (pytest 7) and rejected by newer
    # pytest releases, so record warnings with the standard library instead.
    with warnings.catch_warnings(record=True) as warning_records:
        warnings.simplefilter("always")
        requests.get(f"https://localhost:{port}/", verify=ca_bundle)

    warnings_category = tuple(
        item.category.__name__
        for item in warning_records
        if item.category.__name__ != "ResourceWarning"
    )
    assert warnings_category == warnings_expected
|
|
|
|
def test_certificate_failure(self, httpbin_secure):
    """An SSLError is raised when the underlying SSL layer has a problem."""
    target = httpbin_secure("status", "200")
    # Our local httpbin does not have a trusted CA, so this call will
    # fail if we use our default trust bundle.
    with pytest.raises(RequestsSSLError):
        requests.get(target)
|
|
|
|
def test_urlencoded_get_query_multivalued_param(self, httpbin):
    """A list-valued param is encoded as a repeated key in the final URL."""
    response = requests.get(httpbin("get"), params={"test": ["foo", "baz"]})
    expected_url = httpbin("get?test=foo&test=baz")
    assert response.status_code == 200
    assert response.url == expected_url
|
|
|
|
def test_form_encoded_post_query_multivalued_element(self, httpbin):
    """A list-valued form field is encoded as a repeated key in the body."""
    request = requests.Request(
        method="POST",
        url=httpbin("post"),
        data=dict(test=["foo", "baz"]),
    )
    prepared = request.prepare()
    assert prepared.body == "test=foo&test=baz"
|
|
|
|
def test_different_encodings_dont_break_post(self, httpbin):
    """A POST mixing form data, query params and a file upload returns 200."""
    with open(__file__, "rb") as source:
        upload = {"file": ("test_requests.py", source)}
        response = requests.post(
            httpbin("post"),
            data={"stuff": json.dumps({"a": 123})},
            params={"blah": "asdf1234"},
            files=upload,
        )
    assert response.status_code == 200
|
|
|
|
@pytest.mark.parametrize(
    "data",
    (
        {"stuff": "ëlïxr"},
        {"stuff": "ëlïxr".encode()},
        {"stuff": "elixr"},
        {"stuff": b"elixr"},
    ),
)
def test_unicode_multipart_post(self, httpbin, data):
    """A multipart POST with text or bytes field values returns 200."""
    with open(__file__, "rb") as source:
        upload = {"file": ("test_requests.py", source)}
        response = requests.post(httpbin("post"), data=data, files=upload)
    assert response.status_code == 200
|
|
|
|
def test_unicode_multipart_post_fieldnames(self, httpbin):
    """A bytes field name is written to the multipart body as a plain name."""
    stem, _extension = os.path.splitext(__file__)
    with open(stem + ".py", "rb") as source:
        request = requests.Request(
            method="POST",
            url=httpbin("post"),
            data={b"stuff": "elixr"},
            files={"file": ("test_requests.py", source)},
        )
        prepared = request.prepare()

    body = prepared.body
    assert b'name="stuff"' in body
    # The repr of the bytes key must not leak into the field name.
    assert b"name=\"b'stuff'\"" not in body
|
|
|
|
def test_unicode_method_name(self, httpbin):
    """``requests.request`` with method ``"POST"`` and a file upload returns 200."""
    with open(__file__, "rb") as source:
        response = requests.request(
            method="POST",
            url=httpbin("post"),
            files={"file": source},
        )
    assert response.status_code == 200
|
|
|
|
def test_unicode_method_name_with_request_object(self, httpbin):
    """A session-prepared POST keeps a native-string method and can be sent."""
    session = requests.Session()
    with open(__file__, "rb") as source:
        request = requests.Request("POST", httpbin("post"), files={"file": source})
        prepared = session.prepare_request(request)
    assert isinstance(prepared.method, builtin_str)
    assert prepared.method == "POST"

    response = session.send(prepared)
    assert response.status_code == 200
|
|
|
|
def test_non_prepared_request_error(self):
    """Sending an unprepared ``Request`` raises ``ValueError`` with a fixed message."""
    session = requests.Session()
    unprepared = requests.Request("POST", "/")

    with pytest.raises(ValueError) as excinfo:
        session.send(unprepared)
    assert str(excinfo.value) == "You can only send PreparedRequests."
|
|
|
|
def test_custom_content_type(self, httpbin):
    """A per-file content type given in the ``files`` tuple ends up in the body."""
    with open(__file__, "rb") as f1, open(__file__, "rb") as f2:
        form = {"stuff": json.dumps({"a": 123})}
        uploads = {
            "file1": ("test_requests.py", f1),
            "file2": ("test_requests", f2, "text/py-content-type"),
        }
        response = requests.post(httpbin("post"), data=form, files=uploads)
    assert response.status_code == 200
    assert b"text/py-content-type" in response.request.body
|
|
|
|
def test_hook_receives_request_arguments(self, httpbin):
    """A response hook is called with a response and non-empty keyword args."""

    def hook(resp, **kwargs):
        assert resp is not None
        assert kwargs != {}

    session = requests.Session()
    request = requests.Request("GET", httpbin(), hooks={"response": hook})
    session.send(session.prepare_request(request))
|
|
|
|
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
    """Session-level hooks are carried over when the request defines none."""

    def hook(*args, **kwargs):
        pass

    session = requests.Session()
    session.hooks["response"].append(hook)
    prepared = session.prepare_request(requests.Request("GET", httpbin()))
    assert prepared.hooks["response"] != []
    assert prepared.hooks["response"] == [hook]
|
|
|
|
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
    """Hooks given on the request replace the session's hooks."""

    def hook1(*args, **kwargs):
        pass

    def hook2(*args, **kwargs):
        pass

    assert hook1 is not hook2
    session = requests.Session()
    session.hooks["response"].append(hook2)
    request = requests.Request("GET", httpbin(), hooks={"response": [hook1]})
    prepared = session.prepare_request(request)
    assert prepared.hooks["response"] == [hook1]
|
|
|
|
def test_prepared_request_hook(self, httpbin):
    """A hook attached before ``prepare()`` runs when the request is sent."""

    def hook(resp, **kwargs):
        # Mark the response so the test can see that the hook ran.
        resp.hook_working = True
        return resp

    prepared = requests.Request("GET", httpbin(), hooks={"response": hook}).prepare()

    session = requests.Session()
    session.proxies = getproxies()
    response = session.send(prepared)

    assert hasattr(response, "hook_working")
|
|
|
|
def test_prepared_from_session(self, httpbin):
    """Session-level auth is applied to a request prepared through the session."""

    class DummyAuth(requests.auth.AuthBase):
        def __call__(self, r):
            r.headers["Dummy-Auth-Test"] = "dummy-auth-test-ok"
            return r

    request = requests.Request("GET", httpbin("headers"))
    assert not request.auth

    session = requests.Session()
    session.auth = DummyAuth()

    response = session.send(session.prepare_request(request))

    echoed_headers = response.json()["headers"]
    assert echoed_headers["Dummy-Auth-Test"] == "dummy-auth-test-ok"
|
|
|
|
def test_prepare_request_with_bytestring_url(self):
    """A bytes URL is exposed as text on the prepared request."""
    request = requests.Request("GET", b"https://httpbin.org/")
    prepared = requests.Session().prepare_request(request)
    assert prepared.url == "https://httpbin.org/"
|
|
|
|
def test_request_with_bytestring_host(self, httpbin):
    """Cookies are still collected when the ``Host`` header is given as bytes."""
    session = requests.Session()
    response = session.request(
        "GET",
        httpbin("cookies/set?cookie=value"),
        allow_redirects=False,
        headers={"Host": b"httpbin.org"},
    )
    cookie_value = response.cookies.get("cookie")
    assert cookie_value == "value"
|
|
|
|
def test_links(self):
    """``Response.links`` parses the ``link`` header and exposes the ``next`` entry."""
    link_header = (
        "<https://api.github.com/users/kennethreitz/repos?"
        'page=2&per_page=10>; rel="next", <https://api.github.'
        "com/users/kennethreitz/repos?page=7&per_page=10>; "
        ' rel="last"'
    )
    response = requests.Response()
    response.headers = {
        "cache-control": "public, max-age=60, s-maxage=60",
        "connection": "keep-alive",
        "content-encoding": "gzip",
        "content-type": "application/json; charset=utf-8",
        "date": "Sat, 26 Jan 2013 16:47:56 GMT",
        "etag": '"6ff6a73c0e446c1f61614769e3ceb778"',
        "last-modified": "Sat, 26 Jan 2013 16:22:39 GMT",
        "link": link_header,
        "server": "GitHub.com",
        "status": "200 OK",
        "vary": "Accept",
        "x-content-type-options": "nosniff",
        "x-github-media-type": "github.beta",
        "x-ratelimit-limit": "60",
        "x-ratelimit-remaining": "57",
    }
    assert response.links["next"]["rel"] == "next"
|
|
|
|
def test_cookie_parameters(self):
    """``jar.set`` stores the secure flag, domain and ``rest`` on the cookie."""
    key, value = "some_cookie", "some_value"
    secure = True
    domain = "test.com"
    rest = {"HttpOnly": True}

    jar = requests.cookies.RequestsCookieJar()
    jar.set(key, value, secure=secure, domain=domain, rest=rest)

    assert len(jar) == 1
    assert "some_cookie" in jar

    stored = list(jar)[0]
    assert stored.secure == secure
    assert stored.domain == domain
    assert stored._rest["HttpOnly"] == rest["HttpOnly"]
|
|
|
|
def test_cookie_as_dict_keeps_len(self):
    """Every dict conversion of a two-cookie jar has two entries."""
    jar = requests.cookies.RequestsCookieJar()
    jar.set("some_cookie", "some_value")
    jar.set("some_cookie1", "some_value1")

    from_jar = dict(jar)
    from_iteritems = dict(jar.iteritems())
    from_items = dict(jar.items())

    assert len(jar) == 2
    assert len(from_jar) == 2
    assert len(from_iteritems) == 2
    assert len(from_items) == 2
|
|
|
|
def test_cookie_as_dict_keeps_items(self):
    """Dict conversions of the jar keep the name-to-value pairs."""
    jar = requests.cookies.RequestsCookieJar()
    jar.set("some_cookie", "some_value")
    jar.set("some_cookie1", "some_value1")

    from_jar = dict(jar)
    from_iteritems = dict(jar.iteritems())
    from_items = dict(jar.items())

    assert from_jar["some_cookie"] == "some_value"
    assert from_iteritems["some_cookie"] == "some_value"
    assert from_items["some_cookie1"] == "some_value1"
|
|
|
|
def test_cookie_as_dict_keys(self):
    """``jar.keys()`` compares equal to a list and can be consumed repeatedly."""
    jar = requests.cookies.RequestsCookieJar()
    jar.set("some_cookie", "some_value")
    jar.set("some_cookie1", "some_value1")

    names = jar.keys()
    assert names == list(names)
    # make sure one can use keys multiple times
    assert list(names) == list(names)
|
|
|
|
def test_cookie_as_dict_values(self):
    """``jar.values()`` compares equal to a list and can be consumed repeatedly."""
    jar = requests.cookies.RequestsCookieJar()
    jar.set("some_cookie", "some_value")
    jar.set("some_cookie1", "some_value1")

    contents = jar.values()
    assert contents == list(contents)
    # make sure one can use values multiple times
    assert list(contents) == list(contents)
|
|
|
|
def test_cookie_as_dict_items(self):
    """``jar.items()`` compares equal to a list and can be consumed repeatedly."""
    jar = requests.cookies.RequestsCookieJar()
    jar.set("some_cookie", "some_value")
    jar.set("some_cookie1", "some_value1")

    pairs = jar.items()
    assert pairs == list(pairs)
    # make sure one can use items multiple times
    assert list(pairs) == list(pairs)
|
|
|
|
def test_cookie_duplicate_names_different_domains(self):
    """Same-named cookies on two domains coexist; a bare ``get`` is ambiguous."""
    key, value = "some_cookie", "some_value"
    first_domain, second_domain = "test1.com", "test2.com"

    jar = requests.cookies.RequestsCookieJar()
    for domain in (first_domain, second_domain):
        jar.set(key, value, domain=domain)
    assert key in jar
    assert len(jar.items()) == 2

    # Verify that CookieConflictError is raised if domain is not specified
    with pytest.raises(requests.cookies.CookieConflictError):
        jar.get(key)

    # Verify that CookieConflictError is not raised if domain is specified
    assert jar.get(key, domain=first_domain) == value
|
|
|
|
def test_cookie_duplicate_names_raises_cookie_conflict_error(self):
    """Two same-named cookies, one set with a path, make a bare ``get`` raise."""
    key, value = "some_cookie", "some_value"

    jar = requests.cookies.RequestsCookieJar()
    jar.set(key, value, path="some_path")
    jar.set(key, value)
    with pytest.raises(requests.cookies.CookieConflictError):
        jar.get(key)
|
|
|
|
def test_cookie_policy_copy(self):
    """Copying a jar keeps the type of its custom cookie policy."""

    class MyCookiePolicy(cookielib.DefaultCookiePolicy):
        pass

    jar = requests.cookies.RequestsCookieJar()
    jar.set_policy(MyCookiePolicy())
    copied_policy = jar.copy().get_policy()
    assert isinstance(copied_policy, MyCookiePolicy)
|
|
|
|
def test_time_elapsed_blank(self, httpbin):
    """``Response.elapsed`` of a completed request is a positive duration."""
    r = requests.get(httpbin("get"))
    # timedelta.total_seconds() replaces the hand-written
    # days/seconds/microseconds arithmetic.
    assert r.elapsed.total_seconds() > 0.0
|
|
|
|
def test_empty_response_has_content_none(self):
    """A freshly constructed ``Response`` has ``content`` of ``None``."""
    blank = requests.Response()
    assert blank.content is None
|
|
|
|
def test_response_is_iterable(self):
    """Iterating a ``Response`` yields a truthy first chunk from its raw stream."""
    response = requests.Response()
    # Named ``stream`` rather than ``io`` so the ``io`` module is not shadowed.
    stream = StringIO.StringIO("abc")
    original_read = stream.read

    def read_mock(amt, decode_content=None):
        # Accept the extra ``decode_content`` argument and ignore it.
        return original_read(amt)

    stream.read = read_mock
    response.raw = stream
    assert next(iter(response))
    stream.close()
|
|
|
|
def test_response_decode_unicode(self):
    """``iter_content(decode_unicode=True)`` yields ``str`` chunks.

    Checked both for already-consumed content and for a streamed raw body.
    """
    consumed = requests.Response()
    consumed._content_consumed = True
    consumed._content = b"the content"
    consumed.encoding = "ascii"

    for chunk in consumed.iter_content(decode_unicode=True):
        assert isinstance(chunk, str)

    # also for streaming
    streamed = requests.Response()
    streamed.raw = io.BytesIO(b"the content")
    streamed.encoding = "ascii"
    for chunk in streamed.iter_content(decode_unicode=True):
        assert isinstance(chunk, str)
|
|
|
|
def test_response_reason_unicode(self):
    """Reading ``ok`` works when ``reason`` is bytes holding non-ASCII text."""
    # check for unicode HTTP status
    response = requests.Response()
    response.url = "unicode URL"
    response.reason = "Komponenttia ei löydy".encode()
    response.status_code = 404
    response.encoding = None
    assert not response.ok  # old behaviour - crashes here
|
|
|
|
def test_response_reason_unicode_fallback(self):
    """A latin-1 encoded ``reason`` appears decoded in the ``HTTPError`` message."""
    # check raise_status falls back to ISO-8859-1
    reason = "Komponenttia ei löydy"
    response = requests.Response()
    response.url = "some url"
    response.reason = reason.encode("latin-1")
    response.status_code = 500
    response.encoding = None
    with pytest.raises(requests.exceptions.HTTPError) as excinfo:
        response.raise_for_status()
    message = excinfo.value.args[0]
    assert reason in message
|
|
|
|
def test_response_chunk_size_type(self):
    """``iter_content`` accepts an int or ``None`` chunk size and rejects a str."""

    def fresh_response():
        # Each case needs its own response over an unread stream.
        response = requests.Response()
        response.raw = io.BytesIO(b"the content")
        return response

    for chunk in fresh_response().iter_content(1):
        assert len(chunk) == 1

    whole = fresh_response().iter_content(None)
    assert list(whole) == [b"the content"]

    rejected = fresh_response()
    with pytest.raises(TypeError):
        rejected.iter_content("1024")
|
|
|
|
@pytest.mark.parametrize(
    "exception, args, expected",
    (
        (urllib3.exceptions.ProtocolError, tuple(), ChunkedEncodingError),
        (urllib3.exceptions.DecodeError, tuple(), ContentDecodingError),
        (urllib3.exceptions.ReadTimeoutError, (None, "", ""), ConnectionError),
        (urllib3.exceptions.SSLError, tuple(), RequestsSSLError),
    ),
)
def test_iter_content_wraps_exceptions(self, httpbin, exception, args, expected):
    """urllib3 errors raised while streaming surface as requests exceptions."""
    # ReadTimeoutError can't be initialized by mock
    # so we'll manually create the instance with args
    failure = exception(*args)

    response = requests.Response()
    response.raw = mock.Mock()
    response.raw.stream.side_effect = failure

    with pytest.raises(expected):
        next(response.iter_content(1024))
|
|
|
|
def test_request_and_response_are_pickleable(self, httpbin):
    """A response and its originating request both survive a pickle round trip."""
    response = requests.get(httpbin("get"))

    # verify we can pickle the original request
    assert pickle.loads(pickle.dumps(response.request))

    # verify we can pickle the response and that we have access to
    # the original request.
    restored = pickle.loads(pickle.dumps(response))
    assert response.request.url == restored.request.url
    assert response.request.headers == restored.request.headers
|
|
|
|
def test_prepared_request_is_pickleable(self, httpbin):
    """A ``PreparedRequest`` survives pickling and can still be sent."""
    prepared = requests.Request("GET", httpbin("get")).prepare()

    # Verify PreparedRequest can be pickled and unpickled
    restored = pickle.loads(pickle.dumps(prepared))
    assert restored.url == prepared.url
    assert restored.headers == prepared.headers
    assert restored.body == prepared.body

    # Verify unpickled PreparedRequest sends properly
    response = requests.Session().send(restored)
    assert response.status_code == 200
|
|
|
|
def test_prepared_request_with_file_is_pickleable(self, httpbin):
    """A ``PreparedRequest`` built with a file upload survives pickling."""
    with open(__file__, "rb") as source:
        request = requests.Request("POST", httpbin("post"), files={"file": source})
        prepared = request.prepare()

    # Verify PreparedRequest can be pickled and unpickled
    restored = pickle.loads(pickle.dumps(prepared))
    assert restored.url == prepared.url
    assert restored.headers == prepared.headers
    assert restored.body == prepared.body

    # Verify unpickled PreparedRequest sends properly
    response = requests.Session().send(restored)
    assert response.status_code == 200
|
|
|
|
def test_prepared_request_with_hook_is_pickleable(self, httpbin):
    """A ``PreparedRequest`` carrying the default hooks survives pickling."""
    request = requests.Request("GET", httpbin("get"), hooks=default_hooks())
    prepared = request.prepare()

    # Verify PreparedRequest can be pickled
    restored = pickle.loads(pickle.dumps(prepared))
    assert restored.url == prepared.url
    assert restored.headers == prepared.headers
    assert restored.body == prepared.body
    assert restored.hooks == prepared.hooks

    # Verify unpickled PreparedRequest sends properly
    response = requests.Session().send(restored)
    assert response.status_code == 200
|
|
|
|
def test_cannot_send_unprepared_requests(self, httpbin):
    """``Session.send`` raises ``ValueError`` for a plain ``Request``."""
    unprepared = requests.Request(url=httpbin())
    with pytest.raises(ValueError):
        requests.Session().send(unprepared)
|
|
|
|
def test_http_error(self):
    """``HTTPError`` keeps the optional ``response`` and the message it is given."""
    bare_error = requests.exceptions.HTTPError()
    assert not bare_error.response

    response = requests.Response()
    with_response = requests.exceptions.HTTPError(response=response)
    assert with_response.response == response

    with_message = requests.exceptions.HTTPError("message", response=response)
    assert str(with_message) == "message"
    assert with_message.response == response
|
|
|
|
def test_session_pickling(self, httpbin):
    """A session restored from a pickle can still send a request."""
    request = requests.Request("GET", httpbin("get"))

    restored = pickle.loads(pickle.dumps(requests.Session()))
    restored.proxies = getproxies()

    response = restored.send(request.prepare())
    assert response.status_code == 200
|
|
|
|
def test_fixes_1329(self, httpbin):
    """Session header updates must be applied case-insensitively."""
    session = requests.Session()
    session.headers.update({"ACCEPT": "BOGUS"})
    session.headers.update({"accept": "application/json"})
    sent_headers = session.get(httpbin("get")).request.headers
    # Every spelling must resolve to the value set last.
    for spelling in ("accept", "Accept", "ACCEPT"):
        assert sent_headers[spelling] == "application/json"
|
|
|
|
def test_uppercase_scheme_redirect(self, httpbin):
    """A redirect to a URL with an upper-case ``HTTP://`` scheme is followed."""
    pieces = urlparse(httpbin("html"))
    url = "HTTP://" + pieces.netloc + pieces.path
    response = requests.get(httpbin("redirect-to"), params={"url": url})
    assert response.status_code == 200
    assert response.url.lower() == url.lower()
|
|
|
|
def test_transport_adapter_ordering(self):
    """Check the key order of ``Session.adapters`` after successive mounts.

    Also checks that ``mount`` works after ``adapters`` is replaced by a
    plain dict.
    """
    session = requests.Session()
    assert ["https://", "http://"] == list(session.adapters)

    for prefix in (
        "http://git",
        "http://github",
        "http://github.com",
        "http://github.com/about/",
    ):
        session.mount(prefix, HTTPAdapter())
    expected = [
        "http://github.com/about/",
        "http://github.com",
        "http://github",
        "http://git",
        "https://",
        "http://",
    ]
    assert expected == list(session.adapters)

    for prefix in (
        "http://gittip",
        "http://gittip.com",
        "http://gittip.com/about/",
    ):
        session.mount(prefix, HTTPAdapter())
    expected = [
        "http://github.com/about/",
        "http://gittip.com/about/",
        "http://github.com",
        "http://gittip.com",
        "http://github",
        "http://gittip",
        "http://git",
        "https://",
        "http://",
    ]
    assert expected == list(session.adapters)

    plain_dict_session = requests.Session()
    plain_dict_session.adapters = {"http://": HTTPAdapter()}
    plain_dict_session.mount("https://", HTTPAdapter())
    assert "http://" in plain_dict_session.adapters
    assert "https://" in plain_dict_session.adapters
|
|
|
|
def test_session_get_adapter_prefix_matching(self):
    """``get_adapter`` picks the adapter mounted on the most specific prefix."""
    prefix = "https://example.com"
    more_specific_prefix = prefix + "/some/path"

    session = requests.Session()
    prefix_adapter = HTTPAdapter()
    more_specific_prefix_adapter = HTTPAdapter()
    session.mount(prefix, prefix_adapter)
    session.mount(more_specific_prefix, more_specific_prefix_adapter)

    only_prefix = session.get_adapter(prefix + "/another/path")
    assert only_prefix is prefix_adapter

    more_specific = session.get_adapter(more_specific_prefix + "/longer/path")
    assert more_specific is more_specific_prefix_adapter

    unrelated = session.get_adapter("https://another.example.com/")
    assert unrelated not in (prefix_adapter, more_specific_prefix_adapter)
|
|
|
|
def test_session_get_adapter_prefix_matching_mixed_case(self):
    """An adapter mounted on a mixed-case prefix matches a URL under that prefix."""
    mixed_case_prefix = "hTtPs://eXamPle.CoM/MixEd_CAse_PREfix"

    session = requests.Session()
    adapter = HTTPAdapter()
    session.mount(mixed_case_prefix, adapter)

    assert session.get_adapter(mixed_case_prefix + "/full_url") is adapter
|
|
|
|
def test_session_get_adapter_prefix_matching_is_case_insensitive(self):
    """Prefix lookup ignores case differences between the mount and the URL."""
    session = requests.Session()
    adapter = HTTPAdapter()
    session.mount("hTtPs://eXamPle.CoM/MixEd_CAse_PREfix", adapter)

    differently_cased_url = "HtTpS://exaMPLe.cOm/MiXeD_caSE_preFIX/another_url"
    assert session.get_adapter(differently_cased_url) is adapter
|
|
|
|
def test_header_remove_is_case_insensitive(self, httpbin):
    """Passing ``None`` for a differently-cased header drops the session header."""
    # From issue #1321
    session = requests.Session()
    session.headers["foo"] = "bar"
    response = session.get(httpbin("get"), headers={"FOO": None})
    sent_headers = response.request.headers
    assert "foo" not in sent_headers
|
|
|
|
def test_params_are_merged_case_sensitive(self, httpbin):
    """Session and request params that differ only in case are both sent."""
    session = requests.Session()
    session.params["foo"] = "bar"
    response = session.get(httpbin("get"), params={"FOO": "bar"})
    echoed_args = response.json()["args"]
    assert echoed_args == {"foo": "bar", "FOO": "bar"}
|
|
|
|
def test_long_authinfo_in_url(self):
    """Preparing a URL with long userinfo and a long host leaves it unchanged."""
    user = "E8A3BE87-9E3F-4620-8858-95478E385B5B"
    password = "EA770032-DA4D-4D84-8CE9-29C6D910BF1E"
    host = "exactly-------------sixty-----------three------------characters"
    url = "http://{}:{}@{}:9000/path?query#frag".format(user, password, host)
    prepared = requests.Request("GET", url).prepare()
    assert prepared.url == url
|
|
|
|
def test_header_keys_are_native(self, httpbin):
    """Text and bytes header names are both found as native strings afterwards."""
    request = requests.Request(
        "GET",
        httpbin("get"),
        headers={"unicode": "blah", b"byte": "blah"},
    )
    prepared = request.prepare()

    # This is testing that they are builtin strings. A bit weird, but there
    # we go.
    assert "unicode" in prepared.headers.keys()
    assert "byte" in prepared.headers.keys()
|
|
|
|
def test_header_validation(self, httpbin):
    """Valid header values must pass validation and be sent unchanged."""
    valid_headers = {
        "foo": "bar baz qux",
        "bar": b"fbbq",
        "baz": "",
        "qux": "1",
    }
    response = requests.get(httpbin("get"), headers=valid_headers)
    for name, value in valid_headers.items():
        assert value == response.request.headers[name]
|
|
|
|
@pytest.mark.parametrize(
    "invalid_header, key",
    (
        ({"foo": 3}, "foo"),
        ({"bar": {"foo": "bar"}}, "bar"),
        ({"baz": ["foo", "bar"]}, "baz"),
    ),
)
def test_header_value_not_str(self, httpbin, invalid_header, key):
    """Header values that are neither str nor bytes raise ``InvalidHeader``.

    The error message must mention the offending header name
    (see GH issue #3386).
    """
    target = httpbin("get")
    with pytest.raises(InvalidHeader) as excinfo:
        requests.get(target, headers=invalid_header)
    assert key in str(excinfo.value)
|
|
|
|
@pytest.mark.parametrize(
    "invalid_header",
    (
        {"foo": "bar\r\nbaz: qux"},
        {"foo": "bar\n\rbaz: qux"},
        {"foo": "bar\nbaz: qux"},
        {"foo": "bar\rbaz: qux"},
        {"fo\ro": "bar"},
        {"fo\r\no": "bar"},
        {"fo\n\ro": "bar"},
        {"fo\no": "bar"},
    ),
)
def test_header_no_return_chars(self, httpbin, invalid_header):
    """Headers with CR/LF in the name or the value raise ``InvalidHeader``.

    Otherwise a single string could create multiple headers.
    """
    target = httpbin("get")
    with pytest.raises(InvalidHeader):
        requests.get(target, headers=invalid_header)
|
|
|
|
# NOTE(review): some cases below are textually identical; whitespace inside
# the literals may have been collapsed in this copy - confirm against upstream.
@pytest.mark.parametrize(
    "invalid_header",
    (
        {" foo": "bar"},
        {"\tfoo": "bar"},
        {" foo": "bar"},
        {"foo": " bar"},
        {"foo": " bar"},
        {"foo": "\tbar"},
        {" ": "bar"},
    ),
)
def test_header_no_leading_space(self, httpbin, invalid_header):
    """Leading whitespace in a header name or value raises ``InvalidHeader``."""
    target = httpbin("get")
    with pytest.raises(InvalidHeader):
        requests.get(target, headers=invalid_header)
|
|
|
|
def test_header_with_subclass_types(self, httpbin):
    """Header names and values may be ``str``/``bytes`` subclasses.

    Subclasses that do not behave exactly like the base classes are not
    supported; this test exists for backwards compatibility.
    """

    class MyString(str):
        pass

    class MyBytes(bytes):
        pass

    target = httpbin("get")

    str_response = requests.get(target, headers={MyString("x-custom"): "myheader"})
    assert str_response.request.headers["x-custom"] == "myheader"

    bytes_response = requests.get(
        target, headers={MyBytes(b"x-custom"): b"myheader"}
    )
    assert bytes_response.request.headers["x-custom"] == b"myheader"

    mixed_response = requests.get(
        target, headers={MyString("x-custom"): MyBytes(b"myheader")}
    )
    assert mixed_response.request.headers["x-custom"] == b"myheader"
|
|
|
|
@pytest.mark.parametrize("files", ("foo", b"foo", bytearray(b"foo")))
def test_can_send_objects_with_files(self, httpbin, files):
    """Plain str/bytes/bytearray file payloads produce a multipart request."""
    form = {"a": "this is a string"}
    upload = {"b": files}
    request = requests.Request("POST", httpbin("post"), data=form, files=upload)
    content_type = request.prepare().headers["Content-Type"]
    assert "multipart/form-data" in content_type
|
|
|
|
def test_can_send_file_object_with_non_string_filename(self, httpbin):
    """A file object whose ``name`` is an int can still be prepared as multipart."""
    fileobj = io.BytesIO()
    fileobj.name = 2
    request = requests.Request("POST", httpbin("post"), files={"f": fileobj})
    prepared = request.prepare()

    assert "multipart/form-data" in prepared.headers["Content-Type"]
|
|
|
|
def test_autoset_header_values_are_native(self, httpbin):
    """The automatically set ``Content-Length`` equals the string ``"16"``."""
    payload = "this is a string"
    expected_length = "16"
    prepared = requests.Request("POST", httpbin("post"), data=payload).prepare()

    assert prepared.headers["Content-Length"] == expected_length
|
|
|
|
def test_nonhttp_schemes_dont_check_URLs(self):
    """``data:``, ``file:`` and ``magnet:`` URLs are left untouched by ``prepare``."""
    test_urls = (
        "data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==",
        "file:///etc/passwd",
        "magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431",
    )
    for raw_url in test_urls:
        prepared = requests.Request("GET", raw_url).prepare()
        assert raw_url == prepared.url
|
|
|
|
def test_auth_is_stripped_on_http_downgrade(
    self, httpbin, httpbin_secure, httpbin_ca_bundle
):
    """``Authorization`` is sent on the HTTPS hop but not after redirecting to HTTP."""
    response = requests.get(
        httpbin_secure("redirect-to"),
        params={"url": httpbin("get")},
        auth=("user", "pass"),
        verify=httpbin_ca_bundle,
    )
    first_hop_headers = response.history[0].request.headers
    assert first_hop_headers["Authorization"]
    assert "Authorization" not in response.request.headers
|
|
|
|
def test_auth_is_retained_for_redirect_on_host(self, httpbin):
    """The same ``Authorization`` header is sent before and after a same-host redirect."""
    response = requests.get(httpbin("redirect/1"), auth=("user", "pass"))
    before_redirect = response.history[0].request.headers["Authorization"]
    after_redirect = response.request.headers["Authorization"]

    assert before_redirect == after_redirect
|
|
|
|
def test_should_strip_auth_host_change(self):
    """Auth is stripped when the redirect goes to a different host."""
    session = requests.Session()
    old_url = "http://example.com/foo"
    new_url = "http://another.example.com/"
    assert session.should_strip_auth(old_url, new_url)
|
|
|
|
def test_should_strip_auth_http_downgrade(self):
    """Auth is stripped when the redirect goes from https to http."""
    session = requests.Session()
    old_url = "https://example.com/foo"
    new_url = "http://example.com/bar"
    assert session.should_strip_auth(old_url, new_url)
|
|
|
|
def test_should_strip_auth_https_upgrade(self):
    """An http-to-https upgrade on default ports keeps auth; other ports strip it."""
    session = requests.Session()

    kept = (
        ("http://example.com/foo", "https://example.com/bar"),
        ("http://example.com:80/foo", "https://example.com/bar"),
        ("http://example.com/foo", "https://example.com:443/bar"),
    )
    for old_url, new_url in kept:
        assert not session.should_strip_auth(old_url, new_url)

    # Non-standard ports should trigger stripping
    stripped = (
        ("http://example.com:8080/foo", "https://example.com/bar"),
        ("http://example.com/foo", "https://example.com:8443/bar"),
    )
    for old_url, new_url in stripped:
        assert session.should_strip_auth(old_url, new_url)
|
|
|
|
def test_should_strip_auth_port_change(self):
    """Auth is stripped when the redirect moves between non-default ports."""
    session = requests.Session()
    old_url = "http://example.com:1234/foo"
    new_url = "https://example.com:4321/bar"
    assert session.should_strip_auth(old_url, new_url)
|
|
|
|
@pytest.mark.parametrize(
    "old_uri, new_uri",
    (
        ("https://example.com:443/foo", "https://example.com/bar"),
        ("http://example.com:80/foo", "http://example.com/bar"),
        ("https://example.com/foo", "https://example.com:443/bar"),
        ("http://example.com/foo", "http://example.com:80/bar"),
    ),
)
def test_should_strip_auth_default_port(self, old_uri, new_uri):
    """Adding or dropping the scheme's default port does not strip auth."""
    session = requests.Session()
    must_strip = session.should_strip_auth(old_uri, new_uri)
    assert not must_strip
|
|
|
|
def test_manual_redirect_with_partial_body_read(self, httpbin):
    """Follow redirects by hand after partially, then fully, reading bodies."""
    s = requests.Session()
    r1 = s.get(httpbin("redirect/2"), allow_redirects=False, stream=True)
    assert r1.is_redirect
    rg = s.resolve_redirects(r1, r1.request, stream=True)

    # read only the first eight bytes of the response body,
    # then follow the redirect.  ``iter_content`` only builds a generator,
    # so it has to be advanced once for any bytes to be read; the default
    # avoids StopIteration if the body is empty.
    next(r1.iter_content(8), None)
    r2 = next(rg)
    assert r2.is_redirect

    # read all of the response via iter_content,
    # then follow the redirect
    for _ in r2.iter_content():
        pass
    r3 = next(rg)
    assert not r3.is_redirect
|
|
|
|
def test_prepare_body_position_non_stream(self):
    """A bytes body leaves ``_body_position`` as ``None``."""
    request = requests.Request("GET", "http://example.com", data=b"the data")
    prepared = request.prepare()
    assert prepared._body_position is None
|
|
|
|
def test_rewind_body(self):
    """``rewind_body`` makes a fully read stream body readable again from the start."""
    stream = io.BytesIO(b"the data")
    prepared = requests.Request("GET", "http://example.com", data=stream).prepare()
    assert prepared._body_position == 0
    assert prepared.body.read() == b"the data"

    # the data has all been read
    assert prepared.body.read() == b""

    # rewind it back
    requests.utils.rewind_body(prepared)
    assert prepared.body.read() == b"the data"
|
|
|
|
def test_rewind_partially_read_body(self):
    """Rewinding returns to the offset the stream had when the request was prepared."""
    stream = io.BytesIO(b"the data")
    stream.read(4)  # read some data
    prepared = requests.Request("GET", "http://example.com", data=stream).prepare()
    assert prepared._body_position == 4
    assert prepared.body.read() == b"data"

    # the data has all been read
    assert prepared.body.read() == b""

    # rewind it back
    requests.utils.rewind_body(prepared)
    assert prepared.body.read() == b"data"
|
|
|
|
def test_rewind_body_no_seek(self):
    """Rewinding a body object that has no ``seek`` raises ``UnrewindableBodyError``."""

    class BadFileObj:
        """Body stand-in that defines ``tell`` and ``__iter__`` but no ``seek``."""

        def __init__(self, data):
            self.data = data

        def tell(self):
            return 0

        def __iter__(self):
            return

    data = BadFileObj("the data")
    prep = requests.Request("GET", "http://example.com", data=data).prepare()
    assert prep._body_position == 0

    with pytest.raises(UnrewindableBodyError) as e:
        requests.utils.rewind_body(prep)

    # Check the exception's own message, not the ExceptionInfo wrapper.
    assert "Unable to rewind request body" in str(e.value)
|
|
|
|
def test_rewind_body_failed_seek(self):
    """A ``seek`` that raises ``OSError`` is reported as ``UnrewindableBodyError``."""

    class BadFileObj:
        """Body stand-in whose ``seek`` always raises ``OSError``."""

        def __init__(self, data):
            self.data = data

        def tell(self):
            return 0

        def seek(self, pos, whence=0):
            raise OSError()

        def __iter__(self):
            return

    data = BadFileObj("the data")
    prep = requests.Request("GET", "http://example.com", data=data).prepare()
    assert prep._body_position == 0

    with pytest.raises(UnrewindableBodyError) as e:
        requests.utils.rewind_body(prep)

    # Check the exception's own message, not the ExceptionInfo wrapper.
    assert "error occurred when rewinding request body" in str(e.value)
|
|
|
|
def test_rewind_body_failed_tell(self):
    """A body whose ``tell`` raises ``OSError`` cannot be rewound later."""

    class BadFileObj:
        # File-like stub: ``tell`` always fails and there is no ``seek``.
        def __init__(self, data):
            self.data = data

        def tell(self):
            raise OSError()

        def __iter__(self):
            return

    data = BadFileObj("the data")
    prep = requests.Request("GET", "http://example.com", data=data).prepare()
    # Even though ``tell`` failed, a non-None body position is recorded.
    assert prep._body_position is not None

    with pytest.raises(UnrewindableBodyError) as e:
        requests.utils.rewind_body(prep)

    # Check the raised exception's own message; ``str(e)`` would only give
    # the representation of the ExceptionInfo wrapper.
    assert "Unable to rewind request body" in str(e.value)
|
|
|
|
def _patch_adapter_gzipped_redirect(self, session, url):
    """Wrap the adapter serving ``url`` so its first response claims gzip.

    Only the first response built after patching gets a
    ``content-encoding: gzip`` raw header; later responses are returned
    untouched.
    """
    target_adapter = session.get_adapter(url=url)
    original_builder = target_adapter.build_response
    self._patched_response = False

    def build_response(*args, **kwargs):
        response = original_builder(*args, **kwargs)
        if self._patched_response:
            return response
        response.raw.headers["content-encoding"] = "gzip"
        self._patched_response = True
        return response

    target_adapter.build_response = build_response
|
|
|
|
def test_redirect_with_wrong_gzipped_header(self, httpbin):
    """Following a redirect whose response is labelled gzip must not raise."""
    session = requests.Session()
    redirect_url = httpbin("redirect/1")
    self._patch_adapter_gzipped_redirect(session, redirect_url)
    session.get(redirect_url)
|
|
|
|
@pytest.mark.parametrize(
    "username, password, auth_str",
    [
        # ASCII credentials supplied as text
        ("test", "test", "Basic dGVzdDp0ZXN0"),
        # non-ASCII credentials supplied as encoded bytes
        (
            "имя".encode(),
            "пароль".encode(),
            "Basic 0LjQvNGPOtC/0LDRgNC+0LvRjA==",
        ),
    ],
)
def test_basic_auth_str_is_always_native(self, username, password, auth_str):
    """``_basic_auth_str`` yields a native string equal to the expected header."""
    header_value = _basic_auth_str(username, password)
    assert isinstance(header_value, builtin_str)
    assert header_value == auth_str
|
|
|
|
def test_requests_history_is_saved(self, httpbin):
    """Each response in the redirect chain carries the hops that preceded it."""
    final = requests.get(httpbin("redirect/5"))
    longest_history = final.history[-1].history
    for index, hop in enumerate(final.history):
        # The n-th hop must know exactly the first n entries of the chain.
        assert hop.history == longest_history[:index]
|
|
|
|
def test_json_param_post_content_type_works(self, httpbin):
    """Posting with ``json=`` sets a JSON content type and sends the payload."""
    payload = {"life": 42}
    response = requests.post(httpbin("post"), json=payload)
    assert response.status_code == 200

    content_type = response.request.headers["Content-Type"]
    assert "application/json" in content_type
    assert response.json()["json"] == payload
|
|
|
|
def test_json_param_post_should_not_override_data_param(self, httpbin):
    """When both ``data`` and ``json`` are given, the body comes from ``data``."""
    request_kwargs = {
        "method": "POST",
        "url": httpbin("post"),
        "data": {"stuff": "elixr"},
        "json": {"music": "flute"},
    }
    prepared = requests.Request(**request_kwargs).prepare()
    assert "stuff=elixr" == prepared.body
|
|
|
|
def test_response_iter_lines(self, httpbin):
    """After one line is consumed, ``iter_lines`` yields the remaining three."""
    response = requests.get(httpbin("stream/4"), stream=True)
    assert response.status_code == 200

    lines = response.iter_lines()
    next(lines)  # discard the first line
    remaining = list(lines)
    assert len(remaining) == 3
|
|
|
|
def test_response_context_manager(self, httpbin):
    """Leaving a response's ``with`` block closes its raw stream."""
    streamed = requests.get(httpbin("stream/4"), stream=True)
    with streamed as response:
        assert isinstance(response, requests.Response)

    assert response.raw.closed
|
|
|
|
def test_unconsumed_session_response_closes_connection(self, httpbin):
    """Closing an unread streamed response closes its raw stream."""
    session = requests.session()
    streamed = session.get(httpbin("stream/4"), stream=True)

    # Close the response without ever touching its content.
    with contextlib.closing(streamed) as response:
        pass

    assert response._content_consumed is False
    assert response.raw.closed
|
|
|
|
@pytest.mark.xfail
def test_response_iter_lines_reentrant(self, httpbin):
    """Response.iter_lines() is not reentrant safe.

    Marked ``xfail``: one line is taken from a first iterator, then a second
    iterator is expected to yield the remaining three.
    """
    response = requests.get(httpbin("stream/4"), stream=True)
    assert response.status_code == 200

    # The first iterator is deliberately not kept after taking one line.
    next(response.iter_lines())
    remaining = list(response.iter_lines())
    assert len(remaining) == 3
|
|
|
|
def test_session_close_proxy_clear(self):
    """Closing a session calls ``clear`` once on each patched proxy manager."""
    proxies = {name: mock.Mock() for name in ("one", "two")}
    session = requests.Session()
    http_adapter = session.adapters["http://"]
    with mock.patch.dict(http_adapter.proxy_manager, proxies):
        session.close()
        for manager in proxies.values():
            manager.clear.assert_called_once_with()
|
|
|
|
def test_proxy_auth(self):
    """Credentials in a proxy URL become a basic ``Proxy-Authorization`` header."""
    expected = {"Proxy-Authorization": "Basic dXNlcjpwYXNz"}
    proxy_headers = HTTPAdapter().proxy_headers("http://user:pass@httpbin.org")
    assert proxy_headers == expected
|
|
|
|
def test_proxy_auth_empty_pass(self):
    """A proxy URL with an empty password still produces an auth header."""
    expected = {"Proxy-Authorization": "Basic dXNlcjo="}
    proxy_headers = HTTPAdapter().proxy_headers("http://user:@httpbin.org")
    assert proxy_headers == expected
|
|
|
|
def test_response_json_when_content_is_None(self, httpbin):
    """``json()`` raises ``ValueError`` when the response content is ``None``."""
    response = requests.get(httpbin("/status/204"))

    # Reset the response state so that ``content`` evaluates to None.
    response.status_code = 0
    response._content = False
    response._content_consumed = False
    assert response.content is None

    with pytest.raises(ValueError):
        response.json()
|
|
|
|
def test_response_without_release_conn(self):
    """Check ``close`` on a response whose raw object is not urllib3-like.

    The raw object here is a plain ``StringIO``; closing the response is
    expected to close it even though no ``release_conn`` attribute is set
    on it by this test.
    """
    raw_stream = StringIO.StringIO("test")
    response = requests.Response()
    response.raw = raw_stream
    assert not response.raw.closed

    response.close()
    assert response.raw.closed
|
|
|
|
def test_empty_stream_with_auth_does_not_set_content_length_header(self, httpbin):
    """A zero-length byte stream sent with auth must use only
    Transfer-Encoding, never Content-Length alongside it.
    """
    empty_stream = io.BytesIO(b"")
    request = requests.Request(
        "POST", httpbin("post"), auth=("user", "pass"), data=empty_stream
    )
    headers = request.prepare().headers
    assert "Transfer-Encoding" in headers
    assert "Content-Length" not in headers
|
|
|
|
def test_stream_with_auth_does_not_set_transfer_encoding_header(self, httpbin):
    """A non-empty byte stream sent with auth must use only Content-Length,
    never Transfer-Encoding alongside it.
    """
    stream = io.BytesIO(b"test data")
    request = requests.Request(
        "POST", httpbin("post"), auth=("user", "pass"), data=stream
    )
    headers = request.prepare().headers
    assert "Transfer-Encoding" not in headers
    assert "Content-Length" in headers
|
|
|
|
def test_chunked_upload_does_not_set_content_length_header(self, httpbin):
    """A generator body is prepared with Transfer-Encoding and without a
    Content-Length header.
    """
    chunks = (piece for piece in [b"a", b"b", b"c"])
    request = requests.Request("POST", httpbin("post"), data=chunks)
    headers = request.prepare().headers
    assert "Transfer-Encoding" in headers
    assert "Content-Length" not in headers
|
|
|
|
def test_custom_redirect_mixin(self, httpbin):
    """Check that overriding ``get_redirect_target`` can rescue a bad redirect.

    A ``requests.Session`` subclass is used to follow a chain containing a
    malformed redirect response:

    1. the first request receives a regular 302 redirect
    2. the redirected request receives a malformed response:
        status code = HTTP 200
        location = alternate url
    3. the subclass notices the stray location header and follows it
    """
    final_url = httpbin("html")
    malformed_query = urlencode({"location": final_url})
    malformed_url = httpbin("response-headers?" + malformed_query)
    redirect_query = urlencode({"url": malformed_url})
    redirect_url = httpbin("redirect-to?" + redirect_query)

    class CustomRedirectSession(requests.Session):
        def get_redirect_target(self, resp):
            # Standard case: a genuine redirect response.
            if resp.is_redirect:
                return resp.headers["location"]
            # Edge case: a non-redirect response that still names a location
            # different from its own URL.
            fallback = resp.headers.get("location")
            if not fallback or fallback == resp.url:
                return None
            return fallback

    response = CustomRedirectSession().get(redirect_url)

    assert len(response.history) == 2
    assert response.status_code == 200

    proper_hop, malformed_hop = response.history[0], response.history[1]
    assert proper_hop.status_code == 302
    assert proper_hop.is_redirect
    assert malformed_hop.status_code == 200
    assert not malformed_hop.is_redirect
    assert response.url == final_url
|
|
|
|
|
|
class TestCaseInsensitiveDict:
    """Behavioural tests for ``CaseInsensitiveDict``."""

    @pytest.mark.parametrize(
        "cid",
        [
            CaseInsensitiveDict({"Foo": "foo", "BAr": "bar"}),
            CaseInsensitiveDict([("Foo", "foo"), ("BAr", "bar")]),
            CaseInsensitiveDict(FOO="foo", BAr="bar"),
        ],
    )
    def test_init(self, cid):
        """Mapping, pair-list and keyword construction each give two entries."""
        assert len(cid) == 2
        for lowered in ("foo", "bar"):
            assert lowered in cid

    def test_docstring_example(self):
        """Lookup ignores case while iteration yields the key as it was set."""
        headers = CaseInsensitiveDict()
        headers["Accept"] = "application/json"
        assert headers["aCCEPT"] == "application/json"
        assert list(headers) == ["Accept"]

    def test_len(self):
        """Re-setting a key in another case does not add an entry."""
        mapping = CaseInsensitiveDict({"a": "a", "b": "b"})
        mapping["A"] = "a"
        assert len(mapping) == 2

    def test_getitem(self):
        """Item access works for any casing of the stored key."""
        mapping = CaseInsensitiveDict({"Spam": "blueval"})
        for variant in ("spam", "SPAM"):
            assert mapping[variant] == "blueval"

    def test_fixes_649(self):
        """``__setitem__`` treats differently-cased keys as the same key."""
        mapping = CaseInsensitiveDict()
        assignments = (
            ("spam", "oneval"),
            ("Spam", "twoval"),
            ("sPAM", "redval"),
            ("SPAM", "blueval"),
        )
        for key, value in assignments:
            mapping[key] = value
        # The last value wins and only the last key spelling is kept.
        assert mapping["spam"] == "blueval"
        assert mapping["SPAM"] == "blueval"
        assert list(mapping.keys()) == ["SPAM"]

    def test_delitem(self):
        """Deleting with a different casing removes the entry."""
        mapping = CaseInsensitiveDict()
        mapping["Spam"] = "someval"
        del mapping["sPam"]
        assert "spam" not in mapping
        assert len(mapping) == 0

    def test_contains(self):
        """Membership checks ignore case and reject unknown keys."""
        mapping = CaseInsensitiveDict()
        mapping["Spam"] = "someval"
        for variant in ("Spam", "spam", "SPAM", "sPam"):
            assert variant in mapping
        assert "notspam" not in mapping

    def test_get(self):
        """``get`` ignores case and honours the default for missing keys."""
        mapping = CaseInsensitiveDict()
        mapping["spam"] = "oneval"
        mapping["SPAM"] = "blueval"
        for variant in ("spam", "SPAM", "sPam"):
            assert mapping.get(variant) == "blueval"
        assert mapping.get("notspam", "default") == "default"

    def test_update(self):
        """``update`` overwrites existing entries regardless of key case."""
        single = CaseInsensitiveDict()
        single["spam"] = "blueval"
        single.update({"sPam": "notblueval"})
        assert single["spam"] == "notblueval"

        pair = CaseInsensitiveDict({"Foo": "foo", "BAr": "bar"})
        pair.update({"fOO": "anotherfoo", "bAR": "anotherbar"})
        assert len(pair) == 2
        assert pair["foo"] == "anotherfoo"
        assert pair["bar"] == "anotherbar"

    def test_update_retains_unchanged(self):
        """``update`` leaves keys it does not mention untouched."""
        mapping = CaseInsensitiveDict({"foo": "foo", "bar": "bar"})
        mapping.update({"foo": "newfoo"})
        assert mapping["bar"] == "bar"

    def test_iter(self):
        """Iteration yields the keys in their original casing."""
        mapping = CaseInsensitiveDict({"Spam": "spam", "Eggs": "eggs"})
        expected_keys = frozenset(["Spam", "Eggs"])
        assert frozenset(iter(mapping)) == expected_keys

    def test_equality(self):
        """Equality ignores key case and works against plain dicts."""
        mixed = CaseInsensitiveDict({"SPAM": "blueval", "Eggs": "redval"})
        lowered = CaseInsensitiveDict({"spam": "blueval", "eggs": "redval"})
        assert mixed == lowered

        del lowered["spam"]
        assert mixed != lowered
        assert mixed == {"spam": "blueval", "eggs": "redval"}
        assert mixed != object()

    def test_setdefault(self):
        """``setdefault`` keeps an existing value and stores a missing one."""
        mapping = CaseInsensitiveDict({"Spam": "blueval"})
        assert mapping.setdefault("spam", "notblueval") == "blueval"
        assert mapping.setdefault("notspam", "notblueval") == "notblueval"

    def test_lower_items(self):
        """``lower_items`` exposes the keys in lower case."""
        mapping = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        lowered_keys = frozenset(key for key, _ in mapping.lower_items())
        assert lowered_keys == frozenset(["accept", "user-agent"])

    def test_preserve_key_case(self):
        """Items, keys and iteration all report the original key casing."""
        mapping = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        expected_keys = frozenset(["Accept", "user-Agent"])
        assert frozenset(key for key, _ in mapping.items()) == expected_keys
        assert frozenset(mapping.keys()) == expected_keys
        assert frozenset(mapping) == expected_keys

    def test_preserve_last_key_case(self):
        """After re-setting keys, the most recent casing is the one reported."""
        mapping = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        mapping.update({"ACCEPT": "application/json"})
        mapping["USER-AGENT"] = "requests"
        expected_keys = frozenset(["ACCEPT", "USER-AGENT"])
        assert frozenset(key for key, _ in mapping.items()) == expected_keys
        assert frozenset(mapping.keys()) == expected_keys
        assert frozenset(mapping) == expected_keys

    def test_copy(self):
        """A copy compares equal until the original is changed."""
        original = CaseInsensitiveDict(
            {
                "Accept": "application/json",
                "user-Agent": "requests",
            }
        )
        duplicate = original.copy()
        assert original == duplicate
        original["changed"] = True
        assert original != duplicate
|
|
|
|
|
|
class TestMorselToCookieExpires:
    """Checks ``morsel_to_cookie`` for morsels that carry ``expires``."""

    def test_expires_valid_str(self):
        """A date string for ``expires`` is converted to a number."""
        source = Morsel()
        source["expires"] = "Thu, 01-Jan-1970 00:00:01 GMT"
        assert morsel_to_cookie(source).expires == 1

    @pytest.mark.parametrize(
        "value, exception",
        [
            (100, TypeError),
            ("woops", ValueError),
        ],
    )
    def test_expires_invalid_int(self, value, exception):
        """An unusable ``expires`` value raises the matching exception."""
        source = Morsel()
        source["expires"] = value
        with pytest.raises(exception):
            morsel_to_cookie(source)

    def test_expires_none(self):
        """An ``expires`` of None stays None on the resulting cookie."""
        source = Morsel()
        source["expires"] = None
        assert morsel_to_cookie(source).expires is None
|
|
|
|
|
|
class TestMorselToCookieMaxAge:
    """Checks ``morsel_to_cookie`` for morsels that carry ``max-age``."""

    def test_max_age_valid_int(self):
        """An integer ``max-age`` yields an integer ``expires`` on the cookie."""
        source = Morsel()
        source["max-age"] = 60
        converted = morsel_to_cookie(source)
        assert isinstance(converted.expires, int)

    def test_max_age_invalid_str(self):
        """A non-numeric ``max-age`` raises ``TypeError``."""
        source = Morsel()
        source["max-age"] = "woops"
        with pytest.raises(TypeError):
            morsel_to_cookie(source)
|
|
|
|
|
|
class TestTimeout:
    """Tests for how ``timeout`` values are validated and enforced."""

    def test_stream_timeout(self, httpbin):
        """A request slower than its timeout must raise a read timeout.

        ``pytest.raises`` makes the test fail when no timeout is raised; a
        bare ``try``/``except`` would pass without checking anything.
        """
        with pytest.raises(requests.exceptions.Timeout) as excinfo:
            requests.get(httpbin("delay/10"), timeout=2.0)
        assert "Read timed out" in excinfo.value.args[0].args[0]

    @pytest.mark.parametrize(
        "timeout, error_text",
        (
            ((3, 4, 5), "(connect, read)"),
            ("foo", "must be an int, float or None"),
        ),
    )
    def test_invalid_timeout(self, httpbin, timeout, error_text):
        """Malformed timeout values are rejected with a descriptive error."""
        with pytest.raises(ValueError) as e:
            requests.get(httpbin("get"), timeout=timeout)
        # Check the exception's own message rather than the representation
        # of the ExceptionInfo wrapper.
        assert error_text in str(e.value)

    @pytest.mark.parametrize("timeout", (None, Urllib3Timeout(connect=None, read=None)))
    def test_none_timeout(self, httpbin, timeout):
        """Check that you can set None as a valid timeout value.

        To actually test this behavior, we'd want to check that setting the
        timeout to None actually lets the request block past the system default
        timeout. However, this would make the test suite unbearably slow.
        Instead we verify that setting the timeout to None does not prevent the
        request from succeeding.
        """
        r = requests.get(httpbin("get"), timeout=timeout)
        assert r.status_code == 200

    @pytest.mark.parametrize(
        "timeout", ((None, 0.1), Urllib3Timeout(connect=None, read=0.1))
    )
    def test_read_timeout(self, httpbin, timeout):
        """A short read timeout against a slow endpoint raises ``ReadTimeout``."""
        try:
            requests.get(httpbin("delay/10"), timeout=timeout)
            pytest.fail("The recv() request should time out.")
        except ReadTimeout:
            pass

    @pytest.mark.parametrize(
        "timeout", ((0.1, None), Urllib3Timeout(connect=0.1, read=None))
    )
    def test_connect_timeout(self, timeout):
        """A connect timeout is both a ``ConnectionError`` and a ``Timeout``."""
        try:
            requests.get(TARPIT, timeout=timeout)
            pytest.fail("The connect() request should time out.")
        except ConnectTimeout as e:
            assert isinstance(e, ConnectionError)
            assert isinstance(e, Timeout)

    @pytest.mark.parametrize(
        "timeout", ((0.1, 0.1), Urllib3Timeout(connect=0.1, read=0.1))
    )
    def test_total_timeout_connect(self, timeout):
        """With both timeouts set, an unreachable host raises ``ConnectTimeout``."""
        try:
            requests.get(TARPIT, timeout=timeout)
            pytest.fail("The connect() request should time out.")
        except ConnectTimeout:
            pass

    def test_encoded_methods(self, httpbin):
        """A bytes HTTP method is accepted.

        See: https://github.com/psf/requests/issues/2316
        """
        r = requests.request(b"GET", httpbin("get"))
        assert r.ok
|
|
|
|
|
|
# Record of one ``send`` invocation: positional args and keyword args.
SendCall = collections.namedtuple("SendCall", ["args", "kwargs"])
|
|
|
|
|
|
class RedirectSession(SessionRedirectMixin):
    """Stub session that records ``send`` calls and replays canned statuses.

    ``order_of_redirects`` lists the status codes handed out, one per
    ``send`` call; once the list is exhausted every response is a 200.
    """

    def __init__(self, order_of_redirects):
        self.redirects = order_of_redirects
        self.calls = []
        self.max_redirects = 30
        self.cookies = {}
        self.trust_env = False

    def send(self, *args, **kwargs):
        """Record the call and answer with the next canned response."""
        recorded = SendCall(args, kwargs)
        self.calls.append(recorded)
        return self.build_response()

    def build_response(self):
        """Build a response for the request of the most recent ``send`` call."""
        latest_request = self.calls[-1].args[0]
        response = requests.Response()

        try:
            next_status = int(self.redirects.pop(0))
        except IndexError:
            # No canned status left: answer with a plain success.
            next_status = 200
        response.status_code = next_status

        response.headers = CaseInsensitiveDict({"Location": "/"})
        response.raw = self._build_raw()
        response.request = latest_request
        return response

    def _build_raw(self):
        """Return an empty in-memory stream carrying a ``release_conn`` stub."""
        raw = StringIO.StringIO("")
        raw.release_conn = lambda *args: args
        return raw
|
|
|
|
|
|
def test_json_encodes_as_bytes():
    """A ``json=`` payload is stored on the prepared request as bytes."""
    # urllib3 expects bodies as bytes-like objects
    prepared = PreparedRequest()
    prepared.prepare(
        method="GET", url="https://www.example.com/", json={"key": "value"}
    )
    assert isinstance(prepared.body, bytes)
|
|
|
|
|
|
def test_requests_are_updated_each_time(httpbin):
    """Every redirect hop sends a fresh GET request with the default kwargs."""
    session = RedirectSession([303, 307])
    prepared = requests.Request("POST", httpbin("post")).prepare()

    first_response = session.send(prepared)
    assert first_response.request.method == "POST"
    assert session.calls[-1] == SendCall((first_response.request,), {})

    hops = session.resolve_redirects(first_response, prepared)
    expected_kwargs = dict(
        stream=False,
        verify=True,
        cert=None,
        timeout=None,
        allow_redirects=False,
        proxies={},
    )
    for hop in hops:
        assert hop.request.method == "GET"
        # The most recent recorded call must be the one made for this hop.
        assert session.calls[-1] == SendCall((hop.request,), expected_kwargs)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "var,url,proxy",
    [
        ("http_proxy", "http://example.com", "socks5://proxy.com:9876"),
        ("https_proxy", "https://example.com", "socks5://proxy.com:9876"),
        ("all_proxy", "http://example.com", "socks5://proxy.com:9876"),
        ("all_proxy", "https://example.com", "socks5://proxy.com:9876"),
    ],
)
def test_proxy_env_vars_override_default(var, url, proxy):
    """A proxy set through the environment is picked up for the URL's scheme."""
    session = requests.Session()
    prepared = PreparedRequest()
    prepared.prepare(method="GET", url=url)

    environment = {var: proxy}
    url_scheme = urlparse(url).scheme
    with override_environ(**environment):
        rebuilt = session.rebuild_proxies(prepared, {})
        assert url_scheme in rebuilt
        assert rebuilt[url_scheme] == proxy
|
|
|
|
|
|
@pytest.mark.parametrize(
    "data",
    [
        (("a", "b"), ("c", "d")),
        (("c", "d"), ("a", "b")),
        (("a", "b"), ("c", "d"), ("e", "f")),
    ],
)
def test_data_argument_accepts_tuples(data):
    """Tuples of string pairs given as ``data`` are url-encoded into the body."""
    prepared = PreparedRequest()
    prepare_kwargs = {
        "method": "GET",
        "url": "http://www.example.com",
        "data": data,
        "hooks": default_hooks(),
    }
    prepared.prepare(**prepare_kwargs)
    assert prepared.body == urlencode(data)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "kwargs",
    [
        None,
        {
            "method": "GET",
            "url": "http://www.example.com",
            "data": "foo=bar",
            "hooks": default_hooks(),
        },
        {
            "method": "GET",
            "url": "http://www.example.com",
            "data": "foo=bar",
            "hooks": default_hooks(),
            "cookies": {"foo": "bar"},
        },
        {"method": "GET", "url": "http://www.example.com/üniçø∂é"},
    ],
)
def test_prepared_copy(kwargs):
    """A copied ``PreparedRequest`` matches the original attribute by attribute."""
    original = PreparedRequest()
    if kwargs:
        # The ``None`` case leaves the request unprepared on purpose.
        original.prepare(**kwargs)
    duplicate = original.copy()

    compared_attrs = ("method", "url", "headers", "_cookies", "body", "hooks")
    for name in compared_attrs:
        assert getattr(original, name) == getattr(duplicate, name)
|
|
|
|
|
|
def test_urllib3_retries(httpbin):
    """Exhausting a urllib3 ``Retry`` policy on 500s raises ``RetryError``."""
    from urllib3.util import Retry

    retry_policy = Retry(total=2, status_forcelist=[500])
    session = requests.Session()
    session.mount("http://", HTTPAdapter(max_retries=retry_policy))

    with pytest.raises(RetryError):
        session.get(httpbin("status/500"))
|
|
|
|
|
|
def test_urllib3_pool_connection_closed(httpbin):
    """A ``ConnectionError`` from a zero-sized pool mentions the closed pool.

    The message is only checked when a ``ConnectionError`` is raised; a
    request that succeeds also lets the test pass.
    """
    zero_pool_adapter = HTTPAdapter(pool_connections=0, pool_maxsize=0)
    session = requests.Session()
    session.mount("http://", zero_pool_adapter)

    try:
        session.get(httpbin("status/200"))
    except ConnectionError as error:
        assert "Pool is closed." in str(error)
|
|
|
|
|
|
class TestPreparingURLs:
|
|
@pytest.mark.parametrize(
    "url,expected",
    [
        ("http://google.com", "http://google.com/"),
        ("http://ジェーピーニック.jp", "http://xn--hckqz9bzb1cyrb.jp/"),
        ("http://xn--n3h.net/", "http://xn--n3h.net/"),
        ("http://ジェーピーニック.jp".encode(), "http://xn--hckqz9bzb1cyrb.jp/"),
        ("http://straße.de/straße", "http://xn--strae-oqa.de/stra%C3%9Fe"),
        (
            "http://straße.de/straße".encode(),
            "http://xn--strae-oqa.de/stra%C3%9Fe",
        ),
        (
            "http://Königsgäßchen.de/straße",
            "http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe",
        ),
        (
            "http://Königsgäßchen.de/straße".encode(),
            "http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe",
        ),
        (b"http://xn--n3h.net/", "http://xn--n3h.net/"),
        (
            b"http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/",
            "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/",
        ),
        (
            "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/",
            "http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/",
        ),
    ],
)
def test_preparing_url(self, url, expected):
    """Prepared URLs equal ``expected`` once percent-escapes are upper-cased."""

    def normalize_percent_encode(x):
        # Upper-case every ``%xx`` escape so that equivalent percent-encoded
        # bytes compare equal.
        return re.sub(
            r"%[a-fA-F0-9]{2}", lambda escape: escape.group(0).upper(), x
        )

    prepared = requests.Request("GET", url=url).prepare()
    assert normalize_percent_encode(prepared.url) == expected
|
|
|
|
@pytest.mark.parametrize(
    "url",
    [
        b"http://*.google.com",
        b"http://*",
        "http://*.google.com",
        "http://*",
        "http://☃.net/",
    ],
)
def test_preparing_bad_url(self, url):
    """Preparing a request for any of these URLs raises ``InvalidURL``."""
    request = requests.Request("GET", url=url)
    with pytest.raises(requests.exceptions.InvalidURL):
        request.prepare()
|
|
|
|
@pytest.mark.parametrize("url, exception", [("http://localhost:-1", InvalidURL)])
def test_redirecting_to_bad_url(self, httpbin, url, exception):
    """Being redirected to an invalid URL raises the expected exception."""
    redirect_endpoint = httpbin("redirect-to")
    with pytest.raises(exception):
        requests.get(redirect_endpoint, params={"url": url})
|
|
|
|
@pytest.mark.parametrize(
    "input, expected",
    [
        (
            b"http+unix://%2Fvar%2Frun%2Fsocket/path%7E",
            "http+unix://%2Fvar%2Frun%2Fsocket/path~",
        ),
        (
            "http+unix://%2Fvar%2Frun%2Fsocket/path%7E",
            "http+unix://%2Fvar%2Frun%2Fsocket/path~",
        ),
        (
            b"mailto:user@example.org",
            "mailto:user@example.org",
        ),
        (
            "mailto:user@example.org",
            "mailto:user@example.org",
        ),
        (
            b"data:SSDimaUgUHl0aG9uIQ==",
            "data:SSDimaUgUHl0aG9uIQ==",
        ),
    ],
)
def test_url_mutation(self, input, expected):
    """
    Verify which URLs are rewritten during preparation and which are not.

    A URL whose scheme does not begin with "http" must come out unchanged,
    while a URL whose scheme *does* begin with "http" is mutated.
    """
    prepared = requests.Request("GET", url=input).prepare()
    assert prepared.url == expected
|
|
|
|
@pytest.mark.parametrize(
    "input, params, expected",
    [
        (
            b"http+unix://%2Fvar%2Frun%2Fsocket/path",
            {"key": "value"},
            "http+unix://%2Fvar%2Frun%2Fsocket/path?key=value",
        ),
        (
            "http+unix://%2Fvar%2Frun%2Fsocket/path",
            {"key": "value"},
            "http+unix://%2Fvar%2Frun%2Fsocket/path?key=value",
        ),
        (
            b"mailto:user@example.org",
            {"key": "value"},
            "mailto:user@example.org",
        ),
        (
            "mailto:user@example.org",
            {"key": "value"},
            "mailto:user@example.org",
        ),
    ],
)
def test_parameters_for_nonstandard_schemes(self, input, params, expected):
    """
    Query parameters are applied to nonstandard schemes that begin with
    "http" and are not applied to any other scheme.
    """
    prepared = requests.Request("GET", url=input, params=params).prepare()
    assert prepared.url == expected
|
|
|
|
def test_post_json_nan(self, httpbin):
    """Posting a JSON payload containing NaN raises ``InvalidJSONError``."""
    payload = {"foo": float("nan")}
    target = httpbin("post")
    with pytest.raises(requests.exceptions.InvalidJSONError):
        requests.post(target, json=payload)
|
|
|
|
def test_json_decode_compatibility(self, httpbin):
    """The decode error is both a ``RequestException`` and a ``JSONDecodeError``.

    Its string form must not contain the response text.
    """
    response = requests.get(httpbin("bytes/20"))
    with pytest.raises(requests.exceptions.JSONDecodeError) as excinfo:
        response.json()

    error = excinfo.value
    assert isinstance(error, RequestException)
    assert isinstance(error, JSONDecodeError)
    assert response.text not in str(error)
|
|
|
|
def test_json_decode_persists_doc_attr(self, httpbin):
    """The decode error keeps the response text in its ``doc`` attribute."""
    response = requests.get(httpbin("bytes/20"))
    with pytest.raises(requests.exceptions.JSONDecodeError) as excinfo:
        response.json()

    error = excinfo.value
    assert error.doc == response.text
|
|
|
|
def test_status_code_425(self):
    """Every alias of the 425 status resolves to 425 in ``requests.codes``."""
    aliases = (
        "TOO_EARLY",
        "too_early",
        "UNORDERED",
        "unordered",
        "UNORDERED_COLLECTION",
        "unordered_collection",
    )
    resolved = [requests.codes.get(alias) for alias in aliases]

    for code in resolved:
        assert code == 425
|