mirror of
https://github.com/kennethreitz/requests.git
synced 2026-06-05 22:50:18 +00:00
+112
-145
@@ -79,13 +79,16 @@ try:
|
||||
except AttributeError:
|
||||
HAS_PYOPENSSL = False
|
||||
|
||||
@pytest.fixture
|
||||
def s(request, *args, **kwargs):
|
||||
return requests.Session()
|
||||
|
||||
|
||||
class TestRequests:
|
||||
|
||||
def test_entry_points(self):
|
||||
requests.session
|
||||
requests.session().get
|
||||
requests.session().head
|
||||
requests.Session().get
|
||||
requests.Session().head
|
||||
requests.get
|
||||
requests.head
|
||||
requests.put
|
||||
@@ -163,15 +166,14 @@ class TestRequests:
|
||||
request = requests.Request('GET', url, params={"a": "b"}).prepare()
|
||||
assert request.url == expected
|
||||
|
||||
def test_params_original_order_is_preserved_by_default(self):
|
||||
def test_params_original_order_is_preserved_by_default(self, s):
|
||||
param_ordered_dict = collections.OrderedDict(
|
||||
(('z', 1), ('a', 1), ('k', 1), ('d', 1))
|
||||
)
|
||||
session = requests.Session()
|
||||
request = requests.Request(
|
||||
'GET', 'http://example.com/', params=param_ordered_dict
|
||||
)
|
||||
prep = session.prepare_request(request)
|
||||
prep = s.prepare_request(request)
|
||||
assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1'
|
||||
|
||||
|
||||
@@ -196,8 +198,7 @@ class TestRequests:
|
||||
@pytest.mark.parametrize(
|
||||
'scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://')
|
||||
)
|
||||
def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
|
||||
s = requests.Session()
|
||||
def test_mixed_case_scheme_acceptable(self, s, httpbin, scheme):
|
||||
s.proxies = getproxies()
|
||||
parts = urlparse(httpbin('get'))
|
||||
url = scheme + parts.netloc + parts.path
|
||||
@@ -205,9 +206,8 @@ class TestRequests:
|
||||
r = s.send(r.prepare())
|
||||
assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
|
||||
|
||||
def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
|
||||
def test_HTTP_200_OK_GET_ALTERNATIVE(self, s, httpbin):
|
||||
r = requests.Request('GET', httpbin('get'))
|
||||
s = requests.Session()
|
||||
s.proxies = getproxies()
|
||||
r = s.send(r.prepare())
|
||||
assert r.status_code == 200
|
||||
@@ -254,8 +254,7 @@ class TestRequests:
|
||||
'Expected redirect to raise TooManyRedirects but it did not'
|
||||
)
|
||||
|
||||
def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
|
||||
s = requests.session()
|
||||
def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, s, httpbin):
|
||||
s.max_redirects = 5
|
||||
try:
|
||||
s.get(httpbin('relative-redirect', '50'))
|
||||
@@ -365,44 +364,44 @@ class TestRequests:
|
||||
assert r.history[0].is_redirect
|
||||
assert r.request.body is None
|
||||
|
||||
def test_multiple_location_headers(self, httpbin):
|
||||
def test_multiple_location_headers(self, s, httpbin):
|
||||
headers = [
|
||||
('Location', 'http://example.com'),
|
||||
('Location', 'https://example.com/1'),
|
||||
]
|
||||
params = '&'.join(['%s=%s' % (k, v) for k, v in headers])
|
||||
ses = requests.Session()
|
||||
|
||||
req = requests.Request('GET', httpbin('response-headers?%s' % params))
|
||||
prep = ses.prepare_request(req)
|
||||
resp = ses.send(prep)
|
||||
prep = s.prepare_request(req)
|
||||
resp = s.send(prep)
|
||||
# change response to redirect
|
||||
resp.status_code = 302
|
||||
with pytest.raises(InvalidHeader):
|
||||
# next triggers yield on generator
|
||||
next(ses.resolve_redirects(resp, prep))
|
||||
next(s.resolve_redirects(resp, prep))
|
||||
|
||||
def test_header_and_body_removal_on_redirect(self, httpbin):
|
||||
def test_header_and_body_removal_on_redirect(self, s, httpbin):
|
||||
purged_headers = ('Content-Length', 'Content-Type')
|
||||
ses = requests.Session()
|
||||
|
||||
req = requests.Request('POST', httpbin('post'), data={'test': 'data'})
|
||||
prep = ses.prepare_request(req)
|
||||
resp = ses.send(prep)
|
||||
prep = s.prepare_request(req)
|
||||
resp = s.send(prep)
|
||||
# Mimic a redirect response
|
||||
resp.status_code = 302
|
||||
resp.headers['location'] = 'get'
|
||||
# Run request through resolve_redirects
|
||||
next_resp = next(ses.resolve_redirects(resp, prep))
|
||||
next_resp = next(s.resolve_redirects(resp, prep))
|
||||
assert next_resp.request.body is None
|
||||
for header in purged_headers:
|
||||
assert header not in next_resp.request.headers
|
||||
|
||||
def test_transfer_enc_removal_on_redirect(self, httpbin):
|
||||
def test_transfer_enc_removal_on_redirect(self, s, httpbin):
|
||||
purged_headers = ('Transfer-Encoding', 'Content-Type')
|
||||
ses = requests.Session()
|
||||
|
||||
req = requests.Request(
|
||||
'POST', httpbin('post'), data=(b'x' for x in range(1))
|
||||
)
|
||||
prep = ses.prepare_request(req)
|
||||
prep = s.prepare_request(req)
|
||||
assert 'Transfer-Encoding' in prep.headers
|
||||
# Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33
|
||||
resp = requests.Response()
|
||||
@@ -413,7 +412,7 @@ class TestRequests:
|
||||
resp.status_code = 302
|
||||
resp.headers['location'] = httpbin('get')
|
||||
# Run request through resolve_redirect
|
||||
next_resp = next(ses.resolve_redirects(resp, prep))
|
||||
next_resp = next(s.resolve_redirects(resp, prep))
|
||||
assert next_resp.request.body is None
|
||||
for header in purged_headers:
|
||||
assert header not in next_resp.request.headers
|
||||
@@ -431,20 +430,17 @@ class TestRequests:
|
||||
)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_set_cookie_on_301(self, httpbin):
|
||||
s = requests.session()
|
||||
def test_set_cookie_on_301(self, s, httpbin):
|
||||
url = httpbin('cookies/set?foo=bar')
|
||||
s.get(url)
|
||||
assert s.cookies['foo'] == 'bar'
|
||||
|
||||
def test_cookie_sent_on_redirect(self, httpbin):
|
||||
s = requests.session()
|
||||
def test_cookie_sent_on_redirect(self, s, httpbin):
|
||||
s.get(httpbin('cookies/set?foo=bar'))
|
||||
r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
|
||||
assert 'Cookie' in r.json()['headers']
|
||||
|
||||
def test_cookie_removed_on_expire(self, httpbin):
|
||||
s = requests.session()
|
||||
def test_cookie_removed_on_expire(self, s, httpbin):
|
||||
s.get(httpbin('cookies/set?foo=bar'))
|
||||
assert s.cookies['foo'] == 'bar'
|
||||
s.get(
|
||||
@@ -455,35 +451,30 @@ class TestRequests:
|
||||
)
|
||||
assert 'foo' not in s.cookies
|
||||
|
||||
def test_cookie_quote_wrapped(self, httpbin):
|
||||
s = requests.session()
|
||||
def test_cookie_quote_wrapped(self, s, httpbin):
|
||||
s.get(httpbin('cookies/set?foo="bar:baz"'))
|
||||
assert s.cookies['foo'] == '"bar:baz"'
|
||||
|
||||
def test_cookie_persists_via_api(self, httpbin):
|
||||
s = requests.session()
|
||||
def test_cookie_persists_via_api(self, s, httpbin):
|
||||
r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
|
||||
assert 'foo' in r.request.headers['Cookie']
|
||||
assert 'foo' in r.history[0].request.headers['Cookie']
|
||||
|
||||
def test_request_cookie_overrides_session_cookie(self, httpbin):
|
||||
s = requests.session()
|
||||
def test_request_cookie_overrides_session_cookie(self, s, httpbin):
|
||||
s.cookies['foo'] = 'bar'
|
||||
r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
|
||||
assert r.json()['cookies']['foo'] == 'baz'
|
||||
# Session cookie should not be modified
|
||||
assert s.cookies['foo'] == 'bar'
|
||||
|
||||
def test_request_cookies_not_persisted(self, httpbin):
|
||||
s = requests.session()
|
||||
def test_request_cookies_not_persisted(self, s, httpbin):
|
||||
s.get(httpbin('cookies'), cookies={'foo': 'baz'})
|
||||
# Sending a request with cookies should not add cookies to the session
|
||||
assert not s.cookies
|
||||
|
||||
def test_generic_cookiejar_works(self, httpbin):
|
||||
def test_generic_cookiejar_works(self, s, httpbin):
|
||||
cj = cookielib.CookieJar()
|
||||
cookiejar_from_dict({'foo': 'bar'}, cj)
|
||||
s = requests.session()
|
||||
s.cookies = cj
|
||||
r = s.get(httpbin('cookies'))
|
||||
# Make sure the cookie was sent
|
||||
@@ -491,22 +482,20 @@ class TestRequests:
|
||||
# Make sure the session cj is still the custom one
|
||||
assert s.cookies is cj
|
||||
|
||||
def test_param_cookiejar_works(self, httpbin):
|
||||
def test_param_cookiejar_works(self, s, httpbin):
|
||||
cj = cookielib.CookieJar()
|
||||
cookiejar_from_dict({'foo': 'bar'}, cj)
|
||||
s = requests.session()
|
||||
r = s.get(httpbin('cookies'), cookies=cj)
|
||||
# Make sure the cookie was sent
|
||||
assert r.json()['cookies']['foo'] == 'bar'
|
||||
|
||||
def test_cookielib_cookiejar_on_redirect(self, httpbin):
|
||||
def test_cookielib_cookiejar_on_redirect(self, s, httpbin):
|
||||
"""Tests resolve_redirect doesn't fail when merging cookies
|
||||
with non-RequestsCookieJar cookiejar.
|
||||
|
||||
See GH #3579
|
||||
"""
|
||||
cj = cookiejar_from_dict({'foo': 'bar'}, cookielib.CookieJar())
|
||||
s = requests.Session()
|
||||
s.cookies = cookiejar_from_dict({'cookie': 'tasty'})
|
||||
# Prepare request without using Session
|
||||
req = requests.Request('GET', httpbin('headers'), cookies=cj)
|
||||
@@ -532,7 +521,7 @@ class TestRequests:
|
||||
@pytest.mark.parametrize(
|
||||
'jar', (requests.cookies.RequestsCookieJar(), cookielib.CookieJar())
|
||||
)
|
||||
def test_custom_cookie_policy_persistence(self, httpbin, jar):
|
||||
def test_custom_cookie_policy_persistence(self, s, httpbin, jar):
|
||||
"""Verify a custom CookiePolicy is propagated on each session request."""
|
||||
|
||||
class TestCookiePolicy(cookielib.DefaultCookiePolicy):
|
||||
@@ -544,7 +533,6 @@ class TestRequests:
|
||||
)
|
||||
|
||||
# Establish session with jar and set some cookies.
|
||||
s = requests.Session()
|
||||
s.cookies = jar
|
||||
s.get(httpbin('cookies/set?k1=v1&k2=v2'))
|
||||
assert len(s.cookies) == 2
|
||||
@@ -571,26 +559,24 @@ class TestRequests:
|
||||
assert isinstance(resp.history, list)
|
||||
assert not isinstance(resp.history, tuple)
|
||||
|
||||
def test_headers_on_session_with_None_are_not_sent(self, httpbin):
|
||||
def test_headers_on_session_with_None_are_not_sent(self, httpbin, s):
|
||||
"""Do not send headers in Session.headers with None values."""
|
||||
ses = requests.Session()
|
||||
ses.headers['Accept-Encoding'] = None
|
||||
s.headers['Accept-Encoding'] = None
|
||||
req = requests.Request('GET', httpbin('get'))
|
||||
prep = ses.prepare_request(req)
|
||||
prep = s.prepare_request(req)
|
||||
assert 'Accept-Encoding' not in prep.headers
|
||||
|
||||
def test_headers_preserve_order(self, httpbin):
|
||||
def test_headers_preserve_order(self, s, httpbin):
|
||||
"""Preserve order when headers provided as OrderedDict."""
|
||||
ses = requests.Session()
|
||||
ses.headers = collections.OrderedDict()
|
||||
ses.headers['Accept-Encoding'] = 'identity'
|
||||
ses.headers['First'] = '1'
|
||||
ses.headers['Second'] = '2'
|
||||
s.headers = collections.OrderedDict()
|
||||
s.headers['Accept-Encoding'] = 'identity'
|
||||
s.headers['First'] = '1'
|
||||
s.headers['Second'] = '2'
|
||||
headers = collections.OrderedDict([('Third', '3'), ('Fourth', '4')])
|
||||
headers['Fifth'] = '5'
|
||||
headers['Second'] = '222'
|
||||
req = requests.Request('GET', httpbin('get'), headers=headers)
|
||||
prep = ses.prepare_request(req)
|
||||
prep = s.prepare_request(req)
|
||||
items = list(prep.headers.items())
|
||||
assert items[0] == ('Accept-Encoding', 'identity')
|
||||
assert items[1] == ('First', '1')
|
||||
@@ -613,14 +599,13 @@ class TestRequests:
|
||||
r = requests.put(httpbin('put'))
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
|
||||
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin, s):
|
||||
auth = ('user', 'pass')
|
||||
url = httpbin('basic-auth', 'user', 'pass')
|
||||
r = requests.get(url, auth=auth)
|
||||
assert r.status_code == 200
|
||||
r = requests.get(url)
|
||||
assert r.status_code == 401
|
||||
s = requests.session()
|
||||
s.auth = auth
|
||||
r = s.get(url)
|
||||
assert r.status_code == 200
|
||||
@@ -678,7 +663,7 @@ class TestRequests:
|
||||
proxies={'http': 'non-resolvable-address'},
|
||||
)
|
||||
|
||||
def test_basicauth_with_netrc(self, httpbin):
|
||||
def test_basicauth_with_netrc(self, httpbin, s):
|
||||
auth = ('user', 'pass')
|
||||
wrong_auth = ('wronguser', 'wrongpass')
|
||||
url = httpbin('basic-auth', 'user', 'pass')
|
||||
@@ -695,7 +680,7 @@ class TestRequests:
|
||||
# Given auth should override and fail.
|
||||
r = requests.get(url, auth=wrong_auth)
|
||||
assert r.status_code == 401
|
||||
s = requests.session()
|
||||
|
||||
# Should use netrc and work.
|
||||
r = s.get(url)
|
||||
assert r.status_code == 200
|
||||
@@ -706,14 +691,14 @@ class TestRequests:
|
||||
finally:
|
||||
requests.sessions.get_netrc_auth = old_auth
|
||||
|
||||
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
|
||||
def test_DIGEST_HTTP_200_OK_GET(self, httpbin, s):
|
||||
auth = HTTPDigestAuth('user', 'pass')
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
r = requests.get(url, auth=auth)
|
||||
assert r.status_code == 200
|
||||
r = requests.get(url)
|
||||
assert r.status_code == 401
|
||||
s = requests.session()
|
||||
|
||||
s.auth = HTTPDigestAuth('user', 'pass')
|
||||
r = s.get(url)
|
||||
assert r.status_code == 200
|
||||
@@ -726,10 +711,10 @@ class TestRequests:
|
||||
r = requests.get(url, auth=auth)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
|
||||
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin, s):
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
auth = HTTPDigestAuth('user', 'pass')
|
||||
s = requests.Session()
|
||||
|
||||
s.get(url, auth=auth)
|
||||
assert s.cookies['fake'] == 'fake_value'
|
||||
|
||||
@@ -741,14 +726,13 @@ class TestRequests:
|
||||
r = requests.get(url, auth=auth, stream=False)
|
||||
assert r.raw.read() == b''
|
||||
|
||||
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
|
||||
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin, s):
|
||||
auth = HTTPDigestAuth('user', 'wrongpass')
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
r = requests.get(url, auth=auth)
|
||||
assert r.status_code == 401
|
||||
r = requests.get(url)
|
||||
assert r.status_code == 401
|
||||
s = requests.session()
|
||||
s.auth = auth
|
||||
r = s.get(url)
|
||||
assert r.status_code == 401
|
||||
@@ -1025,9 +1009,9 @@ class TestRequests:
|
||||
)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_unicode_method_name_with_request_object(self, httpbin):
|
||||
def test_unicode_method_name_with_request_object(self, httpbin, s):
|
||||
files = {'file': open(__file__, 'rb')}
|
||||
s = requests.Session()
|
||||
|
||||
req = requests.Request(u('POST'), httpbin('post'), files=files)
|
||||
prep = s.prepare_request(req)
|
||||
assert isinstance(prep.method, builtin_str)
|
||||
@@ -1035,8 +1019,7 @@ class TestRequests:
|
||||
resp = s.send(prep)
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_non_prepared_request_error(self):
|
||||
s = requests.Session()
|
||||
def test_non_prepared_request_error(self, s):
|
||||
req = requests.Request(u('POST'), '/')
|
||||
with pytest.raises(ValueError) as e:
|
||||
s.send(req)
|
||||
@@ -1058,37 +1041,36 @@ class TestRequests:
|
||||
assert r.status_code == 200
|
||||
assert b"text/py-content-type" in r.request.body
|
||||
|
||||
def test_hook_receives_request_arguments(self, httpbin):
|
||||
def test_hook_receives_request_arguments(self, httpbin, s):
|
||||
|
||||
def hook(resp, **kwargs):
|
||||
assert resp is not None
|
||||
assert kwargs != {}
|
||||
|
||||
s = requests.Session()
|
||||
r = requests.Request('GET', httpbin(), hooks={'response': hook})
|
||||
prep = s.prepare_request(r)
|
||||
s.send(prep)
|
||||
|
||||
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
|
||||
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin, s):
|
||||
hook = lambda x, *args, **kwargs: x
|
||||
s = requests.Session()
|
||||
|
||||
s.hooks['response'].append(hook)
|
||||
r = requests.Request('GET', httpbin())
|
||||
prep = s.prepare_request(r)
|
||||
assert prep.hooks['response'] != []
|
||||
assert prep.hooks['response'] == [hook]
|
||||
|
||||
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin):
|
||||
def test_session_hooks_are_overridden_by_request_hooks(self, httpbin, s):
|
||||
hook1 = lambda x, *args, **kwargs: x
|
||||
hook2 = lambda x, *args, **kwargs: x
|
||||
assert hook1 is not hook2
|
||||
s = requests.Session()
|
||||
|
||||
s.hooks['response'].append(hook2)
|
||||
r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
|
||||
prep = s.prepare_request(r)
|
||||
assert prep.hooks['response'] == [hook1]
|
||||
|
||||
def test_prepared_request_hook(self, httpbin):
|
||||
def test_prepared_request_hook(self, httpbin, s):
|
||||
|
||||
def hook(resp, **kwargs):
|
||||
resp.headers['hook-working'] = 'True'
|
||||
@@ -1096,12 +1078,11 @@ class TestRequests:
|
||||
|
||||
req = requests.Request('GET', httpbin(), hooks={'response': hook})
|
||||
prep = req.prepare()
|
||||
s = requests.Session()
|
||||
s.proxies = getproxies()
|
||||
resp = s.send(prep)
|
||||
assert resp.headers['hook-working']
|
||||
|
||||
def test_prepared_from_session(self, httpbin):
|
||||
def test_prepared_from_session(self, httpbin, s):
|
||||
|
||||
class DummyAuth(requests.auth.AuthBase):
|
||||
|
||||
@@ -1111,7 +1092,7 @@ class TestRequests:
|
||||
|
||||
req = requests.Request('GET', httpbin('headers'))
|
||||
assert not req.auth
|
||||
s = requests.Session()
|
||||
|
||||
s.auth = DummyAuth()
|
||||
prep = s.prepare_request(req)
|
||||
resp = s.send(prep)
|
||||
@@ -1479,7 +1460,7 @@ class TestRequests:
|
||||
line.encode('utf-8') for line in expected_delimiter
|
||||
]
|
||||
|
||||
def test_prepared_request_is_pickleable(self, httpbin):
|
||||
def test_prepared_request_is_pickleable(self, httpbin, s):
|
||||
p = requests.Request('GET', httpbin('get')).prepare()
|
||||
# Verify PreparedRequest can be pickled and unpickled
|
||||
r = pickle.loads(pickle.dumps(p))
|
||||
@@ -1487,11 +1468,11 @@ class TestRequests:
|
||||
assert r.headers == p.headers
|
||||
assert r.body == p.body
|
||||
# Verify unpickled PreparedRequest sends properly
|
||||
s = requests.Session()
|
||||
|
||||
resp = s.send(r)
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_prepared_request_with_file_is_pickleable(self, httpbin):
|
||||
def test_prepared_request_with_file_is_pickleable(self, httpbin, s):
|
||||
files = {'file': open(__file__, 'rb')}
|
||||
r = requests.Request('POST', httpbin('post'), files=files)
|
||||
p = r.prepare()
|
||||
@@ -1501,11 +1482,11 @@ class TestRequests:
|
||||
assert r.headers == p.headers
|
||||
assert r.body == p.body
|
||||
# Verify unpickled PreparedRequest sends properly
|
||||
s = requests.Session()
|
||||
|
||||
resp = s.send(r)
|
||||
assert resp.status_code == 200
|
||||
|
||||
def test_prepared_request_with_hook_is_pickleable(self, httpbin):
|
||||
def test_prepared_request_with_hook_is_pickleable(self, httpbin, s):
|
||||
r = requests.Request('GET', httpbin('get'), hooks=default_hooks())
|
||||
p = r.prepare()
|
||||
# Verify PreparedRequest can be pickled
|
||||
@@ -1515,7 +1496,7 @@ class TestRequests:
|
||||
assert r.body == p.body
|
||||
assert r.hooks == p.hooks
|
||||
# Verify unpickled PreparedRequest sends properly
|
||||
s = requests.Session()
|
||||
|
||||
resp = s.send(r)
|
||||
assert resp.status_code == 200
|
||||
|
||||
@@ -1534,17 +1515,17 @@ class TestRequests:
|
||||
assert str(error) == 'message'
|
||||
assert error.response == response
|
||||
|
||||
def test_session_pickling(self, httpbin):
|
||||
def test_session_pickling(self, httpbin, s):
|
||||
r = requests.Request('GET', httpbin('get'))
|
||||
s = requests.Session()
|
||||
|
||||
s = pickle.loads(pickle.dumps(s))
|
||||
s.proxies = getproxies()
|
||||
r = s.send(r.prepare())
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_fixes_1329(self, httpbin):
|
||||
def test_fixes_1329(self, httpbin, s):
|
||||
"""Ensure that header updates are done case-insensitively."""
|
||||
s = requests.Session()
|
||||
|
||||
s.headers.update({'ACCEPT': 'BOGUS'})
|
||||
s.headers.update({'accept': 'application/json'})
|
||||
r = s.get(httpbin('get'))
|
||||
@@ -1560,8 +1541,8 @@ class TestRequests:
|
||||
assert r.status_code == 200
|
||||
assert r.url.lower() == url.lower()
|
||||
|
||||
def test_transport_adapter_ordering(self):
|
||||
s = requests.Session()
|
||||
def test_transport_adapter_ordering(self, s):
|
||||
|
||||
order = ['https://', 'http://']
|
||||
assert order == list(s.adapters)
|
||||
s.mount('http://git', HTTPAdapter())
|
||||
@@ -1598,15 +1579,13 @@ class TestRequests:
|
||||
assert 'http://' in s2.adapters
|
||||
assert 'https://' in s2.adapters
|
||||
|
||||
def test_header_remove_is_case_insensitive(self, httpbin):
|
||||
def test_header_remove_is_case_insensitive(self, httpbin, s):
|
||||
# From issue #1321
|
||||
s = requests.Session()
|
||||
s.headers['foo'] = 'bar'
|
||||
r = s.get(httpbin('get'), headers={'FOO': None})
|
||||
assert 'foo' not in r.request.headers
|
||||
|
||||
def test_params_are_merged_case_sensitive(self, httpbin):
|
||||
s = requests.Session()
|
||||
def test_params_are_merged_case_sensitive(self, httpbin, s):
|
||||
s.params['foo'] = 'bar'
|
||||
r = s.get(httpbin('get'), params={'FOO': 'bar'})
|
||||
assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'}
|
||||
@@ -1736,8 +1715,8 @@ class TestRequests:
|
||||
h2 = r.request.headers['Authorization']
|
||||
assert h1 == h2
|
||||
|
||||
def test_manual_redirect_with_partial_body_read(self, httpbin):
|
||||
s = requests.Session()
|
||||
def test_manual_redirect_with_partial_body_read(self, httpbin, s):
|
||||
|
||||
req = requests.Request('GET', httpbin('redirect/2')).prepare()
|
||||
r1 = s.send(req, allow_redirects=False, stream=True)
|
||||
assert r1.is_redirect
|
||||
@@ -1754,18 +1733,16 @@ class TestRequests:
|
||||
r3 = next(rg)
|
||||
assert not r3.is_redirect
|
||||
|
||||
def test_prepare_body_position_non_stream(self):
|
||||
def test_prepare_body_position_non_stream(self, s):
|
||||
data = b'the data'
|
||||
s = requests.Session()
|
||||
prep = requests.Request(
|
||||
'GET', 'http://example.com', data=data
|
||||
).prepare(
|
||||
)
|
||||
assert prep._body_position is None
|
||||
|
||||
def test_rewind_body(self):
|
||||
def test_rewind_body(self, s):
|
||||
data = io.BytesIO(b'the data')
|
||||
s = requests.Session()
|
||||
prep = requests.Request(
|
||||
'GET', 'http://example.com', data=data
|
||||
).prepare(
|
||||
@@ -1778,9 +1755,8 @@ class TestRequests:
|
||||
requests.utils.rewind_body(prep)
|
||||
assert prep.body.read() == b'the data'
|
||||
|
||||
def test_rewind_partially_read_body(self):
|
||||
def test_rewind_partially_read_body(self, s):
|
||||
data = io.BytesIO(b'the data')
|
||||
s = requests.Session()
|
||||
data.read(4) # read some data
|
||||
prep = requests.Request(
|
||||
'GET', 'http://example.com', data=data
|
||||
@@ -1794,7 +1770,7 @@ class TestRequests:
|
||||
requests.utils.rewind_body(prep)
|
||||
assert prep.body.read() == b'data'
|
||||
|
||||
def test_rewind_body_no_seek(self):
|
||||
def test_rewind_body_no_seek(self, s):
|
||||
|
||||
class BadFileObj:
|
||||
|
||||
@@ -1808,7 +1784,7 @@ class TestRequests:
|
||||
return
|
||||
|
||||
data = BadFileObj('the data')
|
||||
s = requests.Session()
|
||||
|
||||
prep = requests.Request(
|
||||
'GET', 'http://example.com', data=data
|
||||
).prepare(
|
||||
@@ -1818,7 +1794,7 @@ class TestRequests:
|
||||
requests.utils.rewind_body(prep)
|
||||
assert 'Unable to rewind request body' in str(e)
|
||||
|
||||
def test_rewind_body_failed_seek(self):
|
||||
def test_rewind_body_failed_seek(self, s):
|
||||
|
||||
class BadFileObj:
|
||||
|
||||
@@ -1835,7 +1811,6 @@ class TestRequests:
|
||||
return
|
||||
|
||||
data = BadFileObj('the data')
|
||||
s = requests.Session()
|
||||
prep = requests.Request(
|
||||
'GET', 'http://example.com', data=data
|
||||
).prepare(
|
||||
@@ -1845,7 +1820,7 @@ class TestRequests:
|
||||
requests.utils.rewind_body(prep)
|
||||
assert 'error occurred when rewinding request body' in str(e)
|
||||
|
||||
def test_rewind_body_failed_tell(self):
|
||||
def test_rewind_body_failed_tell(self, s):
|
||||
|
||||
class BadFileObj:
|
||||
|
||||
@@ -1859,7 +1834,6 @@ class TestRequests:
|
||||
return
|
||||
|
||||
data = BadFileObj('the data')
|
||||
s = requests.Session()
|
||||
prep = requests.Request(
|
||||
'GET', 'http://example.com', data=data
|
||||
).prepare(
|
||||
@@ -1883,8 +1857,7 @@ class TestRequests:
|
||||
|
||||
adapter.build_response = build_response
|
||||
|
||||
def test_redirect_with_wrong_gzipped_header(self, httpbin):
|
||||
s = requests.Session()
|
||||
def test_redirect_with_wrong_gzipped_header(self, httpbin, s):
|
||||
url = httpbin('redirect/1')
|
||||
self._patch_adapter_gzipped_redirect(s, url)
|
||||
s.get(url)
|
||||
@@ -1945,8 +1918,7 @@ class TestRequests:
|
||||
assert isinstance(response, requests.Response)
|
||||
assert response.raw.closed
|
||||
|
||||
def test_unconsumed_session_response_closes_connection(self, httpbin):
|
||||
s = requests.session()
|
||||
def test_unconsumed_session_response_closes_connection(self, httpbin, s):
|
||||
with contextlib.closing(
|
||||
s.get(httpbin('stream/4'), stream=True)
|
||||
) as response:
|
||||
@@ -1962,10 +1934,9 @@ class TestRequests:
|
||||
next(r.iter_lines())
|
||||
assert len(list(r.iter_lines())) == 3
|
||||
|
||||
def test_environment_comes_after_session(self, httpbin):
|
||||
def test_environment_comes_after_session(self, httpbin, s):
|
||||
"""The Session arguments should come before environment arguments."""
|
||||
# We get proxies from the environment and verify from the argument.
|
||||
s = requests.Session()
|
||||
a = SendRecordingAdapter()
|
||||
s.mount('http://', a)
|
||||
# Both of these arguments are safe fallbacks that we can easily
|
||||
@@ -1994,25 +1965,24 @@ class TestRequests:
|
||||
proxies['http']
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def test_merge_environment_settings_verify(self, monkeypatch):
|
||||
def test_merge_environment_settings_verify(self, monkeypatch, s):
|
||||
"""Assert CA environment settings are merged as expected when missing"""
|
||||
session = requests.Session()
|
||||
monkeypatch.delenv('CURL_CA_BUNDLE', raising=False)
|
||||
monkeypatch.delenv('REQUESTS_CA_BUNDLE', raising=False)
|
||||
assert session.trust_env is True
|
||||
assert session.verify is True
|
||||
assert s.trust_env is True
|
||||
assert s.verify is True
|
||||
assert 'REQUESTS_CA_BUNDLE' not in os.environ
|
||||
assert 'CURL_CA_BUNDLE' not in os.environ
|
||||
merged_settings = session.merge_environment_settings(
|
||||
merged_settings = s.merge_environment_settings(
|
||||
'http://example.com', {}, False, True, None
|
||||
)
|
||||
assert merged_settings['verify'] is True
|
||||
|
||||
def test_session_close_proxy_clear(self, mocker):
|
||||
def test_session_close_proxy_clear(self, mocker, s):
|
||||
proxies = {'one': mocker.Mock(), 'two': mocker.Mock()}
|
||||
session = requests.Session()
|
||||
mocker.patch.dict(session.adapters['http://'].proxy_manager, proxies)
|
||||
session.close()
|
||||
|
||||
mocker.patch.dict(s.adapters['http://'].proxy_manager, proxies)
|
||||
s.close()
|
||||
proxies['one'].clear.assert_called_once_with()
|
||||
proxies['two'].clear.assert_called_once_with()
|
||||
|
||||
@@ -2046,17 +2016,16 @@ class TestRequests:
|
||||
resp.close()
|
||||
assert resp.raw.closed
|
||||
|
||||
def test_updating_ca_cert(self, httpbin_secure):
|
||||
def test_updating_ca_cert(self, httpbin_secure, s):
|
||||
"""Assert that requests use the latest configured CA certificates."""
|
||||
session = requests.session()
|
||||
session.verify = pytest_httpbin.certs.where()
|
||||
session.get(httpbin_secure('/'))
|
||||
session.verify = True
|
||||
s.verify = pytest_httpbin.certs.where()
|
||||
s.get(httpbin_secure('/'))
|
||||
s.verify = True
|
||||
with pytest.raises(requests.exceptions.SSLError) as e:
|
||||
session.get(httpbin_secure('/'))
|
||||
s.get(httpbin_secure('/'))
|
||||
assert 'certificate verify failed' in str(e)
|
||||
|
||||
def test_updating_client_cert(self, httpbin_secure):
|
||||
def test_updating_client_cert(self, httpbin_secure, s):
|
||||
"""Assert that requests use the latest configured client certificates."""
|
||||
ca_file = pytest_httpbin.certs.where()
|
||||
cert_dir = os.path.dirname(ca_file)
|
||||
@@ -2065,10 +2034,10 @@ class TestRequests:
|
||||
# be the server's certificate and key.
|
||||
cert = os.path.join(cert_dir, 'cert.pem')
|
||||
key = os.path.join(cert_dir, 'key.pem')
|
||||
session = requests.session()
|
||||
session.verify = ca_file
|
||||
resp = session.get(httpbin_secure('/'))
|
||||
resp_with_cert = session.get(httpbin_secure('/'), cert=(cert, key))
|
||||
|
||||
s.verify = ca_file
|
||||
resp = s.get(httpbin_secure('/'))
|
||||
resp_with_cert = s.get(httpbin_secure('/'), cert=(cert, key))
|
||||
assert resp_with_cert.raw._pool.cert_file == cert
|
||||
assert resp_with_cert.raw._pool.key_file == key
|
||||
assert resp.raw._pool is not resp_with_cert.raw._pool
|
||||
@@ -2591,8 +2560,8 @@ def test_requests_are_updated_each_time(httpbin):
|
||||
('all_proxy', 'https://example.com', 'socks5://proxy.com:9876'),
|
||||
],
|
||||
)
|
||||
def test_proxy_env_vars_override_default(var, url, proxy):
|
||||
session = requests.Session()
|
||||
def test_proxy_env_vars_override_default(var, url, proxy, s):
|
||||
|
||||
prep = PreparedRequest()
|
||||
prep.prepare(method='GET', url=url)
|
||||
kwargs = {var: proxy}
|
||||
@@ -2663,10 +2632,9 @@ def test_prepare_requires_a_request_method():
|
||||
prepped.prepare()
|
||||
|
||||
|
||||
def test_urllib3_retries(httpbin):
|
||||
def test_urllib3_retries(httpbin, s):
|
||||
from urllib3.util import Retry
|
||||
|
||||
s = requests.Session()
|
||||
s.mount(
|
||||
'http://',
|
||||
HTTPAdapter(max_retries=Retry(total=2, status_forcelist=[500])),
|
||||
@@ -2675,8 +2643,7 @@ def test_urllib3_retries(httpbin):
|
||||
s.get(httpbin('status/500'))
|
||||
|
||||
|
||||
def test_urllib3_pool_connection_closed(httpbin):
|
||||
s = requests.Session()
|
||||
def test_urllib3_pool_connection_closed(httpbin, s):
|
||||
s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0))
|
||||
try:
|
||||
s.get(httpbin('status/200'))
|
||||
|
||||
Reference in New Issue
Block a user