mirror of
https://github.com/kennethreitz/requests.git
synced 2026-06-05 14:50:16 +00:00
3027 lines
108 KiB
Python
3027 lines
108 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Tests for Requests."""
|
|
|
|
from __future__ import division
|
|
import itertools
|
|
import json
|
|
import os
|
|
import pickle
|
|
import collections
|
|
import contextlib
|
|
import warnings
|
|
|
|
import io
|
|
import requests
|
|
import pytest
|
|
import pytest_httpbin
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.auth import HTTPDigestAuth, _basic_auth_str
|
|
from requests.basics import (
|
|
Morsel, cookielib, getproxies, str, urlparse, builtin_str
|
|
)
|
|
from requests.cookies import ( cookiejar_from_dict, morsel_to_cookie)
|
|
from requests.exceptions import (
|
|
ConnectionError,
|
|
ConnectTimeout,
|
|
InvalidScheme,
|
|
InvalidURL,
|
|
MissingScheme,
|
|
ReadTimeout,
|
|
Timeout,
|
|
RetryError,
|
|
TooManyRedirects,
|
|
ProxyError,
|
|
InvalidHeader,
|
|
UnrewindableBodyError,
|
|
InvalidBodyError,
|
|
SSLError,
|
|
)
|
|
from requests.models import PreparedRequest
|
|
from requests.structures import CaseInsensitiveDict
|
|
from requests.sessions import SessionRedirectMixin
|
|
from requests.models import urlencode
|
|
from requests.hooks import default_hooks
|
|
from requests.utils import DEFAULT_CA_BUNDLE_PATH
|
|
|
|
from .compat import StringIO, u
|
|
from .utils import override_environ
|
|
from urllib3.util import Timeout as Urllib3Timeout
|
|
|
|
|
|
class SendRecordingAdapter(HTTPAdapter):
|
|
"""
|
|
A basic subclass of the HTTPAdapter that records the arguments used to
|
|
``send``.
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(SendRecordingAdapter, self).__init__(*args, **kwargs)
|
|
self.send_calls = []
|
|
|
|
def send(self, *args, **kwargs):
|
|
self.send_calls.append((args, kwargs))
|
|
return super(SendRecordingAdapter, self).send(*args, **kwargs)
|
|
|
|
|
|
# Requests to this URL should always fail with a connection timeout (nothing
|
|
# listening on that port)
|
|
TARPIT = 'http://10.255.255.1'
|
|
try:
|
|
from ssl import SSLContext
|
|
|
|
del SSLContext
|
|
HAS_MODERN_SSL = True
|
|
except ImportError:
|
|
HAS_MODERN_SSL = False
|
|
try:
|
|
requests.pyopenssl
|
|
HAS_PYOPENSSL = True
|
|
except AttributeError:
|
|
HAS_PYOPENSSL = False
|
|
|
|
|
|
class TestRequests:
|
|
|
|
def test_entry_points(self):
|
|
requests.session
|
|
requests.session().get
|
|
requests.session().head
|
|
requests.get
|
|
requests.head
|
|
requests.put
|
|
requests.patch
|
|
requests.post
|
|
|
|
@pytest.mark.parametrize(
|
|
'exception, url',
|
|
(
|
|
(MissingScheme, 'hiwpefhipowhefopw'),
|
|
(InvalidScheme, 'localhost:3128'),
|
|
(InvalidScheme, 'localhost.localdomain:3128/'),
|
|
(InvalidScheme, '10.122.1.1:3128/'),
|
|
(InvalidURL, 'http://'),
|
|
),
|
|
)
|
|
def test_invalid_url(self, exception, url):
|
|
with pytest.raises(exception):
|
|
requests.get(url)
|
|
|
|
def test_basic_building(self):
|
|
req = requests.Request(method='GET')
|
|
req.url = 'http://kennethreitz.org/'
|
|
req.data = {'life': '42'}
|
|
pr = req.prepare()
|
|
assert pr.url == req.url
|
|
assert pr.body == 'life=42'
|
|
|
|
@pytest.mark.parametrize('method', ('GET', 'HEAD'))
|
|
def test_no_content_length(self, httpbin, method):
|
|
req = requests.Request(method, httpbin(method.lower())).prepare()
|
|
assert 'Content-Length' not in req.headers
|
|
|
|
@pytest.mark.parametrize('method', ('POST', 'PUT', 'PATCH', 'OPTIONS'))
|
|
def test_no_body_content_length(self, httpbin, method):
|
|
req = requests.Request(method, httpbin(method.lower())).prepare()
|
|
assert req.headers['Content-Length'] == '0'
|
|
|
|
@pytest.mark.parametrize('method', ('POST', 'PUT', 'PATCH', 'OPTIONS'))
|
|
def test_empty_content_length(self, httpbin, method):
|
|
req = requests.Request(
|
|
method, httpbin(method.lower()), data=''
|
|
).prepare(
|
|
)
|
|
assert req.headers['Content-Length'] == '0'
|
|
|
|
def test_override_content_length(self, httpbin):
|
|
headers = {'Content-Length': 'not zero'}
|
|
r = requests.Request('POST', httpbin('post'), headers=headers).prepare(
|
|
)
|
|
assert 'Content-Length' in r.headers
|
|
assert r.headers['Content-Length'] == 'not zero'
|
|
|
|
def test_path_is_not_double_encoded(self):
|
|
request = requests.Request(
|
|
'GET', "http://0.0.0.0/get/test case"
|
|
).prepare(
|
|
)
|
|
assert request.path_url == '/get/test%20case'
|
|
|
|
@pytest.mark.parametrize(
|
|
'url, expected',
|
|
(
|
|
(
|
|
'http://example.com/path#fragment',
|
|
'http://example.com/path?a=b#fragment',
|
|
),
|
|
(
|
|
'http://example.com/path?key=value#fragment',
|
|
'http://example.com/path?key=value&a=b#fragment',
|
|
),
|
|
),
|
|
)
|
|
def test_params_are_added_before_fragment(self, url, expected):
|
|
request = requests.Request('GET', url, params={"a": "b"}).prepare()
|
|
assert request.url == expected
|
|
|
|
def test_params_original_order_is_preserved_by_default(self):
|
|
param_ordered_dict = collections.OrderedDict(
|
|
(('z', 1), ('a', 1), ('k', 1), ('d', 1))
|
|
)
|
|
session = requests.Session()
|
|
request = requests.Request(
|
|
'GET', 'http://example.com/', params=param_ordered_dict
|
|
)
|
|
prep = session.prepare_request(request)
|
|
assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1'
|
|
|
|
def test_params_bytes_are_encoded(self):
|
|
request = requests.Request(
|
|
'GET', 'http://example.com', params=b'test=foo'
|
|
).prepare(
|
|
)
|
|
assert request.url == 'http://example.com/?test=foo'
|
|
|
|
def test_binary_put(self):
|
|
request = requests.Request(
|
|
'PUT', 'http://example.com', data=u"ööö".encode("utf-8")
|
|
).prepare(
|
|
)
|
|
assert isinstance(request.body, bytes)
|
|
|
|
def test_whitespaces_are_removed_from_url(self):
|
|
# Test for issue #3696
|
|
request = requests.Request('GET', ' http://example.com').prepare()
|
|
assert request.url == 'http://example.com/'
|
|
|
|
@pytest.mark.parametrize(
|
|
'scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://')
|
|
)
|
|
def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
|
|
s = requests.Session()
|
|
s.proxies = getproxies()
|
|
parts = urlparse(httpbin('get'))
|
|
url = scheme + parts.netloc + parts.path
|
|
r = requests.Request('GET', url)
|
|
r = s.send(r.prepare())
|
|
assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
|
|
|
|
def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
|
|
r = requests.Request('GET', httpbin('get'))
|
|
s = requests.Session()
|
|
s.proxies = getproxies()
|
|
r = s.send(r.prepare())
|
|
assert r.status_code == 200
|
|
|
|
def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
|
|
r = requests.get(httpbin('redirect', '1'))
|
|
assert r.status_code == 200
|
|
assert r.history[0].status_code == 302
|
|
assert r.history[0].is_redirect
|
|
|
|
def test_HTTP_307_ALLOW_REDIRECT_POST(self, httpbin):
|
|
r = requests.post(
|
|
httpbin('redirect-to'),
|
|
data='test',
|
|
params={'url': 'post', 'status_code': 307},
|
|
)
|
|
assert r.status_code == 200
|
|
assert r.history[0].status_code == 307
|
|
assert r.history[0].is_redirect
|
|
assert r.json()['data'] == 'test'
|
|
|
|
def test_HTTP_307_ALLOW_REDIRECT_POST_WITH_SEEKABLE(self, httpbin):
|
|
byte_str = b'test'
|
|
r = requests.post(
|
|
httpbin('redirect-to'),
|
|
data=io.BytesIO(byte_str),
|
|
params={'url': 'post', 'status_code': 307},
|
|
)
|
|
assert r.status_code == 200
|
|
assert r.history[0].status_code == 307
|
|
assert r.history[0].is_redirect
|
|
assert r.json()['data'] == byte_str.decode('utf-8')
|
|
|
|
def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin):
|
|
try:
|
|
requests.get(httpbin('relative-redirect', '50'))
|
|
except TooManyRedirects as e:
|
|
url = httpbin('relative-redirect', '20')
|
|
assert e.request.url == url
|
|
assert e.response.url == url
|
|
assert len(e.response.history) == 30
|
|
else:
|
|
pytest.fail(
|
|
'Expected redirect to raise TooManyRedirects but it did not'
|
|
)
|
|
|
|
def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
|
|
s = requests.session()
|
|
s.max_redirects = 5
|
|
try:
|
|
s.get(httpbin('relative-redirect', '50'))
|
|
except TooManyRedirects as e:
|
|
url = httpbin('relative-redirect', '45')
|
|
assert e.request.url == url
|
|
assert e.response.url == url
|
|
assert len(e.response.history) == 5
|
|
else:
|
|
pytest.fail(
|
|
'Expected custom max number of redirects to be respected but was not'
|
|
)
|
|
|
|
@pytest.mark.parametrize(
|
|
'method, body, expected',
|
|
(
|
|
('GET', None, 'GET'),
|
|
('HEAD', None, 'HEAD'),
|
|
('POST', 'test', 'GET'),
|
|
('PUT', 'put test', 'PUT'),
|
|
('PATCH', 'patch test', 'PATCH'),
|
|
('DELETE', '', 'DELETE'),
|
|
),
|
|
)
|
|
def test_http_301_for_redirectable_methods(
|
|
self, httpbin, method, body, expected
|
|
):
|
|
"""Tests all methods except OPTIONS for expected redirect behaviour.
|
|
|
|
OPTIONS responses can behave differently depending on the server, so
|
|
we don't have anything uniform to test except how httpbin responds
|
|
to them. For that reason they aren't included here.
|
|
"""
|
|
params = {'url': '/%s' % expected.lower(), 'status_code': '301'}
|
|
r = requests.request(
|
|
method, httpbin('redirect-to'), data=body, params=params
|
|
)
|
|
assert r.request.url == httpbin(expected.lower())
|
|
assert r.request.method == expected
|
|
assert r.history[0].status_code == 301
|
|
assert r.history[0].is_redirect
|
|
if expected in ('GET', 'HEAD'):
|
|
assert r.request.body is None
|
|
else:
|
|
assert r.json()['data'] == body
|
|
|
|
@pytest.mark.parametrize(
|
|
'method, body, expected',
|
|
(
|
|
('GET', None, 'GET'),
|
|
('HEAD', None, 'HEAD'),
|
|
('POST', 'test', 'GET'),
|
|
('PUT', 'put test', 'PUT'),
|
|
('PATCH', 'patch test', 'PATCH'),
|
|
('DELETE', '', 'DELETE'),
|
|
),
|
|
)
|
|
def test_http_302_for_redirectable_methods(
|
|
self, httpbin, method, body, expected
|
|
):
|
|
"""Tests all methods except OPTIONS for expected redirect behaviour.
|
|
|
|
OPTIONS responses can behave differently depending on the server, so
|
|
we don't have anything uniform to test except how httpbin responds
|
|
to them. For that reason they aren't included here.
|
|
"""
|
|
params = {'url': '/%s' % expected.lower()}
|
|
r = requests.request(
|
|
method, httpbin('redirect-to'), data=body, params=params
|
|
)
|
|
assert r.request.url == httpbin(expected.lower())
|
|
assert r.request.method == expected
|
|
assert r.history[0].status_code == 302
|
|
assert r.history[0].is_redirect
|
|
if expected in ('GET', 'HEAD'):
|
|
assert r.request.body is None
|
|
else:
|
|
assert r.json()['data'] == body
|
|
|
|
@pytest.mark.parametrize(
|
|
'method, body, expected',
|
|
(
|
|
('GET', None, 'GET'),
|
|
('HEAD', None, 'HEAD'),
|
|
('POST', 'test', 'GET'),
|
|
('PUT', 'put test', 'GET'),
|
|
('PATCH', 'patch test', 'GET'),
|
|
('DELETE', '', 'GET'),
|
|
),
|
|
)
|
|
def test_http_303_for_redirectable_methods(
|
|
self, httpbin, method, body, expected
|
|
):
|
|
"""Tests all methods except OPTIONS for expected redirect behaviour.
|
|
|
|
OPTIONS responses can behave differently depending on the server, so
|
|
we don't have anything uniform to test except how httpbin responds
|
|
to them. For that reason they aren't included here.
|
|
"""
|
|
params = {'url': '/%s' % expected.lower(), 'status_code': '303'}
|
|
r = requests.request(
|
|
method, httpbin('redirect-to'), data=body, params=params
|
|
)
|
|
assert r.request.url == httpbin(expected.lower())
|
|
assert r.request.method == expected
|
|
assert r.history[0].status_code == 303
|
|
assert r.history[0].is_redirect
|
|
assert r.request.body is None
|
|
|
|
def test_multiple_location_headers(self, httpbin):
|
|
headers = [
|
|
('Location', 'http://example.com'),
|
|
('Location', 'https://example.com/1'),
|
|
]
|
|
params = '&'.join(['%s=%s' % (k, v) for k, v in headers])
|
|
ses = requests.Session()
|
|
req = requests.Request('GET', httpbin('response-headers?%s' % params))
|
|
prep = ses.prepare_request(req)
|
|
resp = ses.send(prep)
|
|
# change response to redirect
|
|
resp.status_code = 302
|
|
with pytest.raises(InvalidHeader):
|
|
# next triggers yield on generator
|
|
next(ses.resolve_redirects(resp, prep))
|
|
|
|
def test_header_and_body_removal_on_redirect(self, httpbin):
|
|
purged_headers = ('Content-Length', 'Content-Type')
|
|
ses = requests.Session()
|
|
req = requests.Request('POST', httpbin('post'), data={'test': 'data'})
|
|
prep = ses.prepare_request(req)
|
|
resp = ses.send(prep)
|
|
# Mimic a redirect response
|
|
resp.status_code = 302
|
|
resp.headers['location'] = 'get'
|
|
# Run request through resolve_redirects
|
|
next_resp = next(ses.resolve_redirects(resp, prep))
|
|
assert next_resp.request.body is None
|
|
for header in purged_headers:
|
|
assert header not in next_resp.request.headers
|
|
|
|
def test_transfer_enc_removal_on_redirect(self, httpbin):
|
|
purged_headers = ('Transfer-Encoding', 'Content-Type')
|
|
ses = requests.Session()
|
|
req = requests.Request(
|
|
'POST', httpbin('post'), data=(b'x' for x in range(1))
|
|
)
|
|
prep = ses.prepare_request(req)
|
|
assert 'Transfer-Encoding' in prep.headers
|
|
# Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33
|
|
resp = requests.Response()
|
|
resp.raw = io.BytesIO(b'the content')
|
|
resp.request = prep
|
|
setattr(resp.raw, 'release_conn', lambda *args: args)
|
|
# Mimic a redirect response
|
|
resp.status_code = 302
|
|
resp.headers['location'] = httpbin('get')
|
|
# Run request through resolve_redirect
|
|
next_resp = next(ses.resolve_redirects(resp, prep))
|
|
assert next_resp.request.body is None
|
|
for header in purged_headers:
|
|
assert header not in next_resp.request.headers
|
|
|
|
def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
|
|
heads = {'User-agent': 'Mozilla/5.0'}
|
|
r = requests.get(httpbin('user-agent'), headers=heads)
|
|
assert heads['User-agent'] in r.text
|
|
assert r.status_code == 200
|
|
|
|
def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
|
|
heads = {'User-agent': 'Mozilla/5.0'}
|
|
r = requests.get(
|
|
httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads
|
|
)
|
|
assert r.status_code == 200
|
|
|
|
def test_set_cookie_on_301(self, httpbin):
|
|
s = requests.session()
|
|
url = httpbin('cookies/set?foo=bar')
|
|
s.get(url)
|
|
assert s.cookies['foo'] == 'bar'
|
|
|
|
def test_cookie_sent_on_redirect(self, httpbin):
|
|
s = requests.session()
|
|
s.get(httpbin('cookies/set?foo=bar'))
|
|
r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
|
|
assert 'Cookie' in r.json()['headers']
|
|
|
|
def test_cookie_removed_on_expire(self, httpbin):
|
|
s = requests.session()
|
|
s.get(httpbin('cookies/set?foo=bar'))
|
|
assert s.cookies['foo'] == 'bar'
|
|
s.get(
|
|
httpbin('response-headers'),
|
|
params={
|
|
'Set-Cookie': 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'
|
|
},
|
|
)
|
|
assert 'foo' not in s.cookies
|
|
|
|
def test_cookie_quote_wrapped(self, httpbin):
|
|
s = requests.session()
|
|
s.get(httpbin('cookies/set?foo="bar:baz"'))
|
|
assert s.cookies['foo'] == '"bar:baz"'
|
|
|
|
def test_cookie_persists_via_api(self, httpbin):
|
|
s = requests.session()
|
|
r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
|
|
assert 'foo' in r.request.headers['Cookie']
|
|
assert 'foo' in r.history[0].request.headers['Cookie']
|
|
|
|
def test_request_cookie_overrides_session_cookie(self, httpbin):
|
|
s = requests.session()
|
|
s.cookies['foo'] = 'bar'
|
|
r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
|
|
assert r.json()['cookies']['foo'] == 'baz'
|
|
# Session cookie should not be modified
|
|
assert s.cookies['foo'] == 'bar'
|
|
|
|
def test_request_cookies_not_persisted(self, httpbin):
|
|
s = requests.session()
|
|
s.get(httpbin('cookies'), cookies={'foo': 'baz'})
|
|
# Sending a request with cookies should not add cookies to the session
|
|
assert not s.cookies
|
|
|
|
def test_generic_cookiejar_works(self, httpbin):
|
|
cj = cookielib.CookieJar()
|
|
cookiejar_from_dict({'foo': 'bar'}, cj)
|
|
s = requests.session()
|
|
s.cookies = cj
|
|
r = s.get(httpbin('cookies'))
|
|
# Make sure the cookie was sent
|
|
assert r.json()['cookies']['foo'] == 'bar'
|
|
# Make sure the session cj is still the custom one
|
|
assert s.cookies is cj
|
|
|
|
def test_param_cookiejar_works(self, httpbin):
|
|
cj = cookielib.CookieJar()
|
|
cookiejar_from_dict({'foo': 'bar'}, cj)
|
|
s = requests.session()
|
|
r = s.get(httpbin('cookies'), cookies=cj)
|
|
# Make sure the cookie was sent
|
|
assert r.json()['cookies']['foo'] == 'bar'
|
|
|
|
def test_cookielib_cookiejar_on_redirect(self, httpbin):
|
|
"""Tests resolve_redirect doesn't fail when merging cookies
|
|
with non-RequestsCookieJar cookiejar.
|
|
|
|
See GH #3579
|
|
"""
|
|
cj = cookiejar_from_dict({'foo': 'bar'}, cookielib.CookieJar())
|
|
s = requests.Session()
|
|
s.cookies = cookiejar_from_dict({'cookie': 'tasty'})
|
|
# Prepare request without using Session
|
|
req = requests.Request('GET', httpbin('headers'), cookies=cj)
|
|
prep_req = req.prepare()
|
|
# Send request and simulate redirect
|
|
resp = s.send(prep_req)
|
|
resp.status_code = 302
|
|
resp.headers['location'] = httpbin('get')
|
|
redirects = s.resolve_redirects(resp, prep_req)
|
|
resp = next(redirects)
|
|
# Verify CookieJar isn't being converted to RequestsCookieJar
|
|
assert isinstance(prep_req._cookies, cookielib.CookieJar)
|
|
assert isinstance(resp.request._cookies, cookielib.CookieJar)
|
|
assert not isinstance(
|
|
resp.request._cookies, requests.cookies.RequestsCookieJar
|
|
)
|
|
cookies = {}
|
|
for c in resp.request._cookies:
|
|
cookies[c.name] = c.value
|
|
assert cookies['foo'] == 'bar'
|
|
assert cookies['cookie'] == 'tasty'
|
|
|
|
@pytest.mark.parametrize(
|
|
'jar', (requests.cookies.RequestsCookieJar(), cookielib.CookieJar())
|
|
)
|
|
def test_custom_cookie_policy_persistence(self, httpbin, jar):
|
|
"""Verify a custom CookiePolicy is propagated on each session request."""
|
|
|
|
class TestCookiePolicy(cookielib.DefaultCookiePolicy):
|
|
"""Policy to restrict all cookies from localhost (127.0.0.1)."""
|
|
|
|
def __init__(self):
|
|
cookielib.DefaultCookiePolicy.__init__(
|
|
self, blocked_domains=['127.0.0.1']
|
|
)
|
|
|
|
# Establish session with jar and set some cookies.
|
|
s = requests.Session()
|
|
s.cookies = jar
|
|
s.get(httpbin('cookies/set?k1=v1&k2=v2'))
|
|
assert len(s.cookies) == 2
|
|
# Set different policy.
|
|
s.cookies.set_policy(TestCookiePolicy())
|
|
assert isinstance(s.cookies._policy, TestCookiePolicy)
|
|
# No cookies were sent to our blocked domain and none were set.
|
|
resp = s.get(httpbin('cookies/set?k3=v3'))
|
|
assert 'Cookie' not in resp.request.headers
|
|
assert len(s.cookies) == 2
|
|
assert 'k3' not in s.cookies
|
|
|
|
def test_requests_in_history_are_not_overridden(self, httpbin):
|
|
resp = requests.get(httpbin('redirect/3'))
|
|
urls = [r.url for r in resp.history]
|
|
req_urls = [r.request.url for r in resp.history]
|
|
assert urls == req_urls
|
|
|
|
def test_history_is_always_a_list(self, httpbin):
|
|
"""Show that even with redirects, Response.history is always a list."""
|
|
resp = requests.get(httpbin('get'))
|
|
assert isinstance(resp.history, list)
|
|
resp = requests.get(httpbin('redirect/1'))
|
|
assert isinstance(resp.history, list)
|
|
assert not isinstance(resp.history, tuple)
|
|
|
|
def test_headers_on_session_with_None_are_not_sent(self, httpbin):
|
|
"""Do not send headers in Session.headers with None values."""
|
|
ses = requests.Session()
|
|
ses.headers['Accept-Encoding'] = None
|
|
req = requests.Request('GET', httpbin('get'))
|
|
prep = ses.prepare_request(req)
|
|
assert 'Accept-Encoding' not in prep.headers
|
|
|
|
def test_headers_preserve_order(self, httpbin):
|
|
"""Preserve order when headers provided as OrderedDict."""
|
|
ses = requests.Session()
|
|
ses.headers = collections.OrderedDict()
|
|
ses.headers['Accept-Encoding'] = 'identity'
|
|
ses.headers['First'] = '1'
|
|
ses.headers['Second'] = '2'
|
|
headers = collections.OrderedDict([('Third', '3'), ('Fourth', '4')])
|
|
headers['Fifth'] = '5'
|
|
headers['Second'] = '222'
|
|
req = requests.Request('GET', httpbin('get'), headers=headers)
|
|
prep = ses.prepare_request(req)
|
|
items = list(prep.headers.items())
|
|
assert items[0] == ('Accept-Encoding', 'identity')
|
|
assert items[1] == ('First', '1')
|
|
assert items[2] == ('Second', '222')
|
|
assert items[3] == ('Third', '3')
|
|
assert items[4] == ('Fourth', '4')
|
|
assert items[5] == ('Fifth', '5')
|
|
|
|
@pytest.mark.parametrize('key', ('User-agent', 'user-agent'))
|
|
def test_user_agent_transfers(self, httpbin, key):
|
|
heads = {key: 'Mozilla/5.0 (github.com/requests/requests)'}
|
|
r = requests.get(httpbin('user-agent'), headers=heads)
|
|
assert heads[key] in r.text
|
|
|
|
def test_HTTP_200_OK_HEAD(self, httpbin):
|
|
r = requests.head(httpbin('get'))
|
|
assert r.status_code == 200
|
|
|
|
def test_HTTP_200_OK_PUT(self, httpbin):
|
|
r = requests.put(httpbin('put'))
|
|
assert r.status_code == 200
|
|
|
|
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
|
|
auth = ('user', 'pass')
|
|
url = httpbin('basic-auth', 'user', 'pass')
|
|
r = requests.get(url, auth=auth)
|
|
assert r.status_code == 200
|
|
r = requests.get(url)
|
|
assert r.status_code == 401
|
|
s = requests.session()
|
|
s.auth = auth
|
|
r = s.get(url)
|
|
assert r.status_code == 200
|
|
|
|
@pytest.mark.parametrize(
|
|
'username, password',
|
|
(
|
|
('user', 'pass'),
|
|
(u'имя'.encode('utf-8'), u'пароль'.encode('utf-8')),
|
|
),
|
|
)
|
|
def test_set_basicauth(self, httpbin, username, password):
|
|
auth = (username, password)
|
|
url = httpbin('get')
|
|
r = requests.Request('GET', url, auth=auth)
|
|
p = r.prepare()
|
|
assert p.headers['Authorization'] == _basic_auth_str(
|
|
username, password
|
|
)
|
|
|
|
@pytest.mark.parametrize(
|
|
'username, password', (('user', 1234), (None, 'test'))
|
|
)
|
|
def test_non_str_basicauth(self, username, password):
|
|
"""Ensure we only allow string or bytes values for basicauth"""
|
|
with pytest.raises(TypeError) as e:
|
|
requests.auth._basic_auth_str(username, password)
|
|
assert 'must be of type str or bytes' in str(e)
|
|
|
|
def test_basicauth_encodes_byte_strings(self):
|
|
"""Ensure b'test' formats as the byte string "test" rather
|
|
than the unicode string "b'test'" in Python 3.
|
|
"""
|
|
auth = (b'\xc5\xafsername', b'test\xc6\xb6')
|
|
r = requests.Request('GET', 'http://localhost', auth=auth)
|
|
p = r.prepare()
|
|
assert p.headers['Authorization'] == 'Basic xa9zZXJuYW1lOnRlc3TGtg=='
|
|
|
|
@pytest.mark.parametrize(
|
|
'url, exception',
|
|
(('http://doesnotexist.google.com', ConnectionError), ('http://localhost:1', ConnectionError), ('http://fe80::5054:ff:fe5a:fc0', InvalidURL)),
|
|
# Connecting to an unknown domain should raise a ConnectionError
|
|
# Connecting to an invalid port should raise a ConnectionError
|
|
# Inputing a URL that cannot be parsed should raise an InvalidURL error
|
|
)
|
|
def test_errors(self, url, exception):
|
|
with pytest.raises(exception):
|
|
requests.get(url, timeout=1)
|
|
|
|
def test_proxy_error(self):
|
|
# any proxy related error (address resolution, no route to host, etc) should result in a ProxyError
|
|
with pytest.raises(ProxyError):
|
|
requests.get(
|
|
'http://localhost:1',
|
|
proxies={'http': 'non-resolvable-address'},
|
|
)
|
|
|
|
def test_basicauth_with_netrc(self, httpbin):
|
|
auth = ('user', 'pass')
|
|
wrong_auth = ('wronguser', 'wrongpass')
|
|
url = httpbin('basic-auth', 'user', 'pass')
|
|
old_auth = requests.sessions.get_netrc_auth
|
|
try:
|
|
|
|
def get_netrc_auth_mock(url):
|
|
return auth
|
|
|
|
requests.sessions.get_netrc_auth = get_netrc_auth_mock
|
|
# Should use netrc and work.
|
|
r = requests.get(url)
|
|
assert r.status_code == 200
|
|
# Given auth should override and fail.
|
|
r = requests.get(url, auth=wrong_auth)
|
|
assert r.status_code == 401
|
|
s = requests.session()
|
|
# Should use netrc and work.
|
|
r = s.get(url)
|
|
assert r.status_code == 200
|
|
# Given auth should override and fail.
|
|
s.auth = wrong_auth
|
|
r = s.get(url)
|
|
assert r.status_code == 401
|
|
finally:
|
|
requests.sessions.get_netrc_auth = old_auth
|
|
|
|
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
|
|
auth = HTTPDigestAuth('user', 'pass')
|
|
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
|
r = requests.get(url, auth=auth)
|
|
assert r.status_code == 200
|
|
r = requests.get(url)
|
|
assert r.status_code == 401
|
|
s = requests.session()
|
|
s.auth = HTTPDigestAuth('user', 'pass')
|
|
r = s.get(url)
|
|
assert r.status_code == 200
|
|
|
|
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
|
|
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
|
auth = HTTPDigestAuth('user', 'pass')
|
|
r = requests.get(url)
|
|
assert r.cookies['fake'] == 'fake_value'
|
|
r = requests.get(url, auth=auth)
|
|
assert r.status_code == 200
|
|
|
|
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
|
|
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
|
auth = HTTPDigestAuth('user', 'pass')
|
|
s = requests.Session()
|
|
s.get(url, auth=auth)
|
|
assert s.cookies['fake'] == 'fake_value'
|
|
|
|
def test_DIGEST_STREAM(self, httpbin):
|
|
auth = HTTPDigestAuth('user', 'pass')
|
|
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
|
r = requests.get(url, auth=auth, stream=True)
|
|
assert r.raw.read() != b''
|
|
r = requests.get(url, auth=auth, stream=False)
|
|
assert r.raw.read() == b''
|
|
|
|
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
|
|
auth = HTTPDigestAuth('user', 'wrongpass')
|
|
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
|
r = requests.get(url, auth=auth)
|
|
assert r.status_code == 401
|
|
r = requests.get(url)
|
|
assert r.status_code == 401
|
|
s = requests.session()
|
|
s.auth = auth
|
|
r = s.get(url)
|
|
assert r.status_code == 401
|
|
|
|
def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
|
|
auth = HTTPDigestAuth('user', 'pass')
|
|
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
|
r = requests.get(url, auth=auth)
|
|
assert '"auth"' in r.request.headers['Authorization']
|
|
|
|
def test_POSTBIN_GET_POST_FILES(self, httpbin):
|
|
url = httpbin('post')
|
|
requests.post(url).raise_for_status()
|
|
post1 = requests.post(url, data={'some': 'data'})
|
|
assert post1.status_code == 200
|
|
with open('Pipfile') as f:
|
|
post2 = requests.post(url, files={'some': f})
|
|
assert post2.status_code == 200
|
|
post4 = requests.post(url, data='[{"some": "json"}]')
|
|
assert post4.status_code == 200
|
|
with pytest.raises(ValueError):
|
|
requests.post(url, files=['bad file data'])
|
|
|
|
def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin):
|
|
|
|
class TestStream(object):
|
|
|
|
def __init__(self, data):
|
|
self.data = data.encode()
|
|
self.length = len(self.data)
|
|
self.index = 0
|
|
|
|
def __len__(self):
|
|
return self.length
|
|
|
|
def read(self, size=None):
|
|
if size:
|
|
ret = self.data[self.index: self.index + size]
|
|
self.index += size
|
|
else:
|
|
ret = self.data[self.index:]
|
|
self.index = self.length
|
|
return ret
|
|
|
|
def tell(self):
|
|
return self.index
|
|
|
|
def seek(self, offset, where=0):
|
|
if where == 0:
|
|
self.index = offset
|
|
elif where == 1:
|
|
self.index += offset
|
|
elif where == 2:
|
|
self.index = self.length + offset
|
|
|
|
test = TestStream('test')
|
|
post1 = requests.post(httpbin('post'), data=test)
|
|
assert post1.status_code == 200
|
|
assert post1.json()['data'] == 'test'
|
|
test = TestStream('test')
|
|
test.seek(2)
|
|
post2 = requests.post(httpbin('post'), data=test)
|
|
assert post2.status_code == 200
|
|
assert post2.json()['data'] == 'st'
|
|
|
|
def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
|
|
url = httpbin('post')
|
|
requests.post(url).raise_for_status()
|
|
post1 = requests.post(url, data={'some': 'data'})
|
|
assert post1.status_code == 200
|
|
with open('Pipfile') as f:
|
|
post2 = requests.post(
|
|
url, data={'some': 'data'}, files={'some': f}
|
|
)
|
|
assert post2.status_code == 200
|
|
post4 = requests.post(url, data='[{"some": "json"}]')
|
|
assert post4.status_code == 200
|
|
with pytest.raises(ValueError):
|
|
requests.post(url, files=['bad file data'])
|
|
|
|
def test_post_with_custom_mapping(self, httpbin):
|
|
|
|
class CustomMapping(collections.MutableMapping):
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.data = dict(*args, **kwargs)
|
|
|
|
def __delitem__(self, key):
|
|
del self.data[key]
|
|
|
|
def __getitem__(self, key):
|
|
return self.data[key]
|
|
|
|
def __setitem__(self, key, value):
|
|
self.data[key] = value
|
|
|
|
def __iter__(self):
|
|
return iter(self.data)
|
|
|
|
def __len__(self):
|
|
return len(self.data)
|
|
|
|
data = CustomMapping({'some': 'data'})
|
|
url = httpbin('post')
|
|
found_json = requests.post(url, data=data).json().get('form')
|
|
assert found_json == {'some': 'data'}
|
|
|
|
def test_conflicting_post_params(self, httpbin):
|
|
url = httpbin('post')
|
|
with open('Pipfile') as f:
|
|
pytest.raises(
|
|
ValueError,
|
|
"requests.post(url, data='[{\"some\": \"data\"}]', files={'some': f})",
|
|
)
|
|
pytest.raises(
|
|
ValueError,
|
|
"requests.post(url, data=u('[{\"some\": \"data\"}]'), files={'some': f})",
|
|
)
|
|
|
|
def test_request_ok_set(self, httpbin):
|
|
r = requests.get(httpbin('status', '404'))
|
|
assert not r.ok
|
|
|
|
def test_status_raising(self, httpbin):
|
|
r = requests.get(httpbin('status', '404'))
|
|
with pytest.raises(requests.exceptions.HTTPError):
|
|
r.raise_for_status()
|
|
r = requests.get(httpbin('status', '500'))
|
|
assert not r.ok
|
|
|
|
def test_raise_for_status_returns_self(self, httpbin):
|
|
r = requests.get(httpbin('status', '200'))
|
|
assert r.raise_for_status() is r
|
|
|
|
def test_decompress_gzip(self, httpbin):
|
|
r = requests.get(httpbin('gzip'))
|
|
r.content.decode('ascii')
|
|
|
|
@pytest.mark.parametrize(
|
|
'url, params',
|
|
(
|
|
('/get', {'foo': 'føø'}),
|
|
('/get', {'føø': 'føø'}),
|
|
('/get', {'føø': 'føø'}),
|
|
('/get', {'foo': 'foo'}),
|
|
('ø', {'foo': 'foo'}),
|
|
),
|
|
)
|
|
def test_unicode_get(self, httpbin, url, params):
|
|
requests.get(httpbin(url), params=params)
|
|
|
|
def test_unicode_header_name(self, httpbin):
|
|
requests.put(
|
|
httpbin('put'),
|
|
headers={str('Content-Type'): 'application/octet-stream'},
|
|
data='\xff',
|
|
) # compat.str is unicode.
|
|
|
|
def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle):
|
|
requests.get(httpbin_secure('status', '301'), verify=httpbin_ca_bundle)
|
|
|
|
def test_invalid_ca_certificate_path(self, httpbin_secure):
|
|
INVALID_PATH = '/garbage'
|
|
with pytest.raises(IOError) as e:
|
|
requests.get(httpbin_secure(), verify=INVALID_PATH)
|
|
assert str(
|
|
e.value
|
|
) == 'Could not find a suitable TLS CA certificate bundle, invalid path: {0}'.format(
|
|
INVALID_PATH
|
|
)
|
|
|
|
def test_invalid_ssl_certificate_files(self, httpbin_secure):
|
|
INVALID_PATH = '/garbage'
|
|
with pytest.raises(IOError) as e:
|
|
requests.get(httpbin_secure(), cert=INVALID_PATH)
|
|
assert str(
|
|
e.value
|
|
) == 'Could not find the TLS certificate file, invalid path: {0}'.format(
|
|
INVALID_PATH
|
|
)
|
|
with pytest.raises(IOError) as e:
|
|
requests.get(httpbin_secure(), cert=('.', INVALID_PATH))
|
|
assert str(
|
|
e.value
|
|
) == 'Could not find the TLS key file, invalid path: {0}'.format(
|
|
INVALID_PATH
|
|
)
|
|
|
|
def test_http_with_certificate(self, httpbin):
|
|
r = requests.get(httpbin(), cert='.')
|
|
assert r.status_code == 200
|
|
|
|
def test_https_warnings(self, httpbin_secure, httpbin_ca_bundle):
|
|
"""warnings are emitted with requests.get"""
|
|
if HAS_MODERN_SSL or HAS_PYOPENSSL:
|
|
warnings_expected = ('SubjectAltNameWarning',)
|
|
else:
|
|
warnings_expected = (
|
|
'SNIMissingWarning',
|
|
'InsecurePlatformWarning',
|
|
'SubjectAltNameWarning',
|
|
)
|
|
with pytest.warns(None) as warning_records:
|
|
warnings.simplefilter('always')
|
|
requests.get(
|
|
httpbin_secure('status', '200'), verify=httpbin_ca_bundle
|
|
)
|
|
warning_records = [
|
|
item
|
|
for item in warning_records
|
|
if item.category.__name__ != 'ResourceWarning'
|
|
]
|
|
warnings_category = tuple(
|
|
item.category.__name__ for item in warning_records
|
|
)
|
|
assert warnings_category == warnings_expected
|
|
|
|
def test_certificate_failure(self, httpbin_secure):
|
|
"""
|
|
When underlying SSL problems occur, an SSLError is raised.
|
|
"""
|
|
with pytest.raises(SSLError):
|
|
# Our local httpbin does not have a trusted CA, so this call will
|
|
# fail if we use our default trust bundle.
|
|
requests.get(httpbin_secure('status', '200'))
|
|
|
|
def test_urlencoded_get_query_multivalued_param(self, httpbin):
|
|
r = requests.get(httpbin('get'), params={'test': ['foo', 'baz']})
|
|
assert r.status_code == 200
|
|
assert r.url == httpbin('get?test=foo&test=baz')
|
|
|
|
def test_different_encodings_dont_break_post(self, httpbin):
|
|
r = requests.post(
|
|
httpbin('post'),
|
|
data={'stuff': json.dumps({'a': 123})},
|
|
params={'blah': 'asdf1234'},
|
|
files={'file': ('test_requests.py', open(__file__, 'rb'))},
|
|
)
|
|
assert r.status_code == 200
|
|
|
|
@pytest.mark.parametrize(
|
|
'data',
|
|
(
|
|
{'stuff': u('ëlïxr')},
|
|
{'stuff': u('ëlïxr').encode('utf-8')},
|
|
{'stuff': 'elixr'},
|
|
{'stuff': 'elixr'.encode('utf-8')},
|
|
),
|
|
)
|
|
def test_unicode_multipart_post(self, httpbin, data):
|
|
r = requests.post(
|
|
httpbin('post'),
|
|
data=data,
|
|
files={'file': ('test_requests.py', open(__file__, 'rb'))},
|
|
)
|
|
assert r.status_code == 200
|
|
|
|
def test_unicode_multipart_post_fieldnames(self, httpbin):
|
|
filename = os.path.splitext(__file__)[0] + '.py'
|
|
r = requests.Request(
|
|
method='POST',
|
|
url=httpbin('post'),
|
|
data={'stuff'.encode('utf-8'): 'elixr'},
|
|
files={'file': ('test_requests.py', open(filename, 'rb'))},
|
|
)
|
|
prep = r.prepare()
|
|
assert b'name="stuff"' in prep.body
|
|
assert b'name="b\'stuff\'"' not in prep.body
|
|
|
|
def test_unicode_method_name(self, httpbin):
|
|
files = {'file': open(__file__, 'rb')}
|
|
r = requests.request(
|
|
method=u('POST'), url=httpbin('post'), files=files
|
|
)
|
|
assert r.status_code == 200
|
|
|
|
def test_unicode_method_name_with_request_object(self, httpbin):
|
|
files = {'file': open(__file__, 'rb')}
|
|
s = requests.Session()
|
|
req = requests.Request(u('POST'), httpbin('post'), files=files)
|
|
prep = s.prepare_request(req)
|
|
assert isinstance(prep.method, builtin_str)
|
|
assert prep.method == 'POST'
|
|
resp = s.send(prep)
|
|
assert resp.status_code == 200
|
|
|
|
def test_non_prepared_request_error(self):
|
|
s = requests.Session()
|
|
req = requests.Request(u('POST'), '/')
|
|
with pytest.raises(ValueError) as e:
|
|
s.send(req)
|
|
assert str(e.value) == 'You can only send PreparedRequests.'
|
|
|
|
def test_custom_content_type(self, httpbin):
|
|
r = requests.post(
|
|
httpbin('post'),
|
|
data={'stuff': json.dumps({'a': 123})},
|
|
files={
|
|
'file1': ('test_requests.py', open(__file__, 'rb')),
|
|
'file2': (
|
|
'test_requests',
|
|
open(__file__, 'rb'),
|
|
'text/py-content-type',
|
|
),
|
|
},
|
|
)
|
|
assert r.status_code == 200
|
|
assert b"text/py-content-type" in r.request.body
|
|
|
|
def test_hook_receives_request_arguments(self, httpbin):
|
|
|
|
def hook(resp, **kwargs):
|
|
assert resp is not None
|
|
assert kwargs != {}
|
|
|
|
s = requests.Session()
|
|
r = requests.Request('GET', httpbin(), hooks={'response': hook})
|
|
prep = s.prepare_request(r)
|
|
s.send(prep)
|
|
|
|
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
|
|
hook = lambda x, *args, **kwargs: x
|
|
s = requests.Session()
|
|
s.hooks['response'].append(hook)
|
|
r = requests.Request('GET', httpbin())
|
|
prep = s.prepare_request(r)
|
|
assert prep.hooks['response'] != []
|
|
assert prep.hooks['response'] == [hook]
|
|
|
|
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
|
|
hook1 = lambda x, *args, **kwargs: x
|
|
hook2 = lambda x, *args, **kwargs: x
|
|
assert hook1 is not hook2
|
|
s = requests.Session()
|
|
s.hooks['response'].append(hook2)
|
|
r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
|
|
prep = s.prepare_request(r)
|
|
assert prep.hooks['response'] == [hook1]
|
|
|
|
def test_prepared_request_hook(self, httpbin):
|
|
|
|
def hook(resp, **kwargs):
|
|
resp.hook_working = True
|
|
return resp
|
|
|
|
req = requests.Request('GET', httpbin(), hooks={'response': hook})
|
|
prep = req.prepare()
|
|
s = requests.Session()
|
|
s.proxies = getproxies()
|
|
resp = s.send(prep)
|
|
assert hasattr(resp, 'hook_working')
|
|
|
|
def test_prepared_from_session(self, httpbin):
|
|
|
|
class DummyAuth(requests.auth.AuthBase):
|
|
|
|
def __call__(self, r):
|
|
r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
|
|
return r
|
|
|
|
req = requests.Request('GET', httpbin('headers'))
|
|
assert not req.auth
|
|
s = requests.Session()
|
|
s.auth = DummyAuth()
|
|
prep = s.prepare_request(req)
|
|
resp = s.send(prep)
|
|
assert resp.json()['headers'][
|
|
'Dummy-Auth-Test'
|
|
] == 'dummy-auth-test-ok'
|
|
|
|
def test_prepare_request_with_bytestring_url(self):
|
|
req = requests.Request('GET', b'https://httpbin.org/')
|
|
s = requests.Session()
|
|
prep = s.prepare_request(req)
|
|
assert prep.url == "https://httpbin.org/"
|
|
|
|
def test_request_with_bytestring_host(self, httpbin):
|
|
s = requests.Session()
|
|
resp = s.request(
|
|
'GET',
|
|
httpbin('cookies/set?cookie=value'),
|
|
allow_redirects=False,
|
|
headers={'Host': b'httpbin.org'},
|
|
)
|
|
assert resp.cookies.get('cookie') == 'value'
|
|
|
|
def test_links(self):
|
|
r = requests.Response()
|
|
r.headers = {
|
|
'cache-control': 'public, max-age=60, s-maxage=60',
|
|
'connection': 'keep-alive',
|
|
'content-encoding': 'gzip',
|
|
'content-type': 'application/json; charset=utf-8',
|
|
'date': 'Sat, 26 Jan 2013 16:47:56 GMT',
|
|
'etag': '"6ff6a73c0e446c1f61614769e3ceb778"',
|
|
'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT',
|
|
'link': (
|
|
'<https://api.github.com/users/kennethreitz/repos?'
|
|
'page=2&per_page=10>; rel="next", <https://api.github.'
|
|
'com/users/kennethreitz/repos?page=7&per_page=10>; '
|
|
' rel="last"'
|
|
),
|
|
'server': 'GitHub.com',
|
|
'status': '200 OK',
|
|
'vary': 'Accept',
|
|
'x-content-type-options': 'nosniff',
|
|
'x-github-media-type': 'github.beta',
|
|
'x-ratelimit-limit': '60',
|
|
'x-ratelimit-remaining': '57',
|
|
}
|
|
assert r.links['next']['rel'] == 'next'
|
|
|
|
def test_cookie_parameters(self):
|
|
key = 'some_cookie'
|
|
value = 'some_value'
|
|
secure = True
|
|
domain = 'test.com'
|
|
rest = {'HttpOnly': True}
|
|
jar = requests.cookies.RequestsCookieJar()
|
|
jar.set(key, value, secure=secure, domain=domain, rest=rest)
|
|
assert len(jar) == 1
|
|
assert 'some_cookie' in jar
|
|
cookie = list(jar)[0]
|
|
assert cookie.secure == secure
|
|
assert cookie.domain == domain
|
|
assert cookie._rest['HttpOnly'] == rest['HttpOnly']
|
|
|
|
def test_cookie_as_dict_keeps_len(self):
|
|
key = 'some_cookie'
|
|
value = 'some_value'
|
|
key1 = 'some_cookie1'
|
|
value1 = 'some_value1'
|
|
jar = requests.cookies.RequestsCookieJar()
|
|
jar.set(key, value)
|
|
jar.set(key1, value1)
|
|
d1 = dict(jar)
|
|
d2 = dict(jar.iteritems())
|
|
d3 = dict(jar.items())
|
|
assert len(jar) == 2
|
|
assert len(d1) == 2
|
|
assert len(d2) == 2
|
|
assert len(d3) == 2
|
|
|
|
def test_cookie_as_dict_keeps_items(self):
|
|
key = 'some_cookie'
|
|
value = 'some_value'
|
|
key1 = 'some_cookie1'
|
|
value1 = 'some_value1'
|
|
jar = requests.cookies.RequestsCookieJar()
|
|
jar.set(key, value)
|
|
jar.set(key1, value1)
|
|
d1 = dict(jar)
|
|
d2 = dict(jar.iteritems())
|
|
d3 = dict(jar.items())
|
|
assert d1['some_cookie'] == 'some_value'
|
|
assert d2['some_cookie'] == 'some_value'
|
|
assert d3['some_cookie1'] == 'some_value1'
|
|
|
|
def test_cookie_as_dict_keys(self):
|
|
key = 'some_cookie'
|
|
value = 'some_value'
|
|
key1 = 'some_cookie1'
|
|
value1 = 'some_value1'
|
|
jar = requests.cookies.RequestsCookieJar()
|
|
jar.set(key, value)
|
|
jar.set(key1, value1)
|
|
keys = jar.keys()
|
|
assert keys == list(keys)
|
|
# make sure one can use keys multiple times
|
|
assert list(keys) == list(keys)
|
|
|
|
def test_cookie_as_dict_values(self):
|
|
key = 'some_cookie'
|
|
value = 'some_value'
|
|
key1 = 'some_cookie1'
|
|
value1 = 'some_value1'
|
|
jar = requests.cookies.RequestsCookieJar()
|
|
jar.set(key, value)
|
|
jar.set(key1, value1)
|
|
values = jar.values()
|
|
assert values == list(values)
|
|
# make sure one can use values multiple times
|
|
assert list(values) == list(values)
|
|
|
|
def test_cookie_as_dict_items(self):
|
|
key = 'some_cookie'
|
|
value = 'some_value'
|
|
key1 = 'some_cookie1'
|
|
value1 = 'some_value1'
|
|
jar = requests.cookies.RequestsCookieJar()
|
|
jar.set(key, value)
|
|
jar.set(key1, value1)
|
|
items = jar.items()
|
|
assert items == list(items)
|
|
# make sure one can use items multiple times
|
|
assert list(items) == list(items)
|
|
|
|
def test_cookie_duplicate_names_different_domains(self):
|
|
key = 'some_cookie'
|
|
value = 'some_value'
|
|
domain1 = 'test1.com'
|
|
domain2 = 'test2.com'
|
|
jar = requests.cookies.RequestsCookieJar()
|
|
jar.set(key, value, domain=domain1)
|
|
jar.set(key, value, domain=domain2)
|
|
assert key in jar
|
|
items = jar.items()
|
|
assert len(items) == 2
|
|
# Verify that CookieConflictError is raised if domain is not specified
|
|
with pytest.raises(requests.cookies.CookieConflictError):
|
|
jar.get(key)
|
|
# Verify that CookieConflictError is not raised if domain is specified
|
|
cookie = jar.get(key, domain=domain1)
|
|
assert cookie == value
|
|
|
|
def test_cookie_duplicate_names_raises_cookie_conflict_error(self):
|
|
key = 'some_cookie'
|
|
value = 'some_value'
|
|
path = 'some_path'
|
|
jar = requests.cookies.RequestsCookieJar()
|
|
jar.set(key, value, path=path)
|
|
jar.set(key, value)
|
|
with pytest.raises(requests.cookies.CookieConflictError):
|
|
jar.get(key)
|
|
|
|
def test_time_elapsed_blank(self, httpbin):
|
|
r = requests.get(httpbin('get'))
|
|
td = r.elapsed
|
|
total_seconds = (
|
|
(td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) /
|
|
10 **
|
|
6
|
|
)
|
|
assert total_seconds > 0.0
|
|
|
|
def test_empty_response_has_content_none(self):
|
|
r = requests.Response()
|
|
assert r.content is None
|
|
|
|
def test_response_is_iterable(self):
|
|
r = requests.Response()
|
|
io = StringIO.StringIO('abc')
|
|
read_ = io.read
|
|
|
|
def read_mock(amt, decode_content=None):
|
|
return read_(amt)
|
|
|
|
setattr(io, 'read', read_mock)
|
|
r.raw = io
|
|
assert next(iter(r))
|
|
io.close()
|
|
|
|
def test_response_decode_unicode(self):
|
|
"""When called with decode_unicode, Response.iter_content should always
|
|
return unicode.
|
|
"""
|
|
r = requests.Response()
|
|
r._content_consumed = True
|
|
r._content = b'the content'
|
|
r.encoding = 'ascii'
|
|
chunks = r.iter_content(decode_unicode=True)
|
|
assert all(isinstance(chunk, str) for chunk in chunks)
|
|
# also for streaming
|
|
r = requests.Response()
|
|
r.raw = io.BytesIO(b'the content')
|
|
r.encoding = 'ascii'
|
|
chunks = r.iter_content(decode_unicode=True)
|
|
assert all(isinstance(chunk, str) for chunk in chunks)
|
|
|
|
@pytest.mark.parametrize(
|
|
'encoding, exception',
|
|
((None, TypeError), ('invalid encoding', LookupError)),
|
|
)
|
|
def test_decode_unicode_encoding(self, encoding, exception):
|
|
# raise an exception if encoding isn't set
|
|
r = requests.Response()
|
|
r.raw = io.BytesIO(b'the content')
|
|
r.encoding = encoding
|
|
with pytest.raises(exception):
|
|
chunks = r.iter_content(decode_unicode=True)
|
|
|
|
def test_response_reason_unicode(self):
|
|
# check for unicode HTTP status
|
|
r = requests.Response()
|
|
r.url = u'unicode URL'
|
|
r.reason = u'Komponenttia ei löydy'.encode('utf-8')
|
|
r.status_code = 404
|
|
r.encoding = None
|
|
assert not r.ok # old behaviour - crashes here
|
|
|
|
def test_response_reason_unicode_fallback(self):
|
|
# check raise_status falls back to ISO-8859-1
|
|
r = requests.Response()
|
|
r.url = 'some url'
|
|
reason = u'Komponenttia ei löydy'
|
|
r.reason = reason.encode('latin-1')
|
|
r.status_code = 500
|
|
r.encoding = None
|
|
with pytest.raises(requests.exceptions.HTTPError) as e:
|
|
r.raise_for_status()
|
|
assert reason in e.value.args[0]
|
|
|
|
def test_response_chunk_size_type(self):
|
|
"""Ensure that chunk_size is passed as None or an integer, otherwise
|
|
raise a TypeError.
|
|
"""
|
|
r = requests.Response()
|
|
r.raw = io.BytesIO(b'the content')
|
|
chunks = r.iter_content(1)
|
|
assert all(len(chunk) == 1 for chunk in chunks)
|
|
r = requests.Response()
|
|
r.raw = io.BytesIO(b'the content')
|
|
chunks = r.iter_content(None)
|
|
assert list(chunks) == [b'the content']
|
|
r = requests.Response()
|
|
r.raw = io.BytesIO(b'the content')
|
|
with pytest.raises(TypeError):
|
|
chunks = r.iter_content("1024")
|
|
|
|
def test_request_and_response_are_pickleable(self, httpbin):
|
|
r = requests.get(httpbin('get'))
|
|
# verify we can pickle the original request
|
|
assert pickle.loads(pickle.dumps(r.request))
|
|
# verify we can pickle the response and that we have access to
|
|
# the original request.
|
|
pr = pickle.loads(pickle.dumps(r))
|
|
assert r.request.url == pr.request.url
|
|
assert r.request.headers == pr.request.headers
|
|
|
|
def test_response_lines(self):
|
|
"""
|
|
iter_lines should be able to handle data dribbling in which delimiters
|
|
might not be lined up ideally.
|
|
"""
|
|
mock_chunks = [
|
|
b'This \r\n',
|
|
b'',
|
|
b'is\r',
|
|
b'\n',
|
|
b'a',
|
|
b' ',
|
|
b'',
|
|
b'',
|
|
b'test.',
|
|
b'\r',
|
|
b'\n',
|
|
b'end.',
|
|
]
|
|
mock_data = b''.join(mock_chunks)
|
|
unicode_mock_data = mock_data.decode('utf-8')
|
|
|
|
def mock_iter_content(*args, **kwargs):
|
|
if kwargs.get("decode_unicode"):
|
|
return (e.decode('utf-8') for e in mock_chunks)
|
|
|
|
return (e for e in mock_chunks)
|
|
|
|
r = requests.Response()
|
|
r._content_consumed = True
|
|
r.iter_content = mock_iter_content
|
|
# decode_unicode=None, output raw bytes
|
|
assert list(r.iter_lines(delimiter=b'\r\n')) == mock_data.split(
|
|
b'\r\n'
|
|
)
|
|
# decode_unicode=True, output unicode strings
|
|
assert list(
|
|
r.iter_lines(decode_unicode=True, delimiter=u'\r\n')
|
|
) == unicode_mock_data.split(
|
|
u'\r\n'
|
|
)
|
|
# When delimiter is None, we should yield the same result as splitlines()
|
|
# which supports the universal newline.
|
|
# '\r', '\n', and '\r\n' are all treated as one line break.
|
|
# decode_unicode=None, output raw bytes
|
|
result = list(r.iter_lines())
|
|
assert result == mock_data.splitlines()
|
|
# decode_unicode=True, output unicode strings
|
|
result = list(r.iter_lines(decode_unicode=True))
|
|
assert result == unicode_mock_data.splitlines()
|
|
# If we change all the line breaks to `\r`, we should be okay.
|
|
# decode_unicode=None, output raw bytes
|
|
mock_chunks = [chunk.replace(b'\n', b'\r') for chunk in mock_chunks]
|
|
mock_data = b''.join(mock_chunks)
|
|
assert list(r.iter_lines()) == mock_data.splitlines()
|
|
# decode_unicode=True, output unicode strings
|
|
unicode_mock_data = mock_data.decode('utf-8')
|
|
assert list(
|
|
r.iter_lines(decode_unicode=True)
|
|
) == unicode_mock_data.splitlines(
|
|
)
|
|
|
|
@pytest.mark.parametrize(
|
|
'content, expected_no_delimiter, expected_delimiter',
|
|
(([b''], [], []), ([b'line\n'], [u'line'], [u'line\n']), ([b'line', b'\n'], [u'line'], [u'line\n']), ([b'line\r\n'], [u'line'], [u'line', u'']), ([b'line\r\n', b''], [u'line'], [u'line', u'']), ([b'line', b'\r\n'], [u'line'], [u'line', u'']), ([b'a\r', b'\nb\r'], [u'a', u'b'], [u'a', u'b\r']), ([b'a\r', b'\n', b'\nb'], [u'a', u'', u'b'], [u'a', u'\nb']), ([b'a\n', b'\nb'], [u'a', u'', u'b'], [u'a\n\nb']), ([b'a\r\n', b'\rb\n'], [u'a', u'', u'b'], [u'a', u'\rb\n']), ([b'a\nb', b'c'], [u'a', u'bc'], [u'a\nbc']), ([b'a\n', b'\rb', b'\r\nc'], [u'a', u'', u'b', u'c'], [u'a\n\rb', u'c']), ([b'a\r\nb', b'', b'c'], [u'a', u'bc'], [u'a', u'bc'])),
|
|
# Empty chunk in the end of stream, same behavior as the previous # Empty chunk with pending data
|
|
)
|
|
def test_response_lines_parametrized(
|
|
self, content, expected_no_delimiter, expected_delimiter
|
|
):
|
|
"""
|
|
Test a lot of potential chunk splits to ensure consistency of
|
|
iter_lines(delimiter=x), as well as the legacy behavior of
|
|
iter_lines() without delimiter
|
|
https://github.com/kennethreitz/requests/pull/2431#issuecomment-72333964
|
|
"""
|
|
mock_chunks = content
|
|
|
|
def mock_iter_content(*args, **kwargs):
|
|
if kwargs.get("decode_unicode"):
|
|
return (e.decode('utf-8') for e in mock_chunks)
|
|
|
|
return (e for e in mock_chunks)
|
|
|
|
r = requests.Response()
|
|
r._content_consumed = True
|
|
r.iter_content = mock_iter_content
|
|
# decode_unicode=True, output unicode strings
|
|
assert list(r.iter_lines(decode_unicode=True)) == expected_no_delimiter
|
|
assert list(
|
|
r.iter_lines(decode_unicode=True, delimiter='\r\n')
|
|
) == expected_delimiter
|
|
# decode_unicode=None, output raw bytes
|
|
assert list(r.iter_lines()) == [
|
|
line.encode('utf-8') for line in expected_no_delimiter
|
|
]
|
|
assert list(r.iter_lines(delimiter=b'\r\n')) == [
|
|
line.encode('utf-8') for line in expected_delimiter
|
|
]
|
|
|
|
def test_prepared_request_is_pickleable(self, httpbin):
|
|
p = requests.Request('GET', httpbin('get')).prepare()
|
|
# Verify PreparedRequest can be pickled and unpickled
|
|
r = pickle.loads(pickle.dumps(p))
|
|
assert r.url == p.url
|
|
assert r.headers == p.headers
|
|
assert r.body == p.body
|
|
# Verify unpickled PreparedRequest sends properly
|
|
s = requests.Session()
|
|
resp = s.send(r)
|
|
assert resp.status_code == 200
|
|
|
|
def test_prepared_request_with_file_is_pickleable(self, httpbin):
|
|
files = {'file': open(__file__, 'rb')}
|
|
r = requests.Request('POST', httpbin('post'), files=files)
|
|
p = r.prepare()
|
|
# Verify PreparedRequest can be pickled and unpickled
|
|
r = pickle.loads(pickle.dumps(p))
|
|
assert r.url == p.url
|
|
assert r.headers == p.headers
|
|
assert r.body == p.body
|
|
# Verify unpickled PreparedRequest sends properly
|
|
s = requests.Session()
|
|
resp = s.send(r)
|
|
assert resp.status_code == 200
|
|
|
|
def test_prepared_request_with_hook_is_pickleable(self, httpbin):
|
|
r = requests.Request('GET', httpbin('get'), hooks=default_hooks())
|
|
p = r.prepare()
|
|
# Verify PreparedRequest can be pickled
|
|
r = pickle.loads(pickle.dumps(p))
|
|
assert r.url == p.url
|
|
assert r.headers == p.headers
|
|
assert r.body == p.body
|
|
assert r.hooks == p.hooks
|
|
# Verify unpickled PreparedRequest sends properly
|
|
s = requests.Session()
|
|
resp = s.send(r)
|
|
assert resp.status_code == 200
|
|
|
|
def test_cannot_send_unprepared_requests(self, httpbin):
|
|
r = requests.Request(url=httpbin())
|
|
with pytest.raises(ValueError):
|
|
requests.Session().send(r)
|
|
|
|
def test_http_error(self):
|
|
error = requests.exceptions.HTTPError()
|
|
assert not error.response
|
|
response = requests.Response()
|
|
error = requests.exceptions.HTTPError(response=response)
|
|
assert error.response == response
|
|
error = requests.exceptions.HTTPError('message', response=response)
|
|
assert str(error) == 'message'
|
|
assert error.response == response
|
|
|
|
def test_session_pickling(self, httpbin):
|
|
r = requests.Request('GET', httpbin('get'))
|
|
s = requests.Session()
|
|
s = pickle.loads(pickle.dumps(s))
|
|
s.proxies = getproxies()
|
|
r = s.send(r.prepare())
|
|
assert r.status_code == 200
|
|
|
|
def test_fixes_1329(self, httpbin):
|
|
"""Ensure that header updates are done case-insensitively."""
|
|
s = requests.Session()
|
|
s.headers.update({'ACCEPT': 'BOGUS'})
|
|
s.headers.update({'accept': 'application/json'})
|
|
r = s.get(httpbin('get'))
|
|
headers = r.request.headers
|
|
assert headers['accept'] == 'application/json'
|
|
assert headers['Accept'] == 'application/json'
|
|
assert headers['ACCEPT'] == 'application/json'
|
|
|
|
def test_uppercase_scheme_redirect(self, httpbin):
|
|
parts = urlparse(httpbin('html'))
|
|
url = "HTTP://" + parts.netloc + parts.path
|
|
r = requests.get(httpbin('redirect-to'), params={'url': url})
|
|
assert r.status_code == 200
|
|
assert r.url.lower() == url.lower()
|
|
|
|
def test_transport_adapter_ordering(self):
|
|
s = requests.Session()
|
|
order = ['https://', 'http://']
|
|
assert order == list(s.adapters)
|
|
s.mount('http://git', HTTPAdapter())
|
|
s.mount('http://github', HTTPAdapter())
|
|
s.mount('http://github.com', HTTPAdapter())
|
|
s.mount('http://github.com/about/', HTTPAdapter())
|
|
order = [
|
|
'http://github.com/about/',
|
|
'http://github.com',
|
|
'http://github',
|
|
'http://git',
|
|
'https://',
|
|
'http://',
|
|
]
|
|
assert order == list(s.adapters)
|
|
s.mount('http://gittip', HTTPAdapter())
|
|
s.mount('http://gittip.com', HTTPAdapter())
|
|
s.mount('http://gittip.com/about/', HTTPAdapter())
|
|
order = [
|
|
'http://github.com/about/',
|
|
'http://gittip.com/about/',
|
|
'http://github.com',
|
|
'http://gittip.com',
|
|
'http://github',
|
|
'http://gittip',
|
|
'http://git',
|
|
'https://',
|
|
'http://',
|
|
]
|
|
assert order == list(s.adapters)
|
|
s2 = requests.Session()
|
|
s2.adapters = {'http://': HTTPAdapter()}
|
|
s2.mount('https://', HTTPAdapter())
|
|
assert 'http://' in s2.adapters
|
|
assert 'https://' in s2.adapters
|
|
|
|
def test_header_remove_is_case_insensitive(self, httpbin):
|
|
# From issue #1321
|
|
s = requests.Session()
|
|
s.headers['foo'] = 'bar'
|
|
r = s.get(httpbin('get'), headers={'FOO': None})
|
|
assert 'foo' not in r.request.headers
|
|
|
|
def test_params_are_merged_case_sensitive(self, httpbin):
|
|
s = requests.Session()
|
|
s.params['foo'] = 'bar'
|
|
r = s.get(httpbin('get'), params={'FOO': 'bar'})
|
|
assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}
|
|
|
|
def test_long_authinfo_in_url(self):
|
|
url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format(
|
|
'E8A3BE87-9E3F-4620-8858-95478E385B5B',
|
|
'EA770032-DA4D-4D84-8CE9-29C6D910BF1E',
|
|
'exactly-------------sixty-----------three------------characters',
|
|
)
|
|
r = requests.Request('GET', url).prepare()
|
|
assert r.url == url
|
|
|
|
def test_header_keys_are_native(self, httpbin):
|
|
headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'}
|
|
r = requests.Request('GET', httpbin('get'), headers=headers)
|
|
p = r.prepare()
|
|
# This is testing that they are builtin strings. A bit weird, but there
|
|
# we go.
|
|
assert 'unicode' in p.headers.keys()
|
|
assert 'byte' in p.headers.keys()
|
|
|
|
def test_header_validation(self, httpbin):
|
|
"""Ensure prepare_headers regex isn't flagging valid header contents."""
|
|
headers_ok = {
|
|
'foo': 'bar baz qux',
|
|
'bar': u'fbbq'.encode('utf8'),
|
|
'baz': '',
|
|
'qux': '1',
|
|
}
|
|
r = requests.get(httpbin('get'), headers=headers_ok)
|
|
assert r.request.headers['foo'] == headers_ok['foo']
|
|
|
|
def test_header_value_not_str(self, httpbin):
|
|
"""Ensure the header value is of type string or bytes as
|
|
per discussion in GH issue #3386
|
|
"""
|
|
headers_int = {'foo': 3}
|
|
headers_dict = {'bar': {'foo': 'bar'}}
|
|
headers_list = {'baz': ['foo', 'bar']}
|
|
# Test for int
|
|
with pytest.raises(InvalidHeader) as excinfo:
|
|
r = requests.get(httpbin('get'), headers=headers_int)
|
|
assert 'foo' in str(excinfo.value)
|
|
# Test for dict
|
|
with pytest.raises(InvalidHeader) as excinfo:
|
|
r = requests.get(httpbin('get'), headers=headers_dict)
|
|
assert 'bar' in str(excinfo.value)
|
|
# Test for list
|
|
with pytest.raises(InvalidHeader) as excinfo:
|
|
r = requests.get(httpbin('get'), headers=headers_list)
|
|
assert 'baz' in str(excinfo.value)
|
|
|
|
def test_header_no_return_chars(self, httpbin):
|
|
"""Ensure that a header containing return character sequences raise an
|
|
exception. Otherwise, multiple headers are created from single string.
|
|
"""
|
|
headers_ret = {'foo': 'bar\r\nbaz: qux'}
|
|
headers_lf = {'foo': 'bar\nbaz: qux'}
|
|
headers_cr = {'foo': 'bar\rbaz: qux'}
|
|
# Test for newline
|
|
with pytest.raises(InvalidHeader):
|
|
r = requests.get(httpbin('get'), headers=headers_ret)
|
|
# Test for line feed
|
|
with pytest.raises(InvalidHeader):
|
|
r = requests.get(httpbin('get'), headers=headers_lf)
|
|
# Test for carriage return
|
|
with pytest.raises(InvalidHeader):
|
|
r = requests.get(httpbin('get'), headers=headers_cr)
|
|
|
|
def test_header_no_leading_space(self, httpbin):
|
|
"""Ensure headers containing leading whitespace raise
|
|
InvalidHeader Error before sending.
|
|
"""
|
|
headers_space = {'foo': ' bar'}
|
|
headers_tab = {'foo': ' bar'}
|
|
# Test for whitespace
|
|
with pytest.raises(InvalidHeader):
|
|
r = requests.get(httpbin('get'), headers=headers_space)
|
|
# Test for tab
|
|
with pytest.raises(InvalidHeader):
|
|
r = requests.get(httpbin('get'), headers=headers_tab)
|
|
|
|
@pytest.mark.parametrize('files', ('foo', b'foo', bytearray(b'foo')))
|
|
def test_can_send_objects_with_files(self, httpbin, files):
|
|
data = {'a': 'this is a string'}
|
|
files = {'b': files}
|
|
r = requests.Request('POST', httpbin('post'), data=data, files=files)
|
|
p = r.prepare()
|
|
assert 'multipart/form-data' in p.headers['Content-Type']
|
|
|
|
def test_can_send_file_object_with_non_string_filename(self, httpbin):
|
|
f = io.BytesIO()
|
|
f.name = 2
|
|
r = requests.Request('POST', httpbin('post'), files={'f': f})
|
|
p = r.prepare()
|
|
assert 'multipart/form-data' in p.headers['Content-Type']
|
|
|
|
def test_autoset_header_values_are_native(self, httpbin):
|
|
data = 'this is a string'
|
|
length = '16'
|
|
req = requests.Request('POST', httpbin('post'), data=data)
|
|
p = req.prepare()
|
|
assert p.headers['Content-Length'] == length
|
|
|
|
def test_nonhttp_schemes_dont_check_URLs(self):
|
|
test_urls = (
|
|
'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==',
|
|
'file:///etc/passwd',
|
|
'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431',
|
|
)
|
|
for test_url in test_urls:
|
|
req = requests.Request('GET', test_url)
|
|
preq = req.prepare()
|
|
assert test_url == preq.url
|
|
|
|
@pytest.mark.xfail(raises=ConnectionError)
|
|
def test_auth_is_stripped_on_redirect_off_host(self, httpbin):
|
|
r = requests.get(
|
|
httpbin('redirect-to'),
|
|
params={'url': 'http://www.google.co.uk'},
|
|
auth=('user', 'pass'),
|
|
)
|
|
assert r.history[0].request.headers['Authorization']
|
|
assert not r.request.headers.get('Authorization', '')
|
|
|
|
def test_auth_is_retained_for_redirect_on_host(self, httpbin):
|
|
r = requests.get(httpbin('redirect/1'), auth=('user', 'pass'))
|
|
h1 = r.history[0].request.headers['Authorization']
|
|
h2 = r.request.headers['Authorization']
|
|
assert h1 == h2
|
|
|
|
def test_manual_redirect_with_partial_body_read(self, httpbin):
|
|
s = requests.Session()
|
|
req = requests.Request('GET', httpbin('redirect/2')).prepare()
|
|
r1 = s.send(req, allow_redirects=False, stream=True)
|
|
assert r1.is_redirect
|
|
rg = s.resolve_redirects(r1, req, stream=True)
|
|
# read only the first eight bytes of the response body,
|
|
# then follow the redirect
|
|
r1.iter_content(8)
|
|
r2 = next(rg)
|
|
assert r2.is_redirect
|
|
# read all of the response via iter_content,
|
|
# then follow the redirect
|
|
for _ in r2.iter_content():
|
|
pass
|
|
r3 = next(rg)
|
|
assert not r3.is_redirect
|
|
|
|
def test_prepare_body_position_non_stream(self):
|
|
data = b'the data'
|
|
s = requests.Session()
|
|
prep = requests.Request(
|
|
'GET', 'http://example.com', data=data
|
|
).prepare(
|
|
)
|
|
assert prep._body_position is None
|
|
|
|
def test_rewind_body(self):
|
|
data = io.BytesIO(b'the data')
|
|
s = requests.Session()
|
|
prep = requests.Request(
|
|
'GET', 'http://example.com', data=data
|
|
).prepare(
|
|
)
|
|
assert prep._body_position == 0
|
|
assert prep.body.read() == b'the data'
|
|
# the data has all been read
|
|
assert prep.body.read() == b''
|
|
# rewind it back
|
|
requests.utils.rewind_body(prep)
|
|
assert prep.body.read() == b'the data'
|
|
|
|
def test_rewind_partially_read_body(self):
|
|
data = io.BytesIO(b'the data')
|
|
s = requests.Session()
|
|
data.read(4) # read some data
|
|
prep = requests.Request(
|
|
'GET', 'http://example.com', data=data
|
|
).prepare(
|
|
)
|
|
assert prep._body_position == 4
|
|
assert prep.body.read() == b'data'
|
|
# the data has all been read
|
|
assert prep.body.read() == b''
|
|
# rewind it back
|
|
requests.utils.rewind_body(prep)
|
|
assert prep.body.read() == b'data'
|
|
|
|
def test_rewind_body_no_seek(self):
|
|
|
|
class BadFileObj:
|
|
|
|
def __init__(self, data):
|
|
self.data = data
|
|
|
|
def tell(self):
|
|
return 0
|
|
|
|
def __iter__(self):
|
|
return
|
|
|
|
data = BadFileObj('the data')
|
|
s = requests.Session()
|
|
prep = requests.Request(
|
|
'GET', 'http://example.com', data=data
|
|
).prepare(
|
|
)
|
|
assert prep._body_position == 0
|
|
with pytest.raises(UnrewindableBodyError) as e:
|
|
requests.utils.rewind_body(prep)
|
|
assert 'Unable to rewind request body' in str(e)
|
|
|
|
def test_rewind_body_failed_seek(self):
|
|
|
|
class BadFileObj:
|
|
|
|
def __init__(self, data):
|
|
self.data = data
|
|
|
|
def tell(self):
|
|
return 0
|
|
|
|
def seek(self, pos, whence=0):
|
|
raise OSError()
|
|
|
|
def __iter__(self):
|
|
return
|
|
|
|
data = BadFileObj('the data')
|
|
s = requests.Session()
|
|
prep = requests.Request(
|
|
'GET', 'http://example.com', data=data
|
|
).prepare(
|
|
)
|
|
assert prep._body_position == 0
|
|
with pytest.raises(UnrewindableBodyError) as e:
|
|
requests.utils.rewind_body(prep)
|
|
assert 'error occurred when rewinding request body' in str(e)
|
|
|
|
def test_rewind_body_failed_tell(self):
|
|
|
|
class BadFileObj:
|
|
|
|
def __init__(self, data):
|
|
self.data = data
|
|
|
|
def tell(self):
|
|
raise OSError()
|
|
|
|
def __iter__(self):
|
|
return
|
|
|
|
data = BadFileObj('the data')
|
|
s = requests.Session()
|
|
prep = requests.Request(
|
|
'GET', 'http://example.com', data=data
|
|
).prepare(
|
|
)
|
|
assert prep._body_position is not None
|
|
with pytest.raises(UnrewindableBodyError) as e:
|
|
requests.utils.rewind_body(prep)
|
|
assert 'Unable to rewind request body' in str(e)
|
|
|
|
def _patch_adapter_gzipped_redirect(self, session, url):
|
|
adapter = session.get_adapter(url=url)
|
|
org_build_response = adapter.build_response
|
|
self._patched_response = False
|
|
|
|
def build_response(*args, **kwargs):
|
|
resp = org_build_response(*args, **kwargs)
|
|
if not self._patched_response:
|
|
resp.raw.headers['content-encoding'] = 'gzip'
|
|
self._patched_response = True
|
|
return resp
|
|
|
|
adapter.build_response = build_response
|
|
|
|
def test_redirect_with_wrong_gzipped_header(self, httpbin):
|
|
s = requests.Session()
|
|
url = httpbin('redirect/1')
|
|
self._patch_adapter_gzipped_redirect(s, url)
|
|
s.get(url)
|
|
|
|
@pytest.mark.parametrize(
|
|
'username, password, auth_str',
|
|
(
|
|
('test', 'test', 'Basic dGVzdDp0ZXN0'),
|
|
(
|
|
u'имя'.encode('utf-8'),
|
|
u'пароль'.encode('utf-8'),
|
|
'Basic 0LjQvNGPOtC/0LDRgNC+0LvRjA==',
|
|
),
|
|
),
|
|
)
|
|
def test_basic_auth_str_is_always_native(
|
|
self, username, password, auth_str
|
|
):
|
|
s = _basic_auth_str(username, password)
|
|
assert isinstance(s, builtin_str)
|
|
assert s == auth_str
|
|
|
|
def test_requests_history_is_saved(self, httpbin):
|
|
r = requests.get(httpbin('redirect/5'))
|
|
total = r.history[-1].history
|
|
i = 0
|
|
for item in r.history:
|
|
assert item.history == total[0:i]
|
|
i += 1
|
|
|
|
def test_json_param_post_content_type_works(self, httpbin):
|
|
r = requests.post(httpbin('post'), json={'life': 42})
|
|
assert r.status_code == 200
|
|
assert 'application/json' in r.request.headers['Content-Type']
|
|
assert {'life': 42} == r.json()['json']
|
|
|
|
def test_json_param_post_should_not_override_data_param(self, httpbin):
|
|
r = requests.Request(
|
|
method='POST',
|
|
url=httpbin('post'),
|
|
data={'stuff': 'elixr'},
|
|
json={'music': 'flute'},
|
|
)
|
|
prep = r.prepare()
|
|
assert 'stuff=elixr' == prep.body
|
|
|
|
@pytest.mark.parametrize('decode_unicode', (True, False))
|
|
def test_response_iter_lines(self, httpbin, decode_unicode):
|
|
r = requests.get(httpbin('stream/4'), stream=True)
|
|
assert r.status_code == 200
|
|
r.encoding = 'utf-8'
|
|
it = r.iter_lines(decode_unicode=decode_unicode)
|
|
next(it)
|
|
assert len(list(it)) == 3
|
|
|
|
def test_response_context_manager(self, httpbin):
|
|
with requests.get(httpbin('stream/4'), stream=True) as response:
|
|
assert isinstance(response, requests.Response)
|
|
assert response.raw.closed
|
|
|
|
def test_unconsumed_session_response_closes_connection(self, httpbin):
|
|
s = requests.session()
|
|
with contextlib.closing(
|
|
s.get(httpbin('stream/4'), stream=True)
|
|
) as response:
|
|
pass
|
|
assert response._content_consumed is False
|
|
assert response.raw.closed
|
|
|
|
@pytest.mark.xfail
|
|
def test_response_iter_lines_reentrant(self, httpbin):
|
|
"""Response.iter_lines() is not reentrant safe"""
|
|
r = requests.get(httpbin('stream/4'), stream=True)
|
|
assert r.status_code == 200
|
|
next(r.iter_lines())
|
|
assert len(list(r.iter_lines())) == 3
|
|
|
|
def test_environment_comes_after_session(self, httpbin):
|
|
"""The Session arguments should come before environment arguments."""
|
|
# We get proxies from the environment and verify from the argument.
|
|
s = requests.Session()
|
|
a = SendRecordingAdapter()
|
|
s.mount('http://', a)
|
|
# Both of these arguments are safe fallbacks that we can easily
|
|
# detect, but which will allow the request to succeed.
|
|
s.verify = False
|
|
s.proxies = {'http': None}
|
|
old_proxy = os.environ.get('HTTP_PROXY')
|
|
old_bundle = os.environ.get('REQUESTS_CA_BUNDLE')
|
|
try:
|
|
os.environ['HTTP_PROXY'] = '10.10.10.10:3128'
|
|
os.environ['REQUESTS_CA_BUNDLE'] = '/path/to/nowhere'
|
|
s.get(httpbin('get'), timeout=5)
|
|
finally:
|
|
if old_proxy is not None:
|
|
os.environ['HTTP_PROXY'] = old_proxy
|
|
else:
|
|
del os.environ['HTTP_PROXY']
|
|
if old_bundle is not None:
|
|
os.environ['REQUESTS_CA_BUNDLE'] = old_bundle
|
|
else:
|
|
del os.environ['REQUESTS_CA_BUNDLE']
|
|
call = a.send_calls[0]
|
|
assert call[1]['verify'] == False
|
|
proxies = call[1]['proxies']
|
|
with pytest.raises(KeyError):
|
|
proxies['http']
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def test_merge_environment_settings_verify(self, monkeypatch):
|
|
"""Assert CA environment settings are merged as expected when missing"""
|
|
session = requests.Session()
|
|
monkeypatch.delenv('CURL_CA_BUNDLE', raising=False)
|
|
monkeypatch.delenv('REQUESTS_CA_BUNDLE', raising=False)
|
|
assert session.trust_env is True
|
|
assert session.verify is True
|
|
assert 'REQUESTS_CA_BUNDLE' not in os.environ
|
|
assert 'CURL_CA_BUNDLE' not in os.environ
|
|
merged_settings = session.merge_environment_settings(
|
|
'http://example.com', {}, False, True, None
|
|
)
|
|
assert merged_settings['verify'] is True
|
|
|
|
def test_session_close_proxy_clear(self, mocker):
|
|
proxies = {'one': mocker.Mock(), 'two': mocker.Mock()}
|
|
session = requests.Session()
|
|
mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies)
|
|
session.close()
|
|
proxies['one'].clear.assert_called_once_with()
|
|
proxies['two'].clear.assert_called_once_with()
|
|
|
|
def test_proxy_auth(self, httpbin):
|
|
adapter = HTTPAdapter()
|
|
headers = adapter.proxy_headers("http://user:pass@httpbin.org")
|
|
assert headers == {'Proxy-Authorization': 'Basic dXNlcjpwYXNz'}
|
|
|
|
def test_proxy_auth_empty_pass(self, httpbin):
|
|
adapter = HTTPAdapter()
|
|
headers = adapter.proxy_headers("http://user:@httpbin.org")
|
|
assert headers == {'Proxy-Authorization': 'Basic dXNlcjo='}
|
|
|
|
def test_response_json_when_content_is_None(self, httpbin):
|
|
r = requests.get(httpbin('/status/204'))
|
|
# Make sure r.content is None
|
|
r.status_code = 0
|
|
r._content = False
|
|
r._content_consumed = False
|
|
assert r.content is None
|
|
with pytest.raises(ValueError):
|
|
r.json()
|
|
|
|
def test_response_without_release_conn(self):
|
|
"""Test `close` call for non-urllib3-like raw objects.
|
|
Should work when `release_conn` attr doesn't exist on `response.raw`.
|
|
"""
|
|
resp = requests.Response()
|
|
resp.raw = StringIO.StringIO('test')
|
|
assert not resp.raw.closed
|
|
resp.close()
|
|
assert resp.raw.closed
|
|
|
|
def test_updating_ca_cert(self, httpbin_secure):
|
|
"""Assert that requests use the latest configured CA certificates."""
|
|
session = requests.session()
|
|
session.verify = pytest_httpbin.certs.where()
|
|
session.get(httpbin_secure('/'))
|
|
session.verify = True
|
|
with pytest.raises(requests.exceptions.SSLError) as e:
|
|
session.get(httpbin_secure('/'))
|
|
assert 'certificate verify failed' in str(e)
|
|
|
|
def test_updating_client_cert(self, httpbin_secure):
|
|
"""Assert that requests use the latest configured client certificates."""
|
|
ca_file = pytest_httpbin.certs.where()
|
|
cert_dir = os.path.dirname(ca_file)
|
|
# All we need is a valid certificate and key to make a request. httpbin_secure
|
|
# won't check the signature or subject name, so it's okay that these happen to
|
|
# be the server's certificate and key.
|
|
cert = os.path.join(cert_dir, 'cert.pem')
|
|
key = os.path.join(cert_dir, 'key.pem')
|
|
session = requests.session()
|
|
session.verify = ca_file
|
|
resp = session.get(httpbin_secure('/'))
|
|
resp_with_cert = session.get(httpbin_secure('/'), cert=(cert, key))
|
|
assert resp_with_cert.raw._pool.cert_file == cert
|
|
assert resp_with_cert.raw._pool.key_file == key
|
|
assert resp.raw._pool is not resp_with_cert.raw._pool
|
|
|
|
def test_empty_stream_with_auth_does_not_set_content_length_header(
|
|
self, httpbin
|
|
):
|
|
"""Ensure that a byte stream with size 0 will not set both a Content-Length
|
|
and Transfer-Encoding header.
|
|
"""
|
|
auth = ('user', 'pass')
|
|
url = httpbin('post')
|
|
file_obj = io.BytesIO(b'')
|
|
r = requests.Request('POST', url, auth=auth, data=file_obj)
|
|
prepared_request = r.prepare()
|
|
assert 'Transfer-Encoding' in prepared_request.headers
|
|
assert 'Content-Length' not in prepared_request.headers
|
|
|
|
def test_stream_with_auth_does_not_set_transfer_encoding_header(
|
|
self, httpbin
|
|
):
|
|
"""Ensure that a byte stream with size > 0 will not set both a Content-Length
|
|
and Transfer-Encoding header.
|
|
"""
|
|
auth = ('user', 'pass')
|
|
url = httpbin('post')
|
|
file_obj = io.BytesIO(b'test data')
|
|
r = requests.Request('POST', url, auth=auth, data=file_obj)
|
|
prepared_request = r.prepare()
|
|
assert 'Transfer-Encoding' not in prepared_request.headers
|
|
assert 'Content-Length' in prepared_request.headers
|
|
|
|
def test_chunked_upload_does_not_set_content_length_header(self, httpbin):
|
|
"""Ensure that requests with a generator body stream using
|
|
Transfer-Encoding: chunked, not a Content-Length header.
|
|
"""
|
|
data = (i for i in [b'a', b'b', b'c'])
|
|
url = httpbin('post')
|
|
r = requests.Request('POST', url, data=data)
|
|
prepared_request = r.prepare()
|
|
assert 'Transfer-Encoding' in prepared_request.headers
|
|
assert 'Content-Length' not in prepared_request.headers
|
|
|
|
def test_chunked_upload_with_manually_set_content_length_header_raises_error(
|
|
self, httpbin
|
|
):
|
|
"""Ensure that if a user manually sets a content length header, when
|
|
the data is chunked, that an InvalidHeader error is raised.
|
|
"""
|
|
data = (i for i in [b'a', b'b', b'c'])
|
|
url = httpbin('post')
|
|
with pytest.raises(InvalidHeader):
|
|
r = requests.post(
|
|
url, data=data, headers={'Content-Length': 'foo'}
|
|
)
|
|
|
|
def test_content_length_with_manually_set_transfer_encoding_raises_error(
|
|
self, httpbin
|
|
):
|
|
"""Ensure that if a user manually sets a Transfer-Encoding header when
|
|
data is not chunked that an InvalidHeader error is raised.
|
|
"""
|
|
data = 'test data'
|
|
url = httpbin('post')
|
|
with pytest.raises(InvalidHeader):
|
|
r = requests.post(
|
|
url, data=data, headers={'Transfer-Encoding': 'chunked'}
|
|
)
|
|
|
|
def test_null_body_does_not_raise_error(self, httpbin):
|
|
url = httpbin('post')
|
|
try:
|
|
requests.post(url, data=None)
|
|
except InvalidHeader:
|
|
pytest.fail('InvalidHeader error raised unexpectedly.')
|
|
|
|
@pytest.mark.parametrize(
|
|
'body, expected',
|
|
(
|
|
(None, ('Content-Length', '0')),
|
|
('test_data', ('Content-Length', '9')),
|
|
(io.BytesIO(b'test_data'), ('Content-Length', '9')),
|
|
(StringIO.StringIO(''), ('Transfer-Encoding', 'chunked')),
|
|
),
|
|
)
|
|
def test_prepare_content_length(self, httpbin, body, expected):
|
|
"""Test prepare_content_length creates expected header."""
|
|
prep = requests.PreparedRequest()
|
|
prep.headers = {}
|
|
prep.method = 'POST'
|
|
# Ensure Content-Length is set appropriately.
|
|
key, value = expected
|
|
prep.prepare_content_length(body)
|
|
assert prep.headers[key] == value
|
|
|
|
def test_prepare_content_length_with_bad_body(self, httpbin):
|
|
"""Test prepare_content_length raises exception with unsendable body."""
|
|
# Initialize minimum required PreparedRequest.
|
|
prep = requests.PreparedRequest()
|
|
prep.headers = {}
|
|
prep.method = 'POST'
|
|
with pytest.raises(InvalidBodyError) as e:
|
|
# Send object that isn't iterable and has no accessible content.
|
|
prep.prepare_content_length(object())
|
|
assert "Non-null body must have length or be streamable." in str(e)
|
|
|
|
def test_custom_redirect_mixin(self, httpbin):
|
|
"""Tests a custom mixin to overwrite ``get_redirect_target``.
|
|
|
|
Ensures a subclassed ``requests.Session`` can handle a certain type of
|
|
malformed redirect responses.
|
|
|
|
1. original request receives a proper response: 302 redirect
|
|
2. following the redirect, a malformed response is given:
|
|
status code = HTTP 200
|
|
location = alternate url
|
|
3. the custom session catches the edge case and follows the redirect
|
|
"""
|
|
url_final = httpbin('html')
|
|
querystring_malformed = urlencode({'location': url_final})
|
|
url_redirect_malformed = httpbin(
|
|
'response-headers?%s' % querystring_malformed
|
|
)
|
|
querystring_redirect = urlencode({'url': url_redirect_malformed})
|
|
url_redirect = httpbin('redirect-to?%s' % querystring_redirect)
|
|
urls_test = [url_redirect, url_redirect_malformed, url_final]
|
|
|
|
class CustomRedirectSession(requests.Session):
|
|
|
|
def get_redirect_target(self, resp):
|
|
# default behavior
|
|
if resp.is_redirect:
|
|
return resp.headers['location']
|
|
|
|
# edge case - check to see if 'location' is in headers anyways
|
|
location = resp.headers.get('location')
|
|
if location and (location != resp.url):
|
|
return location
|
|
|
|
return None
|
|
|
|
session = CustomRedirectSession()
|
|
r = session.get(urls_test[0])
|
|
assert len(r.history) == 2
|
|
assert r.status_code == 200
|
|
assert r.history[0].status_code == 302
|
|
assert r.history[0].is_redirect
|
|
assert r.history[1].status_code == 200
|
|
assert not r.history[1].is_redirect
|
|
assert r.url == urls_test[2]
|
|
|
|
def test_multiple_response_headers_with_same_name_same_case(self, httpbin):
|
|
qs = 'Fruit=Apple&Fruit=Blood+Orange&Fruit=Banana&Fruit=Berry,+Blue'
|
|
resp = requests.get(httpbin('response-headers?' + qs))
|
|
fruits = resp.headers['fruit']
|
|
assert fruits == 'Apple, Blood Orange, Banana, Berry, Blue'
|
|
|
|
# As we are using HTTPHeaderDict, we should be able to extract the
|
|
# individual header values too.
|
|
assert resp.headers.getlist('fruit') == [
|
|
'Apple', 'Blood Orange', 'Banana', 'Berry, Blue'
|
|
]
|
|
|
|
def test_multiple_response_headers_with_same_name_diff_case(self, httpbin):
|
|
# urllib3 seems to have trouble guaranteeing the order of the items when
|
|
# the case is different, so we just need to make sure all of the items
|
|
# are there, rather than asserting a particular order.
|
|
qs = 'Fruit=Apple&Fruit=Blood+Orange&Fruit=Banana&Fruit=Berry,+Blue'
|
|
resp = requests.get(httpbin('response-headers?' + qs))
|
|
|
|
# These are all possible acceptable combinations for the header.
|
|
fruit_choices = ['Apple', 'Blood Orange', 'Banana', 'Berry, Blue']
|
|
fruit_permutations = itertools.permutations(fruit_choices)
|
|
fruit_multiheaders = [list(fp) for fp in fruit_permutations]
|
|
fruit_headers = set(', '.join(fp) for fp in fruit_multiheaders)
|
|
assert resp.headers['fruit'] in fruit_headers
|
|
|
|
# As we are using HTTPHeaderDict, we should be able to extract the
|
|
# individual header values too.
|
|
assert resp.headers.getlist('fruit') in fruit_multiheaders
|
|
|
|
|
|
class TestCaseInsensitiveDict:
|
|
|
|
@pytest.mark.parametrize(
|
|
'cid',
|
|
(
|
|
CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}),
|
|
CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]),
|
|
CaseInsensitiveDict(FOO='foo', BAr='bar'),
|
|
),
|
|
)
|
|
def test_init(self, cid):
|
|
assert len(cid) == 2
|
|
assert 'foo' in cid
|
|
assert 'bar' in cid
|
|
|
|
def test_docstring_example(self):
|
|
cid = CaseInsensitiveDict()
|
|
cid['Accept'] = 'application/json'
|
|
assert cid['aCCEPT'] == 'application/json'
|
|
assert list(cid) == ['Accept']
|
|
|
|
def test_len(self):
|
|
cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
|
|
cid['A'] = 'a'
|
|
assert len(cid) == 2
|
|
|
|
def test_getitem(self):
|
|
cid = CaseInsensitiveDict({'Spam': 'blueval'})
|
|
assert cid['spam'] == 'blueval'
|
|
assert cid['SPAM'] == 'blueval'
|
|
|
|
def test_fixes_649(self):
|
|
"""__setitem__ should behave case-insensitively."""
|
|
cid = CaseInsensitiveDict()
|
|
cid['spam'] = 'oneval'
|
|
cid['Spam'] = 'twoval'
|
|
cid['sPAM'] = 'redval'
|
|
cid['SPAM'] = 'blueval'
|
|
assert cid['spam'] == 'blueval'
|
|
assert cid['SPAM'] == 'blueval'
|
|
assert list(cid.keys()) == ['SPAM']
|
|
|
|
def test_delitem(self):
|
|
cid = CaseInsensitiveDict()
|
|
cid['Spam'] = 'someval'
|
|
del cid['sPam']
|
|
assert 'spam' not in cid
|
|
assert len(cid) == 0
|
|
|
|
def test_contains(self):
|
|
cid = CaseInsensitiveDict()
|
|
cid['Spam'] = 'someval'
|
|
assert 'Spam' in cid
|
|
assert 'spam' in cid
|
|
assert 'SPAM' in cid
|
|
assert 'sPam' in cid
|
|
assert 'notspam' not in cid
|
|
|
|
def test_get(self):
|
|
cid = CaseInsensitiveDict()
|
|
cid['spam'] = 'oneval'
|
|
cid['SPAM'] = 'blueval'
|
|
assert cid.get('spam') == 'blueval'
|
|
assert cid.get('SPAM') == 'blueval'
|
|
assert cid.get('sPam') == 'blueval'
|
|
assert cid.get('notspam', 'default') == 'default'
|
|
|
|
def test_update(self):
|
|
cid = CaseInsensitiveDict()
|
|
cid['spam'] = 'blueval'
|
|
cid.update({'sPam': 'notblueval'})
|
|
assert cid['spam'] == 'notblueval'
|
|
cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
|
|
cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
|
|
assert len(cid) == 2
|
|
assert cid['foo'] == 'anotherfoo'
|
|
assert cid['bar'] == 'anotherbar'
|
|
|
|
def test_update_retains_unchanged(self):
|
|
cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
|
|
cid.update({'foo': 'newfoo'})
|
|
assert cid['bar'] == 'bar'
|
|
|
|
def test_iter(self):
|
|
cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
|
|
keys = frozenset(['Spam', 'Eggs'])
|
|
assert frozenset(iter(cid)) == keys
|
|
|
|
def test_equality(self):
|
|
cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
|
|
othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
|
|
assert cid == othercid
|
|
del othercid['spam']
|
|
assert cid != othercid
|
|
assert cid == {'spam': 'blueval', 'eggs': 'redval'}
|
|
assert cid != object()
|
|
|
|
def test_setdefault(self):
|
|
cid = CaseInsensitiveDict({'Spam': 'blueval'})
|
|
assert cid.setdefault('spam', 'notblueval') == 'blueval'
|
|
assert cid.setdefault('notspam', 'notblueval') == 'notblueval'
|
|
|
|
def test_lower_items(self):
|
|
cid = CaseInsensitiveDict(
|
|
{'Accept': 'application/json', 'user-Agent': 'requests'}
|
|
)
|
|
keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
|
|
lowerkeyset = frozenset(['accept', 'user-agent'])
|
|
assert keyset == lowerkeyset
|
|
|
|
def test_preserve_key_case(self):
|
|
cid = CaseInsensitiveDict(
|
|
{'Accept': 'application/json', 'user-Agent': 'requests'}
|
|
)
|
|
keyset = frozenset(['Accept', 'user-Agent'])
|
|
assert frozenset(i[0] for i in cid.items()) == keyset
|
|
assert frozenset(cid.keys()) == keyset
|
|
assert frozenset(cid) == keyset
|
|
|
|
def test_preserve_last_key_case(self):
|
|
cid = CaseInsensitiveDict(
|
|
{'Accept': 'application/json', 'user-Agent': 'requests'}
|
|
)
|
|
cid.update({'ACCEPT': 'application/json'})
|
|
cid['USER-AGENT'] = 'requests'
|
|
keyset = frozenset(['ACCEPT', 'USER-AGENT'])
|
|
assert frozenset(i[0] for i in cid.items()) == keyset
|
|
assert frozenset(cid.keys()) == keyset
|
|
assert frozenset(cid) == keyset
|
|
|
|
def test_copy(self):
|
|
cid = CaseInsensitiveDict(
|
|
{'Accept': 'application/json', 'user-Agent': 'requests'}
|
|
)
|
|
cid_copy = cid.copy()
|
|
assert cid == cid_copy
|
|
cid['changed'] = True
|
|
assert cid != cid_copy
|
|
|
|
def test_url_surrounding_whitespace(self, httpbin):
|
|
"""Test case with URLs surrounded by whitespace characters."""
|
|
get_url = httpbin('get')
|
|
# All surrounding whitespaces are supposed to be ignored:
|
|
assert requests.get(get_url + ' ').status_code == 200
|
|
assert requests.get(' ' + get_url).status_code == 200
|
|
assert requests.get(get_url + ' \t ').status_code == 200
|
|
assert requests.get(' \t' + get_url).status_code == 200
|
|
assert requests.get(get_url + '\n').status_code == 200
|
|
# The whitespaces can't be in the middle of the URL though:
|
|
assert requests.get(get_url + ' abc').status_code == 404
|
|
|
|
|
|
class TestMorselToCookieExpires:
|
|
"""Tests for morsel_to_cookie when morsel contains expires."""
|
|
|
|
def test_expires_valid_str(self):
|
|
"""Test case where we convert expires from string time."""
|
|
morsel = Morsel()
|
|
morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT'
|
|
cookie = morsel_to_cookie(morsel)
|
|
assert cookie.expires == 1
|
|
|
|
@pytest.mark.parametrize(
|
|
'value, exception', ((100, TypeError), ('woops', ValueError))
|
|
)
|
|
def test_expires_invalid_int(self, value, exception):
|
|
"""Test case where an invalid type is passed for expires."""
|
|
morsel = Morsel()
|
|
morsel['expires'] = value
|
|
with pytest.raises(exception):
|
|
morsel_to_cookie(morsel)
|
|
|
|
def test_expires_none(self):
|
|
"""Test case where expires is None."""
|
|
morsel = Morsel()
|
|
morsel['expires'] = None
|
|
cookie = morsel_to_cookie(morsel)
|
|
assert cookie.expires is None
|
|
|
|
|
|
class TestMorselToCookieMaxAge:
|
|
"""Tests for morsel_to_cookie when morsel contains max-age."""
|
|
|
|
def test_max_age_valid_int(self):
|
|
"""Test case where a valid max age in seconds is passed."""
|
|
morsel = Morsel()
|
|
morsel['max-age'] = 60
|
|
cookie = morsel_to_cookie(morsel)
|
|
assert isinstance(cookie.expires, int)
|
|
|
|
def test_max_age_invalid_str(self):
|
|
"""Test case where a invalid max age is passed."""
|
|
morsel = Morsel()
|
|
morsel['max-age'] = 'woops'
|
|
with pytest.raises(TypeError):
|
|
morsel_to_cookie(morsel)
|
|
|
|
|
|
class TestTimeout:
|
|
|
|
def test_stream_timeout(self, httpbin):
|
|
try:
|
|
requests.get(httpbin('delay/10'), timeout=2.0)
|
|
except requests.exceptions.Timeout as e:
|
|
assert 'Read timed out' in e.args[0].args[0]
|
|
|
|
@pytest.mark.parametrize(
|
|
'timeout, error_text',
|
|
(
|
|
((3, 4, 5), '(connect, read)'),
|
|
('foo', 'must be an int, float or None'),
|
|
),
|
|
)
|
|
def test_invalid_timeout(self, httpbin, timeout, error_text):
|
|
with pytest.raises(ValueError) as e:
|
|
requests.get(httpbin('get'), timeout=timeout)
|
|
assert error_text in str(e)
|
|
|
|
@pytest.mark.parametrize(
|
|
'timeout', (None, Urllib3Timeout(connect=None, read=None))
|
|
)
|
|
def test_none_timeout(self, httpbin, timeout):
|
|
"""Check that you can set None as a valid timeout value.
|
|
|
|
To actually test this behavior, we'd want to check that setting the
|
|
timeout to None actually lets the request block past the system default
|
|
timeout. However, this would make the test suite unbearably slow.
|
|
Instead we verify that setting the timeout to None does not prevent the
|
|
request from succeeding.
|
|
"""
|
|
r = requests.get(httpbin('get'), timeout=timeout)
|
|
assert r.status_code == 200
|
|
|
|
@pytest.mark.parametrize(
|
|
'timeout', ((None, 0.1), Urllib3Timeout(connect=None, read=0.1))
|
|
)
|
|
def test_read_timeout(self, httpbin, timeout):
|
|
try:
|
|
requests.get(httpbin('delay/10'), timeout=timeout)
|
|
pytest.fail('The recv() request should time out.')
|
|
except ReadTimeout:
|
|
pass
|
|
|
|
@pytest.mark.parametrize(
|
|
'timeout', ((0.1, None), Urllib3Timeout(connect=0.1, read=None))
|
|
)
|
|
def test_connect_timeout(self, timeout):
|
|
try:
|
|
requests.get(TARPIT, timeout=timeout)
|
|
pytest.fail('The connect() request should time out.')
|
|
except ConnectTimeout as e:
|
|
assert isinstance(e, ConnectionError)
|
|
assert isinstance(e, Timeout)
|
|
|
|
@pytest.mark.parametrize(
|
|
'timeout', ((0.1, 0.1), Urllib3Timeout(connect=0.1, read=0.1))
|
|
)
|
|
def test_total_timeout_connect(self, timeout):
|
|
try:
|
|
requests.get(TARPIT, timeout=timeout)
|
|
pytest.fail('The connect() request should time out.')
|
|
except ConnectTimeout:
|
|
pass
|
|
|
|
def test_encoded_methods(self, httpbin):
|
|
"""See: https://github.com/requests/requests/issues/2316"""
|
|
r = requests.request(b'GET', httpbin('get'))
|
|
assert r.ok
|
|
|
|
|
|
SendCall = collections.namedtuple('SendCall', ('args', 'kwargs'))
|
|
|
|
|
|
class RedirectSession(SessionRedirectMixin):
|
|
|
|
def __init__(self, order_of_redirects):
|
|
self.redirects = order_of_redirects
|
|
self.calls = []
|
|
self.max_redirects = 30
|
|
self.cookies = {}
|
|
self.trust_env = False
|
|
self.location = '/'
|
|
|
|
def send(self, *args, **kwargs):
|
|
self.calls.append(SendCall(args, kwargs))
|
|
return self.build_response()
|
|
|
|
def build_response(self):
|
|
request = self.calls[-1].args[0]
|
|
r = requests.Response()
|
|
try:
|
|
r.status_code = int(self.redirects.pop(0))
|
|
except IndexError:
|
|
r.status_code = 200
|
|
r.headers = CaseInsensitiveDict({'Location': self.location})
|
|
r.raw = self._build_raw()
|
|
r.request = request
|
|
return r
|
|
|
|
def _build_raw(self):
|
|
string = StringIO.StringIO('')
|
|
setattr(string, 'release_conn', lambda *args: args)
|
|
return string
|
|
|
|
|
|
def test_json_encodes_as_bytes():
|
|
# urllib3 expects bodies as bytes-like objects
|
|
body = {"key": "value"}
|
|
p = PreparedRequest()
|
|
p.prepare(method='GET', url='https://www.example.com/', json=body)
|
|
assert isinstance(p.body, bytes)
|
|
|
|
|
|
def test_requests_are_updated_each_time(httpbin):
|
|
session = RedirectSession([303, 307])
|
|
prep = requests.Request('POST', httpbin('post')).prepare()
|
|
r0 = session.send(prep)
|
|
assert r0.request.method == 'POST'
|
|
assert session.calls[-1] == SendCall((r0.request,), {})
|
|
redirect_generator = session.resolve_redirects(r0, prep)
|
|
default_keyword_args = {
|
|
'stream': False,
|
|
'verify': True,
|
|
'cert': None,
|
|
'timeout': None,
|
|
'allow_redirects': False,
|
|
'proxies': {},
|
|
}
|
|
for response in redirect_generator:
|
|
assert response.request.method == 'GET'
|
|
send_call = SendCall((response.request,), default_keyword_args)
|
|
assert session.calls[-1] == send_call
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"var,url,proxy",
|
|
[
|
|
('http_proxy', 'http://example.com', 'socks5://proxy.com:9876'),
|
|
('https_proxy', 'https://example.com', 'socks5://proxy.com:9876'),
|
|
('all_proxy', 'http://example.com', 'socks5://proxy.com:9876'),
|
|
('all_proxy', 'https://example.com', 'socks5://proxy.com:9876'),
|
|
],
|
|
)
|
|
def test_proxy_env_vars_override_default(var, url, proxy):
|
|
session = requests.Session()
|
|
prep = PreparedRequest()
|
|
prep.prepare(method='GET', url=url)
|
|
kwargs = {var: proxy}
|
|
scheme = urlparse(url).scheme
|
|
with override_environ(**kwargs):
|
|
proxies = session.rebuild_proxies(prep, {})
|
|
assert scheme in proxies
|
|
assert proxies[scheme] == proxy
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
'data',
|
|
(
|
|
(('a', 'b'), ('c', 'd')),
|
|
(('c', 'd'), ('a', 'b')),
|
|
(('a', 'b'), ('c', 'd'), ('e', 'f')),
|
|
),
|
|
)
|
|
def test_data_argument_accepts_tuples(data):
|
|
"""Ensure that the data argument will accept tuples of strings
|
|
and properly encode them.
|
|
"""
|
|
p = PreparedRequest()
|
|
p.prepare(
|
|
method='GET',
|
|
url='http://www.example.com',
|
|
data=data,
|
|
hooks=default_hooks(),
|
|
)
|
|
assert p.body == urlencode(data)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
'kwargs',
|
|
(
|
|
None,
|
|
{
|
|
'method': 'GET',
|
|
'url': 'http://www.example.com',
|
|
'data': 'foo=bar',
|
|
'hooks': default_hooks(),
|
|
},
|
|
{
|
|
'method': 'GET',
|
|
'url': 'http://www.example.com',
|
|
'data': 'foo=bar',
|
|
'hooks': default_hooks(),
|
|
'cookies': {'foo': 'bar'},
|
|
},
|
|
{'method': 'GET', 'url': u('http://www.example.com/üniçø∂é')},
|
|
),
|
|
)
|
|
def test_prepared_copy(kwargs):
|
|
p = PreparedRequest()
|
|
if kwargs:
|
|
p.prepare(**kwargs)
|
|
copy = p.copy()
|
|
for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'):
|
|
assert getattr(p, attr) == getattr(copy, attr)
|
|
|
|
|
|
def test_prepare_requires_a_request_method():
|
|
req = requests.Request()
|
|
with pytest.raises(ValueError):
|
|
req.prepare()
|
|
prepped = PreparedRequest()
|
|
with pytest.raises(ValueError):
|
|
prepped.prepare()
|
|
|
|
|
|
def test_urllib3_retries(httpbin):
|
|
from urllib3.util import Retry
|
|
|
|
s = requests.Session()
|
|
s.mount(
|
|
'http://',
|
|
HTTPAdapter(max_retries=Retry(total=2, status_forcelist=[500])),
|
|
)
|
|
with pytest.raises(RetryError):
|
|
s.get(httpbin('status/500'))
|
|
|
|
|
|
def test_urllib3_pool_connection_closed(httpbin):
|
|
s = requests.Session()
|
|
s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0))
|
|
try:
|
|
s.get(httpbin('status/200'))
|
|
except ConnectionError as e:
|
|
assert u"Pool is closed." in str(e)
|
|
|
|
|
|
class TestPreparingURLs(object):
|
|
|
|
@pytest.mark.parametrize(
|
|
'url,expected',
|
|
(
|
|
('http://google.com', 'http://google.com/'),
|
|
(u'http://ジェーピーニック.jp', u'http://xn--hckqz9bzb1cyrb.jp/'),
|
|
(u'http://xn--n3h.net/', u'http://xn--n3h.net/'),
|
|
(
|
|
u'http://ジェーピーニック.jp'.encode('utf-8'),
|
|
u'http://xn--hckqz9bzb1cyrb.jp/',
|
|
),
|
|
(
|
|
u'http://straße.de/straße',
|
|
u'http://xn--strae-oqa.de/stra%C3%9Fe',
|
|
),
|
|
(
|
|
u'http://straße.de/straße'.encode('utf-8'),
|
|
u'http://xn--strae-oqa.de/stra%C3%9Fe',
|
|
),
|
|
(
|
|
u'http://Königsgäßchen.de/straße',
|
|
u'http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe',
|
|
),
|
|
(
|
|
u'http://Königsgäßchen.de/straße'.encode('utf-8'),
|
|
u'http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe',
|
|
),
|
|
(b'http://xn--n3h.net/', u'http://xn--n3h.net/'),
|
|
(
|
|
b'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/',
|
|
u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/',
|
|
),
|
|
(
|
|
u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/',
|
|
u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/',
|
|
),
|
|
),
|
|
)
|
|
def test_preparing_url(self, url, expected):
|
|
r = requests.Request('GET', url=url)
|
|
p = r.prepare()
|
|
assert p.url == expected
|
|
|
|
@pytest.mark.parametrize(
|
|
'url',
|
|
(
|
|
b"http://*.google.com",
|
|
b"http://*",
|
|
u"http://*.google.com",
|
|
u"http://*",
|
|
u"http://☃.net/",
|
|
),
|
|
)
|
|
def test_preparing_bad_url(self, url):
|
|
r = requests.Request('GET', url=url)
|
|
with pytest.raises(requests.exceptions.InvalidURL):
|
|
r.prepare()
|
|
|
|
@pytest.mark.parametrize(
|
|
'input, expected',
|
|
(
|
|
(
|
|
b"http+unix://%2Fvar%2Frun%2Fsocket/path%7E",
|
|
u"http+unix://%2Fvar%2Frun%2Fsocket/path~",
|
|
),
|
|
(
|
|
u"http+unix://%2Fvar%2Frun%2Fsocket/path%7E",
|
|
u"http+unix://%2Fvar%2Frun%2Fsocket/path~",
|
|
),
|
|
(b"mailto:user@example.org", u"mailto:user@example.org"),
|
|
(u"mailto:user@example.org", u"mailto:user@example.org"),
|
|
(b"data:SSDimaUgUHl0aG9uIQ==", u"data:SSDimaUgUHl0aG9uIQ=="),
|
|
),
|
|
)
|
|
def test_url_mutation(self, input, expected):
|
|
"""
|
|
This test validates that we correctly exclude some URLs from
|
|
preparation, and that we handle others. Specifically, it tests that
|
|
any URL whose scheme doesn't begin with "http" is left alone, and
|
|
those whose scheme *does* begin with "http" are mutated.
|
|
"""
|
|
r = requests.Request('GET', url=input)
|
|
p = r.prepare()
|
|
assert p.url == expected
|
|
|
|
@pytest.mark.parametrize(
|
|
'input, params, expected',
|
|
(
|
|
(
|
|
b"http+unix://%2Fvar%2Frun%2Fsocket/path",
|
|
{"key": "value"},
|
|
u"http+unix://%2Fvar%2Frun%2Fsocket/path?key=value",
|
|
),
|
|
(
|
|
u"http+unix://%2Fvar%2Frun%2Fsocket/path",
|
|
{"key": "value"},
|
|
u"http+unix://%2Fvar%2Frun%2Fsocket/path?key=value",
|
|
),
|
|
(
|
|
b"mailto:user@example.org",
|
|
{"key": "value"},
|
|
u"mailto:user@example.org",
|
|
),
|
|
(
|
|
u"mailto:user@example.org",
|
|
{"key": "value"},
|
|
u"mailto:user@example.org",
|
|
),
|
|
),
|
|
)
|
|
def test_parameters_for_nonstandard_schemes(self, input, params, expected):
|
|
"""
|
|
Setting parameters for nonstandard schemes is allowed if those schemes
|
|
begin with "http", and is forbidden otherwise.
|
|
"""
|
|
r = requests.Request('GET', url=input, params=params)
|
|
p = r.prepare()
|
|
assert p.url == expected
|
|
|
|
|
|
class TestGetConnection(object):
|
|
"""
|
|
Tests for the :meth:`requests.adapters.HTTPAdapter.get_connection` that assert
|
|
the connections are correctly configured.
|
|
"""
|
|
|
|
@pytest.mark.parametrize(
|
|
'proxies, verify, cert, expected',
|
|
(
|
|
(
|
|
{},
|
|
True,
|
|
None,
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': DEFAULT_CA_BUNDLE_PATH,
|
|
'ca_cert_dir': None,
|
|
'cert_file': None,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{},
|
|
False,
|
|
None,
|
|
{
|
|
'cert_reqs': 'CERT_NONE',
|
|
'ca_certs': None,
|
|
'ca_cert_dir': None,
|
|
'cert_file': None,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{},
|
|
__file__,
|
|
None,
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': __file__,
|
|
'ca_cert_dir': None,
|
|
'cert_file': None,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{},
|
|
os.path.dirname(__file__),
|
|
None,
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': None,
|
|
'ca_cert_dir': os.path.dirname(__file__),
|
|
'cert_file': None,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{},
|
|
True,
|
|
None,
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': DEFAULT_CA_BUNDLE_PATH,
|
|
'ca_cert_dir': None,
|
|
'cert_file': None,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{},
|
|
True,
|
|
__file__,
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': DEFAULT_CA_BUNDLE_PATH,
|
|
'ca_cert_dir': None,
|
|
'cert_file': __file__,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{},
|
|
True,
|
|
(__file__, __file__),
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': DEFAULT_CA_BUNDLE_PATH,
|
|
'ca_cert_dir': None,
|
|
'cert_file': __file__,
|
|
'key_file': __file__,
|
|
},
|
|
),
|
|
(
|
|
{},
|
|
True,
|
|
(__file__, __file__),
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': DEFAULT_CA_BUNDLE_PATH,
|
|
'ca_cert_dir': None,
|
|
'cert_file': __file__,
|
|
'key_file': __file__,
|
|
},
|
|
),
|
|
(
|
|
{
|
|
'http': 'http://proxy.example.com',
|
|
'https': 'http://proxy.example.com',
|
|
},
|
|
True,
|
|
None,
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': DEFAULT_CA_BUNDLE_PATH,
|
|
'ca_cert_dir': None,
|
|
'cert_file': None,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{
|
|
'http': 'http://proxy.example.com',
|
|
'https': 'http://proxy.example.com',
|
|
},
|
|
os.path.dirname(__file__),
|
|
None,
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': None,
|
|
'ca_cert_dir': os.path.dirname(__file__),
|
|
'cert_file': None,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{
|
|
'http': 'http://proxy.example.com',
|
|
'https': 'http://proxy.example.com',
|
|
},
|
|
__file__,
|
|
None,
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': __file__,
|
|
'ca_cert_dir': None,
|
|
'cert_file': None,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{
|
|
'http': 'http://proxy.example.com',
|
|
'https': 'http://proxy.example.com',
|
|
},
|
|
True,
|
|
__file__,
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': DEFAULT_CA_BUNDLE_PATH,
|
|
'ca_cert_dir': None,
|
|
'cert_file': __file__,
|
|
'key_file': None,
|
|
},
|
|
),
|
|
(
|
|
{
|
|
'http': 'http://proxy.example.com',
|
|
'https': 'http://proxy.example.com',
|
|
},
|
|
True,
|
|
(__file__, __file__),
|
|
{
|
|
'cert_reqs': 'CERT_REQUIRED',
|
|
'ca_certs': DEFAULT_CA_BUNDLE_PATH,
|
|
'ca_cert_dir': None,
|
|
'cert_file': __file__,
|
|
'key_file': __file__,
|
|
},
|
|
),
|
|
),
|
|
)
|
|
def test_get_https_connection(self, proxies, verify, cert, expected):
|
|
"""Assert connections are configured correctly."""
|
|
adapter = requests.adapters.HTTPAdapter()
|
|
connection = adapter.get_connection(
|
|
'https://example.com', proxies=proxies, verify=verify, cert=cert
|
|
)
|
|
actual_config = {}
|
|
for key, value in connection.__dict__.items():
|
|
if key in expected:
|
|
actual_config[key] = value
|
|
assert actual_config == expected
|
|
|
|
@pytest.mark.parametrize(
|
|
'verify, cert',
|
|
(
|
|
('a/path/that/does/not/exist', None),
|
|
(True, 'a/path/that/does/not/exist'),
|
|
(True, (__file__, 'a/path/that/does/not/exist')),
|
|
(True, ('a/path/that/does/not/exist', __file__)),
|
|
),
|
|
)
|
|
def test_cert_files_missing(self, verify, cert):
|
|
"""
|
|
Assert an IOError is raised when one of the certificate files or
|
|
directories can't be found.
|
|
"""
|
|
adapter = requests.adapters.HTTPAdapter()
|
|
with pytest.raises(IOError) as excinfo:
|
|
adapter.get_connection(
|
|
'https://example.com', verify=verify, cert=cert
|
|
)
|
|
excinfo.match('invalid path: a/path/that/does/not/exist')
|