Merge pull request #2859 from kennethreitz/pytest-httpbin

Move test suite offline using pytest-httpbin
This commit is contained in:
Ian Cordasco
2015-11-05 09:37:22 -06:00
2 changed files with 141 additions and 137 deletions
+2
View File
@@ -1,4 +1,6 @@
py==1.4.30
pytest==2.8.1
pytest-cov==2.1.0
pytest-httpbin==0.0.7
httpbin==0.4.0
wheel
+139 -137
View File
@@ -48,20 +48,33 @@ else:
return s.decode('unicode-escape')
@pytest.fixture
def httpbin(httpbin):
# Issue #1483: Make sure the URL always has a trailing slash
httpbin_url = httpbin.url.rstrip('/') + '/'
def inner(*suffix):
return urljoin(httpbin_url, '/'.join(suffix))
return inner
@pytest.fixture
def httpsbin_url(httpbin_secure):
# Issue #1483: Make sure the URL always has a trailing slash
httpbin_url = httpbin_secure.url.rstrip('/') + '/'
def inner(*suffix):
return urljoin(httpbin_url, '/'.join(suffix))
return inner
# Requests to this URL should always fail with a connection timeout (nothing
# listening on that port)
TARPIT = "http://10.255.255.1"
HTTPBIN = os.environ.get('HTTPBIN_URL', 'http://httpbin.org/')
# Issue #1483: Make sure the URL always has a trailing slash
HTTPBIN = HTTPBIN.rstrip('/') + '/'
def httpbin(*suffix):
"""Returns url for HTTPBIN resource."""
return urljoin(HTTPBIN, '/'.join(suffix))
class RequestsTestCase(unittest.TestCase):
class TestRequests(object):
_multiprocess_can_split_ = True
@@ -105,13 +118,13 @@ class RequestsTestCase(unittest.TestCase):
assert pr.url == req.url
assert pr.body == 'life=42'
def test_no_content_length(self):
def test_no_content_length(self, httpbin):
get_req = requests.Request('GET', httpbin('get')).prepare()
assert 'Content-Length' not in get_req.headers
head_req = requests.Request('HEAD', httpbin('head')).prepare()
assert 'Content-Length' not in head_req.headers
def test_override_content_length(self):
def test_override_content_length(self, httpbin):
headers = {
'Content-Length': 'not zero'
}
@@ -144,19 +157,18 @@ class RequestsTestCase(unittest.TestCase):
params=b'test=foo').prepare()
assert request.url == 'http://example.com/?test=foo'
def test_mixed_case_scheme_acceptable(self):
def test_mixed_case_scheme_acceptable(self, httpbin):
s = requests.Session()
s.proxies = getproxies()
parts = urlparse(httpbin('get'))
schemes = ['http://', 'HTTP://', 'hTTp://', 'HttP://',
'https://', 'HTTPS://', 'hTTps://', 'HttPs://']
schemes = ['http://', 'HTTP://', 'hTTp://', 'HttP://']
for scheme in schemes:
url = scheme + parts.netloc + parts.path
r = requests.Request('GET', url)
r = s.send(r.prepare())
assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
def test_HTTP_200_OK_GET_ALTERNATIVE(self):
def test_HTTP_200_OK_GET_ALTERNATIVE(self, httpbin):
r = requests.Request('GET', httpbin('get'))
s = requests.Session()
s.proxies = getproxies()
@@ -165,7 +177,7 @@ class RequestsTestCase(unittest.TestCase):
assert r.status_code == 200
def test_HTTP_302_ALLOW_REDIRECT_GET(self):
def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin):
r = requests.get(httpbin('redirect', '1'))
assert r.status_code == 200
assert r.history[0].status_code == 302
@@ -175,7 +187,7 @@ class RequestsTestCase(unittest.TestCase):
# r = requests.post(httpbin('status', '302'), data={'some': 'data'})
# self.assertEqual(r.status_code, 200)
def test_HTTP_200_OK_GET_WITH_PARAMS(self):
def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
heads = {'User-agent': 'Mozilla/5.0'}
r = requests.get(httpbin('user-agent'), headers=heads)
@@ -183,25 +195,25 @@ class RequestsTestCase(unittest.TestCase):
assert heads['User-agent'] in r.text
assert r.status_code == 200
def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self):
def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin):
heads = {'User-agent': 'Mozilla/5.0'}
r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
assert r.status_code == 200
def test_set_cookie_on_301(self):
def test_set_cookie_on_301(self, httpbin):
s = requests.session()
url = httpbin('cookies/set?foo=bar')
s.get(url)
assert s.cookies['foo'] == 'bar'
def test_cookie_sent_on_redirect(self):
def test_cookie_sent_on_redirect(self, httpbin):
s = requests.session()
s.get(httpbin('cookies/set?foo=bar'))
r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
assert 'Cookie' in r.json()['headers']
def test_cookie_removed_on_expire(self):
def test_cookie_removed_on_expire(self, httpbin):
s = requests.session()
s.get(httpbin('cookies/set?foo=bar'))
assert s.cookies['foo'] == 'bar'
@@ -214,18 +226,18 @@ class RequestsTestCase(unittest.TestCase):
)
assert 'foo' not in s.cookies
def test_cookie_quote_wrapped(self):
def test_cookie_quote_wrapped(self, httpbin):
s = requests.session()
s.get(httpbin('cookies/set?foo="bar:baz"'))
assert s.cookies['foo'] == '"bar:baz"'
def test_cookie_persists_via_api(self):
def test_cookie_persists_via_api(self, httpbin):
s = requests.session()
r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
assert 'foo' in r.request.headers['Cookie']
assert 'foo' in r.history[0].request.headers['Cookie']
def test_request_cookie_overrides_session_cookie(self):
def test_request_cookie_overrides_session_cookie(self, httpbin):
s = requests.session()
s.cookies['foo'] = 'bar'
r = s.get(httpbin('cookies'), cookies={'foo': 'baz'})
@@ -233,13 +245,13 @@ class RequestsTestCase(unittest.TestCase):
# Session cookie should not be modified
assert s.cookies['foo'] == 'bar'
def test_request_cookies_not_persisted(self):
def test_request_cookies_not_persisted(self, httpbin):
s = requests.session()
s.get(httpbin('cookies'), cookies={'foo': 'baz'})
# Sending a request with cookies should not add cookies to the session
assert not s.cookies
def test_generic_cookiejar_works(self):
def test_generic_cookiejar_works(self, httpbin):
cj = cookielib.CookieJar()
cookiejar_from_dict({'foo': 'bar'}, cj)
s = requests.session()
@@ -250,7 +262,7 @@ class RequestsTestCase(unittest.TestCase):
# Make sure the session cj is still the custom one
assert s.cookies is cj
def test_param_cookiejar_works(self):
def test_param_cookiejar_works(self, httpbin):
cj = cookielib.CookieJar()
cookiejar_from_dict({'foo': 'bar'}, cj)
s = requests.session()
@@ -258,13 +270,13 @@ class RequestsTestCase(unittest.TestCase):
# Make sure the cookie was sent
assert r.json()['cookies']['foo'] == 'bar'
def test_requests_in_history_are_not_overridden(self):
def test_requests_in_history_are_not_overridden(self, httpbin):
resp = requests.get(httpbin('redirect/3'))
urls = [r.url for r in resp.history]
req_urls = [r.request.url for r in resp.history]
assert urls == req_urls
def test_history_is_always_a_list(self):
def test_history_is_always_a_list(self, httpbin):
"""
Show that even with redirects, Response.history is always a list.
"""
@@ -274,7 +286,7 @@ class RequestsTestCase(unittest.TestCase):
assert isinstance(resp.history, list)
assert not isinstance(resp.history, tuple)
def test_headers_on_session_with_None_are_not_sent(self):
def test_headers_on_session_with_None_are_not_sent(self, httpbin):
"""Do not send headers in Session.headers with None values."""
ses = requests.Session()
ses.headers['Accept-Encoding'] = None
@@ -282,7 +294,7 @@ class RequestsTestCase(unittest.TestCase):
prep = ses.prepare_request(req)
assert 'Accept-Encoding' not in prep.headers
def test_user_agent_transfers(self):
def test_user_agent_transfers(self, httpbin):
heads = {
'User-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
@@ -298,15 +310,15 @@ class RequestsTestCase(unittest.TestCase):
r = requests.get(httpbin('user-agent'), headers=heads)
assert heads['user-agent'] in r.text
def test_HTTP_200_OK_HEAD(self):
def test_HTTP_200_OK_HEAD(self, httpbin):
r = requests.head(httpbin('get'))
assert r.status_code == 200
def test_HTTP_200_OK_PUT(self):
def test_HTTP_200_OK_PUT(self, httpbin):
r = requests.put(httpbin('put'))
assert r.status_code == 200
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self):
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin):
auth = ('user', 'pass')
url = httpbin('basic-auth', 'user', 'pass')
@@ -329,42 +341,47 @@ class RequestsTestCase(unittest.TestCase):
def test_connection_error_invalid_port(self):
"""Connecting to an invalid port should raise a ConnectionError"""
with pytest.raises(ConnectionError):
requests.get("http://httpbin.org:1", timeout=1)
requests.get("http://localhost:1", timeout=1)
def test_LocationParseError(self):
"""Inputing a URL that cannot be parsed should raise an InvalidURL error"""
with pytest.raises(InvalidURL):
requests.get("http://fe80::5054:ff:fe5a:fc0")
def test_basicauth_with_netrc(self):
def test_basicauth_with_netrc(self, httpbin):
auth = ('user', 'pass')
wrong_auth = ('wronguser', 'wrongpass')
url = httpbin('basic-auth', 'user', 'pass')
def get_netrc_auth_mock(url):
return auth
requests.sessions.get_netrc_auth = get_netrc_auth_mock
old_auth = requests.sessions.get_netrc_auth
# Should use netrc and work.
r = requests.get(url)
assert r.status_code == 200
try:
def get_netrc_auth_mock(url):
return auth
requests.sessions.get_netrc_auth = get_netrc_auth_mock
# Given auth should override and fail.
r = requests.get(url, auth=wrong_auth)
assert r.status_code == 401
# Should use netrc and work.
r = requests.get(url)
assert r.status_code == 200
s = requests.session()
# Given auth should override and fail.
r = requests.get(url, auth=wrong_auth)
assert r.status_code == 401
# Should use netrc and work.
r = s.get(url)
assert r.status_code == 200
s = requests.session()
# Given auth should override and fail.
s.auth = wrong_auth
r = s.get(url)
assert r.status_code == 401
# Should use netrc and work.
r = s.get(url)
assert r.status_code == 200
def test_DIGEST_HTTP_200_OK_GET(self):
# Given auth should override and fail.
s.auth = wrong_auth
r = s.get(url)
assert r.status_code == 401
finally:
requests.sessions.get_netrc_auth = old_auth
def test_DIGEST_HTTP_200_OK_GET(self, httpbin):
auth = HTTPDigestAuth('user', 'pass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
@@ -380,7 +397,7 @@ class RequestsTestCase(unittest.TestCase):
r = s.get(url)
assert r.status_code == 200
def test_DIGEST_AUTH_RETURNS_COOKIE(self):
def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin):
url = httpbin('digest-auth', 'auth', 'user', 'pass')
auth = HTTPDigestAuth('user', 'pass')
r = requests.get(url)
@@ -389,14 +406,14 @@ class RequestsTestCase(unittest.TestCase):
r = requests.get(url, auth=auth)
assert r.status_code == 200
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self):
def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin):
url = httpbin('digest-auth', 'auth', 'user', 'pass')
auth = HTTPDigestAuth('user', 'pass')
s = requests.Session()
s.get(url, auth=auth)
assert s.cookies['fake'] == 'fake_value'
def test_DIGEST_STREAM(self):
def test_DIGEST_STREAM(self, httpbin):
auth = HTTPDigestAuth('user', 'pass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
@@ -407,7 +424,7 @@ class RequestsTestCase(unittest.TestCase):
r = requests.get(url, auth=auth, stream=False)
assert r.raw.read() == b''
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self):
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin):
auth = HTTPDigestAuth('user', 'wrongpass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
@@ -423,7 +440,7 @@ class RequestsTestCase(unittest.TestCase):
r = s.get(url)
assert r.status_code == 401
def test_DIGESTAUTH_QUOTES_QOP_VALUE(self):
def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin):
auth = HTTPDigestAuth('user', 'pass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
@@ -431,22 +448,7 @@ class RequestsTestCase(unittest.TestCase):
r = requests.get(url, auth=auth)
assert '"auth"' in r.request.headers['Authorization']
def test_DIGESTAUTH_THREADED(self):
auth = HTTPDigestAuth('user', 'pass')
url = httpbin('digest-auth', 'auth', 'user', 'pass')
session = requests.Session()
session.auth=auth
def do_request(i):
r = session.get(url)
assert '"auth"' in r.request.headers['Authorization']
return 1
if ThreadPool is not None:
pool = ThreadPool(processes=50)
pool.map(do_request, range(100))
def test_POSTBIN_GET_POST_FILES(self):
def test_POSTBIN_GET_POST_FILES(self, httpbin):
url = httpbin('post')
post1 = requests.post(url).raise_for_status()
@@ -464,7 +466,7 @@ class RequestsTestCase(unittest.TestCase):
with pytest.raises(ValueError):
requests.post(url, files=['bad file data'])
def test_POSTBIN_GET_POST_FILES_WITH_DATA(self):
def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin):
url = httpbin('post')
post1 = requests.post(url).raise_for_status()
@@ -483,17 +485,17 @@ class RequestsTestCase(unittest.TestCase):
with pytest.raises(ValueError):
requests.post(url, files=['bad file data'])
def test_conflicting_post_params(self):
def test_conflicting_post_params(self, httpbin):
url = httpbin('post')
with open('requirements.txt') as f:
pytest.raises(ValueError, "requests.post(url, data='[{\"some\": \"data\"}]', files={'some': f})")
pytest.raises(ValueError, "requests.post(url, data=u('[{\"some\": \"data\"}]'), files={'some': f})")
def test_request_ok_set(self):
def test_request_ok_set(self, httpbin):
r = requests.get(httpbin('status', '404'))
assert not r.ok
def test_status_raising(self):
def test_status_raising(self, httpbin):
r = requests.get(httpbin('status', '404'))
with pytest.raises(requests.exceptions.HTTPError):
r.raise_for_status()
@@ -501,11 +503,11 @@ class RequestsTestCase(unittest.TestCase):
r = requests.get(httpbin('status', '500'))
assert not r.ok
def test_decompress_gzip(self):
def test_decompress_gzip(self, httpbin):
r = requests.get(httpbin('gzip'))
r.content.decode('ascii')
def test_unicode_get(self):
def test_unicode_get(self, httpbin):
url = httpbin('/get')
requests.get(url, params={'foo': 'føø'})
requests.get(url, params={'føø': 'føø'})
@@ -513,29 +515,29 @@ class RequestsTestCase(unittest.TestCase):
requests.get(url, params={'foo': 'foo'})
requests.get(httpbin('ø'), params={'foo': 'foo'})
def test_unicode_header_name(self):
def test_unicode_header_name(self, httpbin):
requests.put(
httpbin('put'),
headers={str('Content-Type'): 'application/octet-stream'},
data='\xff') # compat.str is unicode.
def test_pyopenssl_redirect(self):
requests.get('https://httpbin.org/status/301')
def test_pyopenssl_redirect(self, httpsbin_url, httpbin_ca_bundle):
requests.get(httpsbin_url('status', '301'), verify=httpbin_ca_bundle)
def test_urlencoded_get_query_multivalued_param(self):
def test_urlencoded_get_query_multivalued_param(self, httpbin):
r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
assert r.status_code == 200
assert r.url == httpbin('get?test=foo&test=baz')
def test_different_encodings_dont_break_post(self):
def test_different_encodings_dont_break_post(self, httpbin):
r = requests.post(httpbin('post'),
data={'stuff': json.dumps({'a': 123})},
params={'blah': 'asdf1234'},
files={'file': ('test_requests.py', open(__file__, 'rb'))})
assert r.status_code == 200
def test_unicode_multipart_post(self):
def test_unicode_multipart_post(self, httpbin):
r = requests.post(httpbin('post'),
data={'stuff': u('ëlïxr')},
files={'file': ('test_requests.py', open(__file__, 'rb'))})
@@ -556,7 +558,7 @@ class RequestsTestCase(unittest.TestCase):
files={'file': ('test_requests.py', open(__file__, 'rb'))})
assert r.status_code == 200
def test_unicode_multipart_post_fieldnames(self):
def test_unicode_multipart_post_fieldnames(self, httpbin):
filename = os.path.splitext(__file__)[0] + '.py'
r = requests.Request(method='POST',
url=httpbin('post'),
@@ -567,13 +569,13 @@ class RequestsTestCase(unittest.TestCase):
assert b'name="stuff"' in prep.body
assert b'name="b\'stuff\'"' not in prep.body
def test_unicode_method_name(self):
def test_unicode_method_name(self, httpbin):
files = {'file': open('test_requests.py', 'rb')}
r = requests.request(
method=u('POST'), url=httpbin('post'), files=files)
assert r.status_code == 200
def test_unicode_method_name_with_request_object(self):
def test_unicode_method_name_with_request_object(self, httpbin):
files = {'file': open('test_requests.py', 'rb')}
s = requests.Session()
req = requests.Request(u("POST"), httpbin('post'), files=files)
@@ -584,7 +586,7 @@ class RequestsTestCase(unittest.TestCase):
resp = s.send(prep)
assert resp.status_code == 200
def test_custom_content_type(self):
def test_custom_content_type(self, httpbin):
r = requests.post(
httpbin('post'),
data={'stuff': json.dumps({'a': 123})},
@@ -594,38 +596,38 @@ class RequestsTestCase(unittest.TestCase):
assert r.status_code == 200
assert b"text/py-content-type" in r.request.body
def test_hook_receives_request_arguments(self):
def test_hook_receives_request_arguments(self, httpbin):
def hook(resp, **kwargs):
assert resp is not None
assert kwargs != {}
requests.Request('GET', HTTPBIN, hooks={'response': hook})
requests.Request('GET', httpbin(), hooks={'response': hook})
def test_session_hooks_are_used_with_no_request_hooks(self):
def test_session_hooks_are_used_with_no_request_hooks(self, httpbin):
hook = lambda x, *args, **kwargs: x
s = requests.Session()
s.hooks['response'].append(hook)
r = requests.Request('GET', HTTPBIN)
r = requests.Request('GET', httpbin())
prep = s.prepare_request(r)
assert prep.hooks['response'] != []
assert prep.hooks['response'] == [hook]
def test_session_hooks_are_overriden_by_request_hooks(self):
def test_session_hooks_are_overriden_by_request_hooks(self, httpbin):
hook1 = lambda x, *args, **kwargs: x
hook2 = lambda x, *args, **kwargs: x
assert hook1 is not hook2
s = requests.Session()
s.hooks['response'].append(hook2)
r = requests.Request('GET', HTTPBIN, hooks={'response': [hook1]})
r = requests.Request('GET', httpbin(), hooks={'response': [hook1]})
prep = s.prepare_request(r)
assert prep.hooks['response'] == [hook1]
def test_prepared_request_hook(self):
def test_prepared_request_hook(self, httpbin):
def hook(resp, **kwargs):
resp.hook_working = True
return resp
req = requests.Request('GET', HTTPBIN, hooks={'response': hook})
req = requests.Request('GET', httpbin(), hooks={'response': hook})
prep = req.prepare()
s = requests.Session()
@@ -634,7 +636,7 @@ class RequestsTestCase(unittest.TestCase):
assert hasattr(resp, 'hook_working')
def test_prepared_from_session(self):
def test_prepared_from_session(self, httpbin):
class DummyAuth(requests.auth.AuthBase):
def __call__(self, r):
r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
@@ -787,7 +789,7 @@ class RequestsTestCase(unittest.TestCase):
# make sure one can use items multiple times
assert list(items) == list(items)
def test_time_elapsed_blank(self):
def test_time_elapsed_blank(self, httpbin):
r = requests.get(httpbin('get'))
td = r.elapsed
total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
@@ -826,7 +828,7 @@ class RequestsTestCase(unittest.TestCase):
chunks = r.iter_content(decode_unicode=True)
assert all(isinstance(chunk, str) for chunk in chunks)
def test_request_and_response_are_pickleable(self):
def test_request_and_response_are_pickleable(self, httpbin):
r = requests.get(httpbin('get'))
# verify we can pickle the original request
@@ -858,8 +860,8 @@ class RequestsTestCase(unittest.TestCase):
url = 'http://user:pass%23pass@complex.url.com/path?query=yes'
assert ('user', 'pass#pass') == requests.utils.get_auth_from_url(url)
def test_cannot_send_unprepared_requests(self):
r = requests.Request(url=HTTPBIN)
def test_cannot_send_unprepared_requests(self, httpbin):
r = requests.Request(url=httpbin())
with pytest.raises(ValueError):
requests.Session().send(r)
@@ -873,7 +875,7 @@ class RequestsTestCase(unittest.TestCase):
assert str(error) == 'message'
assert error.response == response
def test_session_pickling(self):
def test_session_pickling(self, httpbin):
r = requests.Request('GET', httpbin('get'))
s = requests.Session()
@@ -883,7 +885,7 @@ class RequestsTestCase(unittest.TestCase):
r = s.send(r.prepare())
assert r.status_code == 200
def test_fixes_1329(self):
def test_fixes_1329(self, httpbin):
"""
Ensure that header updates are done case-insensitively.
"""
@@ -896,7 +898,7 @@ class RequestsTestCase(unittest.TestCase):
assert headers['Accept'] == 'application/json'
assert headers['ACCEPT'] == 'application/json'
def test_uppercase_scheme_redirect(self):
def test_uppercase_scheme_redirect(self, httpbin):
parts = urlparse(httpbin('html'))
url = "HTTP://" + parts.netloc + parts.path
r = requests.get(httpbin('redirect-to'), params={'url': url})
@@ -941,14 +943,14 @@ class RequestsTestCase(unittest.TestCase):
assert 'http://' in s2.adapters
assert 'https://' in s2.adapters
def test_header_remove_is_case_insensitive(self):
def test_header_remove_is_case_insensitive(self, httpbin):
# From issue #1321
s = requests.Session()
s.headers['foo'] = 'bar'
r = s.get(httpbin('get'), headers={'FOO': None})
assert 'foo' not in r.request.headers
def test_params_are_merged_case_sensitive(self):
def test_params_are_merged_case_sensitive(self, httpbin):
s = requests.Session()
s.params['foo'] = 'bar'
r = s.get(httpbin('get'), params={'FOO': 'bar'})
@@ -963,7 +965,7 @@ class RequestsTestCase(unittest.TestCase):
r = requests.Request('GET', url).prepare()
assert r.url == url
def test_header_keys_are_native(self):
def test_header_keys_are_native(self, httpbin):
headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'}
r = requests.Request('GET', httpbin('get'), headers=headers)
p = r.prepare()
@@ -973,7 +975,7 @@ class RequestsTestCase(unittest.TestCase):
assert 'unicode' in p.headers.keys()
assert 'byte' in p.headers.keys()
def test_can_send_nonstring_objects_with_files(self):
def test_can_send_nonstring_objects_with_files(self, httpbin):
data = {'a': 0.0}
files = {'b': 'foo'}
r = requests.Request('POST', httpbin('post'), data=data, files=files)
@@ -981,7 +983,7 @@ class RequestsTestCase(unittest.TestCase):
assert 'multipart/form-data' in p.headers['Content-Type']
def test_can_send_bytes_bytearray_objects_with_files(self):
def test_can_send_bytes_bytearray_objects_with_files(self, httpbin):
# Test bytes:
data = {'a': 'this is a string'}
files = {'b': b'foo'}
@@ -994,7 +996,7 @@ class RequestsTestCase(unittest.TestCase):
p = r.prepare()
assert 'multipart/form-data' in p.headers['Content-Type']
def test_can_send_file_object_with_non_string_filename(self):
def test_can_send_file_object_with_non_string_filename(self, httpbin):
f = io.BytesIO()
f.name = 2
r = requests.Request('POST', httpbin('post'), files={'f': f})
@@ -1002,7 +1004,7 @@ class RequestsTestCase(unittest.TestCase):
assert 'multipart/form-data' in p.headers['Content-Type']
def test_autoset_header_values_are_native(self):
def test_autoset_header_values_are_native(self, httpbin):
data = 'this is a string'
length = '16'
req = requests.Request('POST', httpbin('post'), data=data)
@@ -1021,7 +1023,7 @@ class RequestsTestCase(unittest.TestCase):
preq = req.prepare()
assert test_url == preq.url
def test_auth_is_stripped_on_redirect_off_host(self):
def test_auth_is_stripped_on_redirect_off_host(self, httpbin):
r = requests.get(
httpbin('redirect-to'),
params={'url': 'http://www.google.co.uk'},
@@ -1030,14 +1032,14 @@ class RequestsTestCase(unittest.TestCase):
assert r.history[0].request.headers['Authorization']
assert not r.request.headers.get('Authorization', '')
def test_auth_is_retained_for_redirect_on_host(self):
def test_auth_is_retained_for_redirect_on_host(self, httpbin):
r = requests.get(httpbin('redirect/1'), auth=('user', 'pass'))
h1 = r.history[0].request.headers['Authorization']
h2 = r.request.headers['Authorization']
assert h1 == h2
def test_manual_redirect_with_partial_body_read(self):
def test_manual_redirect_with_partial_body_read(self, httpbin):
s = requests.Session()
r1 = s.get(httpbin('redirect/2'), allow_redirects=False, stream=True)
assert r1.is_redirect
@@ -1070,7 +1072,7 @@ class RequestsTestCase(unittest.TestCase):
adapter.build_response = build_response
def test_redirect_with_wrong_gzipped_header(self):
def test_redirect_with_wrong_gzipped_header(self, httpbin):
s = requests.Session()
url = httpbin('redirect/1')
self._patch_adapter_gzipped_redirect(s, url)
@@ -1081,7 +1083,7 @@ class RequestsTestCase(unittest.TestCase):
assert isinstance(s, builtin_str)
assert s == "Basic dGVzdDp0ZXN0"
def test_requests_history_is_saved(self):
def test_requests_history_is_saved(self, httpbin):
r = requests.get(httpbin('redirect/5'))
total = r.history[-1].history
i = 0
@@ -1089,7 +1091,7 @@ class RequestsTestCase(unittest.TestCase):
assert item.history == total[0:i]
i = i + 1
def test_json_param_post_content_type_works(self):
def test_json_param_post_content_type_works(self, httpbin):
r = requests.post(
httpbin('post'),
json={'life': 42}
@@ -1098,14 +1100,14 @@ class RequestsTestCase(unittest.TestCase):
assert 'application/json' in r.request.headers['Content-Type']
assert {'life': 42} == r.json()['json']
def test_json_param_post_should_not_override_data_param(self):
r = requests.Request(method='POST', url='http://httpbin.org/post',
def test_json_param_post_should_not_override_data_param(self, httpbin):
r = requests.Request(method='POST', url=httpbin('post'),
data={'stuff': 'elixr'},
json={'music': 'flute'})
prep = r.prepare()
assert 'stuff=elixr' == prep.body
def test_response_iter_lines(self):
def test_response_iter_lines(self, httpbin):
r = requests.get(httpbin('stream/4'), stream=True)
assert r.status_code == 200
@@ -1113,17 +1115,17 @@ class RequestsTestCase(unittest.TestCase):
next(it)
assert len(list(it)) == 3
def test_unconsumed_session_response_closes_connection(self):
def test_unconsumed_session_response_closes_connection(self, httpbin):
s = requests.session()
with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as response:
pass
self.assertFalse(response._content_consumed)
self.assertTrue(response.raw.closed)
assert response._content_consumed is False
assert response.raw.closed
@pytest.mark.xfail
def test_response_iter_lines_reentrant(self):
def test_response_iter_lines_reentrant(self, httpbin):
"""Response.iter_lines() is not reentrant safe"""
r = requests.get(httpbin('stream/4'), stream=True)
assert r.status_code == 200
@@ -1517,13 +1519,13 @@ class TestMorselToCookieMaxAge(unittest.TestCase):
class TestTimeout:
def test_stream_timeout(self):
def test_stream_timeout(self, httpbin):
try:
requests.get(httpbin('delay/10'), timeout=2.0)
except requests.exceptions.Timeout as e:
assert 'Read timed out' in e.args[0].args[0]
def test_invalid_timeout(self):
def test_invalid_timeout(self, httpbin):
with pytest.raises(ValueError) as e:
requests.get(httpbin('get'), timeout=(3, 4, 5))
assert '(connect, read)' in str(e)
@@ -1532,7 +1534,7 @@ class TestTimeout:
requests.get(httpbin('get'), timeout="foo")
assert 'must be an int or float' in str(e)
def test_none_timeout(self):
def test_none_timeout(self, httpbin):
""" Check that you can set None as a valid timeout value.
To actually test this behavior, we'd want to check that setting the
@@ -1544,7 +1546,7 @@ class TestTimeout:
r = requests.get(httpbin('get'), timeout=None)
assert r.status_code == 200
def test_read_timeout(self):
def test_read_timeout(self, httpbin):
try:
requests.get(httpbin('delay/10'), timeout=(None, 0.1))
assert False, "The recv() request should time out."
@@ -1566,7 +1568,7 @@ class TestTimeout:
except ConnectTimeout:
pass
def test_encoded_methods(self):
def test_encoded_methods(self, httpbin):
"""See: https://github.com/kennethreitz/requests/issues/2316"""
r = requests.request(b'GET', httpbin('get'))
assert r.ok
@@ -1617,7 +1619,7 @@ class TestRedirects:
'proxies': {},
}
def test_requests_are_updated_each_time(self):
def test_requests_are_updated_each_time(self, httpbin):
session = RedirectSession([303, 307])
prep = requests.Request('POST', httpbin('post')).prepare()
r0 = session.send(prep)
@@ -1699,7 +1701,7 @@ def test_prepare_unicode_url():
assert_copy(p, p.copy())
def test_urllib3_retries():
def test_urllib3_retries(httpbin):
from requests.packages.urllib3.util import Retry
s = requests.Session()
s.mount('http://', HTTPAdapter(max_retries=Retry(
@@ -1710,14 +1712,14 @@ def test_urllib3_retries():
s.get(httpbin('status/500'))
def test_urllib3_pool_connection_closed():
def test_urllib3_pool_connection_closed(httpbin):
s = requests.Session()
s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0))
try:
s.get(httpbin('status/200'))
except ConnectionError as e:
assert u"HTTPConnectionPool(host='httpbin.org', port=80): Pool is closed." in str(e)
assert u"Pool is closed." in str(e)
def test_vendor_aliases():
from requests.packages import urllib3