mirror of
https://github.com/kennethreitz/requests.git
synced 2026-06-05 22:50:18 +00:00
Merge remote-tracking branch 'origin/master'
This commit is contained in:
@@ -113,6 +113,36 @@ prepare it immediately and modified the ``PreparedRequest`` object. You then
|
||||
send that with the other parameters you would have sent to ``requests.*`` or
|
||||
``Sesssion.*``.
|
||||
|
||||
However, the above code will lose some of the advantages of having a Requests
|
||||
:class:`Session <requests.Session>` object. In particular,
|
||||
:class:`Session <requests.Session>`-level state such as cookies will
|
||||
not get applied to your request. To get a
|
||||
:class:`PreparedRequest <requests.models.PreparedRequest>` with that state
|
||||
applied, replace the call to ``Request.prepare()`` with a call to
|
||||
``Session.prepare_request()``, like this::
|
||||
|
||||
from requests import Request, Session
|
||||
|
||||
s = Session()
|
||||
req = Request('GET', # or any other method, 'POST', 'PUT', etc.
|
||||
url,
|
||||
data=data
|
||||
headers=headers
|
||||
# ...
|
||||
)
|
||||
prepped = s.prepare_request(req)
|
||||
# do something with prepped.body
|
||||
# do something with prepped.headers
|
||||
resp = s.send(prepped,
|
||||
stream=stream,
|
||||
verify=verify,
|
||||
proxies=proxies,
|
||||
cert=cert,
|
||||
timeout=timeout,
|
||||
# etc.
|
||||
)
|
||||
print(resp.status_code)
|
||||
|
||||
SSL Cert Verification
|
||||
---------------------
|
||||
|
||||
|
||||
+22
-17
@@ -77,7 +77,7 @@ class HTTPDigestAuth(AuthBase):
|
||||
else:
|
||||
_algorithm = algorithm.upper()
|
||||
# lambdas assume digest modules are imported at the top level
|
||||
if _algorithm == 'MD5':
|
||||
if _algorithm == 'MD5' or _algorithm == 'MD5-SESS':
|
||||
def md5_utf8(x):
|
||||
if isinstance(x, str):
|
||||
x = x.encode('utf-8')
|
||||
@@ -89,7 +89,7 @@ class HTTPDigestAuth(AuthBase):
|
||||
x = x.encode('utf-8')
|
||||
return hashlib.sha1(x).hexdigest()
|
||||
hash_utf8 = sha_utf8
|
||||
# XXX MD5-sess
|
||||
|
||||
KD = lambda s, d: hash_utf8("%s:%s" % (s, d))
|
||||
|
||||
if hash_utf8 is None:
|
||||
@@ -104,24 +104,29 @@ class HTTPDigestAuth(AuthBase):
|
||||
|
||||
A1 = '%s:%s:%s' % (self.username, realm, self.password)
|
||||
A2 = '%s:%s' % (method, path)
|
||||
|
||||
HA1 = hash_utf8(A1)
|
||||
HA2 = hash_utf8(A2)
|
||||
|
||||
if nonce == self.last_nonce:
|
||||
self.nonce_count += 1
|
||||
else:
|
||||
self.nonce_count = 1
|
||||
ncvalue = '%08x' % self.nonce_count
|
||||
s = str(self.nonce_count).encode('utf-8')
|
||||
s += nonce.encode('utf-8')
|
||||
s += time.ctime().encode('utf-8')
|
||||
s += os.urandom(8)
|
||||
|
||||
cnonce = (hashlib.sha1(s).hexdigest()[:16])
|
||||
noncebit = "%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, HA2)
|
||||
if _algorithm == 'MD5-SESS':
|
||||
HA1 = hash_utf8('%s:%s:%s' % (HA1, nonce, cnonce))
|
||||
|
||||
if qop is None:
|
||||
respdig = KD(hash_utf8(A1), "%s:%s" % (nonce, hash_utf8(A2)))
|
||||
respdig = KD(HA1, "%s:%s" % (nonce, HA2))
|
||||
elif qop == 'auth' or 'auth' in qop.split(','):
|
||||
if nonce == self.last_nonce:
|
||||
self.nonce_count += 1
|
||||
else:
|
||||
self.nonce_count = 1
|
||||
|
||||
ncvalue = '%08x' % self.nonce_count
|
||||
s = str(self.nonce_count).encode('utf-8')
|
||||
s += nonce.encode('utf-8')
|
||||
s += time.ctime().encode('utf-8')
|
||||
s += os.urandom(8)
|
||||
|
||||
cnonce = (hashlib.sha1(s).hexdigest()[:16])
|
||||
noncebit = "%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, hash_utf8(A2))
|
||||
respdig = KD(hash_utf8(A1), noncebit)
|
||||
respdig = KD(HA1, noncebit)
|
||||
else:
|
||||
# XXX handle auth-int.
|
||||
return None
|
||||
|
||||
@@ -26,12 +26,18 @@ packages = [
|
||||
|
||||
requires = []
|
||||
|
||||
with open('README.rst') as f:
|
||||
readme = f.read()
|
||||
with open('HISTORY.rst') as f:
|
||||
history = f.read()
|
||||
with open('LICENSE') as f:
|
||||
license = f.read()
|
||||
|
||||
setup(
|
||||
name='requests',
|
||||
version=requests.__version__,
|
||||
description='Python HTTP for Humans.',
|
||||
long_description=open('README.rst').read() + '\n\n' +
|
||||
open('HISTORY.rst').read(),
|
||||
long_description=readme + '\n\n' + history,
|
||||
author='Kenneth Reitz',
|
||||
author_email='me@kennethreitz.com',
|
||||
url='http://python-requests.org',
|
||||
@@ -40,7 +46,7 @@ setup(
|
||||
package_dir={'requests': 'requests'},
|
||||
include_package_data=True,
|
||||
install_requires=requires,
|
||||
license=open('LICENSE').read(),
|
||||
license=license,
|
||||
zip_safe=False,
|
||||
classifiers=(
|
||||
'Development Status :: 5 - Production/Stable',
|
||||
|
||||
+161
-182
@@ -57,8 +57,10 @@ class RequestsTestCase(unittest.TestCase):
|
||||
requests.post
|
||||
|
||||
def test_invalid_url(self):
|
||||
self.assertRaises(MissingSchema, requests.get, 'hiwpefhipowhefopw')
|
||||
self.assertRaises(InvalidURL, requests.get, 'http://')
|
||||
with pytest.raises(MissingSchema):
|
||||
requests.get('hiwpefhipowhefopw')
|
||||
with pytest.raises(InvalidURL):
|
||||
requests.get('http://')
|
||||
|
||||
def test_basic_building(self):
|
||||
req = requests.Request()
|
||||
@@ -71,24 +73,22 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
def test_no_content_length(self):
|
||||
get_req = requests.Request('GET', httpbin('get')).prepare()
|
||||
self.assertTrue('Content-Length' not in get_req.headers)
|
||||
assert 'Content-Length' not in get_req.headers
|
||||
head_req = requests.Request('HEAD', httpbin('head')).prepare()
|
||||
self.assertTrue('Content-Length' not in head_req.headers)
|
||||
assert 'Content-Length' not in head_req.headers
|
||||
|
||||
def test_path_is_not_double_encoded(self):
|
||||
request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare()
|
||||
|
||||
self.assertEqual(request.path_url, "/get/test%20case")
|
||||
assert request.path_url == '/get/test%20case'
|
||||
|
||||
def test_params_are_added_before_fragment(self):
|
||||
request = requests.Request('GET',
|
||||
"http://example.com/path#fragment", params={"a": "b"}).prepare()
|
||||
self.assertEqual(request.url,
|
||||
"http://example.com/path?a=b#fragment")
|
||||
assert request.url == "http://example.com/path?a=b#fragment"
|
||||
request = requests.Request('GET',
|
||||
"http://example.com/path?key=value#fragment", params={"a": "b"}).prepare()
|
||||
self.assertEqual(request.url,
|
||||
"http://example.com/path?key=value&a=b#fragment")
|
||||
assert request.url == "http://example.com/path?key=value&a=b#fragment"
|
||||
|
||||
def test_mixed_case_scheme_acceptable(self):
|
||||
s = requests.Session()
|
||||
@@ -100,8 +100,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
url = scheme + parts.netloc + parts.path
|
||||
r = requests.Request('GET', url)
|
||||
r = s.send(r.prepare())
|
||||
self.assertEqual(r.status_code, 200,
|
||||
"failed for scheme %s" % scheme)
|
||||
assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
|
||||
|
||||
def test_HTTP_200_OK_GET_ALTERNATIVE(self):
|
||||
r = requests.Request('GET', httpbin('get'))
|
||||
@@ -110,11 +109,11 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
r = s.send(r.prepare())
|
||||
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_HTTP_302_ALLOW_REDIRECT_GET(self):
|
||||
r = requests.get(httpbin('redirect', '1'))
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
# def test_HTTP_302_ALLOW_REDIRECT_POST(self):
|
||||
# r = requests.post(httpbin('status', '302'), data={'some': 'data'})
|
||||
@@ -125,31 +124,31 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
r = requests.get(httpbin('user-agent'), headers=heads)
|
||||
|
||||
self.assertTrue(heads['User-agent'] in r.text)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert heads['User-agent'] in r.text
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self):
|
||||
heads = {'User-agent': 'Mozilla/5.0'}
|
||||
|
||||
r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_set_cookie_on_301(self):
|
||||
s = requests.session()
|
||||
url = httpbin('cookies/set?foo=bar')
|
||||
r = s.get(url)
|
||||
self.assertTrue(s.cookies['foo'] == 'bar')
|
||||
assert s.cookies['foo'] == 'bar'
|
||||
|
||||
def test_cookie_sent_on_redirect(self):
|
||||
s = requests.session()
|
||||
s.get(httpbin('cookies/set?foo=bar'))
|
||||
r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
|
||||
self.assertTrue("Cookie" in r.json()["headers"])
|
||||
assert 'Cookie' in r.json()['headers']
|
||||
|
||||
def test_cookie_removed_on_expire(self):
|
||||
s = requests.session()
|
||||
s.get(httpbin('cookies/set?foo=bar'))
|
||||
self.assertTrue(s.cookies['foo'] == 'bar')
|
||||
assert s.cookies['foo'] == 'bar'
|
||||
s.get(
|
||||
httpbin('response-headers'),
|
||||
params={
|
||||
@@ -162,13 +161,13 @@ class RequestsTestCase(unittest.TestCase):
|
||||
def test_cookie_quote_wrapped(self):
|
||||
s = requests.session()
|
||||
s.get(httpbin('cookies/set?foo="bar:baz"'))
|
||||
self.assertTrue(s.cookies['foo'] == '"bar:baz"')
|
||||
assert s.cookies['foo'] == '"bar:baz"'
|
||||
|
||||
def test_cookie_persists_via_api(self):
|
||||
s = requests.session()
|
||||
r = s.get(httpbin('redirect/1'), cookies={'foo':'bar'})
|
||||
self.assertTrue('foo' in r.request.headers['Cookie'])
|
||||
self.assertTrue('foo' in r.history[0].request.headers['Cookie'])
|
||||
assert 'foo' in r.request.headers['Cookie']
|
||||
assert 'foo' in r.history[0].request.headers['Cookie']
|
||||
|
||||
def test_request_cookie_overrides_session_cookie(self):
|
||||
s = requests.session()
|
||||
@@ -193,7 +192,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
resp = requests.get(httpbin('redirect/3'))
|
||||
urls = [r.url for r in resp.history]
|
||||
req_urls = [r.request.url for r in resp.history]
|
||||
self.assertEquals(urls, req_urls)
|
||||
assert urls == req_urls
|
||||
|
||||
def test_user_agent_transfers(self):
|
||||
|
||||
@@ -202,37 +201,37 @@ class RequestsTestCase(unittest.TestCase):
|
||||
}
|
||||
|
||||
r = requests.get(httpbin('user-agent'), headers=heads)
|
||||
self.assertTrue(heads['User-agent'] in r.text)
|
||||
assert heads['User-agent'] in r.text
|
||||
|
||||
heads = {
|
||||
'user-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
|
||||
}
|
||||
|
||||
r = requests.get(httpbin('user-agent'), headers=heads)
|
||||
self.assertTrue(heads['user-agent'] in r.text)
|
||||
assert heads['user-agent'] in r.text
|
||||
|
||||
def test_HTTP_200_OK_HEAD(self):
|
||||
r = requests.head(httpbin('get'))
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_HTTP_200_OK_PUT(self):
|
||||
r = requests.put(httpbin('put'))
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self):
|
||||
auth = ('user', 'pass')
|
||||
url = httpbin('basic-auth', 'user', 'pass')
|
||||
|
||||
r = requests.get(url, auth=auth)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
r = requests.get(url)
|
||||
self.assertEqual(r.status_code, 401)
|
||||
assert r.status_code == 401
|
||||
|
||||
s = requests.session()
|
||||
s.auth = auth
|
||||
r = s.get(url)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_basicauth_with_netrc(self):
|
||||
auth = ('user', 'pass')
|
||||
@@ -245,22 +244,22 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
# Should use netrc and work.
|
||||
r = requests.get(url)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
# Given auth should override and fail.
|
||||
r = requests.get(url, auth=wrong_auth)
|
||||
self.assertEqual(r.status_code, 401)
|
||||
assert r.status_code == 401
|
||||
|
||||
s = requests.session()
|
||||
|
||||
# Should use netrc and work.
|
||||
r = s.get(url)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
# Given auth should override and fail.
|
||||
s.auth = wrong_auth
|
||||
r = s.get(url)
|
||||
self.assertEqual(r.status_code, 401)
|
||||
assert r.status_code == 401
|
||||
|
||||
def test_DIGEST_HTTP_200_OK_GET(self):
|
||||
|
||||
@@ -268,15 +267,15 @@ class RequestsTestCase(unittest.TestCase):
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
|
||||
r = requests.get(url, auth=auth)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
r = requests.get(url)
|
||||
self.assertEqual(r.status_code, 401)
|
||||
assert r.status_code == 401
|
||||
|
||||
s = requests.session()
|
||||
s.auth = HTTPDigestAuth('user', 'pass')
|
||||
r = s.get(url)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_DIGEST_AUTH_RETURNS_COOKIE(self):
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
@@ -300,11 +299,10 @@ class RequestsTestCase(unittest.TestCase):
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
|
||||
r = requests.get(url, auth=auth, stream=True)
|
||||
self.assertNotEqual(r.raw.read(), b'')
|
||||
assert r.raw.read() != b''
|
||||
|
||||
r = requests.get(url, auth=auth, stream=False)
|
||||
self.assertEqual(r.raw.read(), b'')
|
||||
|
||||
assert r.raw.read() == b''
|
||||
|
||||
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self):
|
||||
|
||||
@@ -312,15 +310,15 @@ class RequestsTestCase(unittest.TestCase):
|
||||
url = httpbin('digest-auth', 'auth', 'user', 'pass')
|
||||
|
||||
r = requests.get(url, auth=auth)
|
||||
self.assertEqual(r.status_code, 401)
|
||||
assert r.status_code == 401
|
||||
|
||||
r = requests.get(url)
|
||||
self.assertEqual(r.status_code, 401)
|
||||
assert r.status_code == 401
|
||||
|
||||
s = requests.session()
|
||||
s.auth = auth
|
||||
r = s.get(url)
|
||||
self.assertEqual(r.status_code, 401)
|
||||
assert r.status_code == 401
|
||||
|
||||
def test_POSTBIN_GET_POST_FILES(self):
|
||||
|
||||
@@ -328,19 +326,17 @@ class RequestsTestCase(unittest.TestCase):
|
||||
post1 = requests.post(url).raise_for_status()
|
||||
|
||||
post1 = requests.post(url, data={'some': 'data'})
|
||||
self.assertEqual(post1.status_code, 200)
|
||||
assert post1.status_code == 200
|
||||
|
||||
with open('requirements.txt') as f:
|
||||
post2 = requests.post(url, files={'some': f})
|
||||
self.assertEqual(post2.status_code, 200)
|
||||
assert post2.status_code == 200
|
||||
|
||||
post4 = requests.post(url, data='[{"some": "json"}]')
|
||||
self.assertEqual(post4.status_code, 200)
|
||||
assert post4.status_code == 200
|
||||
|
||||
try:
|
||||
requests.post(url, files=['bad file data'])
|
||||
except ValueError:
|
||||
pass
|
||||
with pytest.raises(ValueError):
|
||||
requests.post(url, files = ['bad file data'])
|
||||
|
||||
def test_POSTBIN_GET_POST_FILES_WITH_DATA(self):
|
||||
|
||||
@@ -348,19 +344,17 @@ class RequestsTestCase(unittest.TestCase):
|
||||
post1 = requests.post(url).raise_for_status()
|
||||
|
||||
post1 = requests.post(url, data={'some': 'data'})
|
||||
self.assertEqual(post1.status_code, 200)
|
||||
assert post1.status_code == 200
|
||||
|
||||
with open('requirements.txt') as f:
|
||||
post2 = requests.post(url, data={'some': 'data'}, files={'some': f})
|
||||
self.assertEqual(post2.status_code, 200)
|
||||
assert post2.status_code == 200
|
||||
|
||||
post4 = requests.post(url, data='[{"some": "json"}]')
|
||||
self.assertEqual(post4.status_code, 200)
|
||||
assert post4.status_code == 200
|
||||
|
||||
try:
|
||||
requests.post(url, files=['bad file data'])
|
||||
except ValueError:
|
||||
pass
|
||||
with pytest.raises(ValueError):
|
||||
requests.post(url, files = ['bad file data'])
|
||||
|
||||
def test_conflicting_post_params(self):
|
||||
url = httpbin('post')
|
||||
@@ -370,14 +364,15 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
def test_request_ok_set(self):
|
||||
r = requests.get(httpbin('status', '404'))
|
||||
self.assertEqual(r.ok, False)
|
||||
assert not r.ok
|
||||
|
||||
def test_status_raising(self):
|
||||
r = requests.get(httpbin('status', '404'))
|
||||
self.assertRaises(requests.exceptions.HTTPError, r.raise_for_status)
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
r.raise_for_status()
|
||||
|
||||
r = requests.get(httpbin('status', '500'))
|
||||
self.assertFalse(r.ok)
|
||||
assert not r.ok
|
||||
|
||||
def test_decompress_gzip(self):
|
||||
r = requests.get(httpbin('gzip'))
|
||||
@@ -397,36 +392,36 @@ class RequestsTestCase(unittest.TestCase):
|
||||
def test_urlencoded_get_query_multivalued_param(self):
|
||||
|
||||
r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
|
||||
self.assertEqual(r.status_code, 200)
|
||||
self.assertEqual(r.url, httpbin('get?test=foo&test=baz'))
|
||||
assert r.status_code == 200
|
||||
assert r.url == httpbin('get?test=foo&test=baz')
|
||||
|
||||
def test_different_encodings_dont_break_post(self):
|
||||
r = requests.post(httpbin('post'),
|
||||
data={'stuff': json.dumps({'a': 123})},
|
||||
params={'blah': 'asdf1234'},
|
||||
files={'file': ('test_requests.py', open(__file__, 'rb'))})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_unicode_multipart_post(self):
|
||||
r = requests.post(httpbin('post'),
|
||||
data={'stuff': u'ëlïxr'},
|
||||
files={'file': ('test_requests.py', open(__file__, 'rb'))})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
r = requests.post(httpbin('post'),
|
||||
data={'stuff': u'ëlïxr'.encode('utf-8')},
|
||||
files={'file': ('test_requests.py', open(__file__, 'rb'))})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
r = requests.post(httpbin('post'),
|
||||
data={'stuff': 'elixr'},
|
||||
files={'file': ('test_requests.py', open(__file__, 'rb'))})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
r = requests.post(httpbin('post'),
|
||||
data={'stuff': 'elixr'.encode('utf-8')},
|
||||
files={'file': ('test_requests.py', open(__file__, 'rb'))})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_unicode_multipart_post_fieldnames(self):
|
||||
filename = os.path.splitext(__file__)[0] + '.py'
|
||||
@@ -436,8 +431,8 @@ class RequestsTestCase(unittest.TestCase):
|
||||
files={'file': ('test_requests.py',
|
||||
open(filename, 'rb'))})
|
||||
prep = r.prepare()
|
||||
self.assertTrue(b'name="stuff"' in prep.body)
|
||||
self.assertFalse(b'name="b\'stuff\'"' in prep.body)
|
||||
assert b'name="stuff"' in prep.body
|
||||
assert b'name="b\'stuff\'"' not in prep.body
|
||||
|
||||
def test_custom_content_type(self):
|
||||
r = requests.post(httpbin('post'),
|
||||
@@ -445,8 +440,8 @@ class RequestsTestCase(unittest.TestCase):
|
||||
files={'file1': ('test_requests.py', open(__file__, 'rb')),
|
||||
'file2': ('test_requests', open(__file__, 'rb'),
|
||||
'text/py-content-type')})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
self.assertTrue(b"text/py-content-type" in r.request.body)
|
||||
assert r.status_code == 200
|
||||
assert b"text/py-content-type" in r.request.body
|
||||
|
||||
def test_hook_receives_request_arguments(self):
|
||||
def hook(resp, **kwargs):
|
||||
@@ -467,7 +462,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
s.proxies = getproxies()
|
||||
resp = s.send(prep)
|
||||
|
||||
self.assertTrue(hasattr(resp, 'hook_working'))
|
||||
assert hasattr(resp, 'hook_working')
|
||||
|
||||
def test_prepared_from_session(self):
|
||||
class DummyAuth(requests.auth.AuthBase):
|
||||
@@ -476,7 +471,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
return r
|
||||
|
||||
req = requests.Request('GET', httpbin('headers'))
|
||||
self.assertEqual(req.auth, None)
|
||||
assert not req.auth
|
||||
|
||||
s = requests.Session()
|
||||
s.auth = DummyAuth()
|
||||
@@ -484,7 +479,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
prep = s.prepare_request(req)
|
||||
resp = s.send(prep)
|
||||
|
||||
self.assertTrue(resp.json()['headers']['Dummy-Auth-Test'], 'dummy-auth-test-ok')
|
||||
assert resp.json()['headers']['Dummy-Auth-Test'] == 'dummy-auth-test-ok'
|
||||
|
||||
def test_links(self):
|
||||
r = requests.Response()
|
||||
@@ -508,7 +503,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
'x-ratelimit-limit': '60',
|
||||
'x-ratelimit-remaining': '57'
|
||||
}
|
||||
self.assertEqual(r.links['next']['rel'], 'next')
|
||||
assert r.links['next']['rel'] == 'next'
|
||||
|
||||
def test_cookie_parameters(self):
|
||||
key = 'some_cookie'
|
||||
@@ -520,20 +515,20 @@ class RequestsTestCase(unittest.TestCase):
|
||||
jar = requests.cookies.RequestsCookieJar()
|
||||
jar.set(key, value, secure=secure, domain=domain, rest=rest)
|
||||
|
||||
self.assertEqual(len(jar), 1)
|
||||
self.assertTrue('some_cookie' in jar)
|
||||
assert len(jar) == 1
|
||||
assert 'some_cookie' in jar
|
||||
|
||||
cookie = list(jar)[0]
|
||||
self.assertEqual(cookie.secure, secure)
|
||||
self.assertEqual(cookie.domain, domain)
|
||||
self.assertEqual(cookie._rest['HttpOnly'], rest['HttpOnly'])
|
||||
assert cookie.secure == secure
|
||||
assert cookie.domain == domain
|
||||
assert cookie._rest['HttpOnly'] == rest['HttpOnly']
|
||||
|
||||
def test_time_elapsed_blank(self):
|
||||
r = requests.get(httpbin('get'))
|
||||
td = r.elapsed
|
||||
total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
|
||||
* 10**6) / 10**6)
|
||||
self.assertTrue(total_seconds > 0.0)
|
||||
assert total_seconds > 0.0
|
||||
|
||||
def test_response_is_iterable(self):
|
||||
r = requests.Response()
|
||||
@@ -544,27 +539,27 @@ class RequestsTestCase(unittest.TestCase):
|
||||
return read_(amt)
|
||||
setattr(io, 'read', read_mock)
|
||||
r.raw = io
|
||||
self.assertTrue(next(iter(r)))
|
||||
assert next(iter(r))
|
||||
io.close()
|
||||
|
||||
def test_get_auth_from_url(self):
|
||||
url = 'http://user:pass@complex.url.com/path?query=yes'
|
||||
self.assertEqual(('user', 'pass'),
|
||||
requests.utils.get_auth_from_url(url))
|
||||
assert ('user', 'pass') == requests.utils.get_auth_from_url(url)
|
||||
|
||||
def test_cannot_send_unprepared_requests(self):
|
||||
r = requests.Request(url=HTTPBIN)
|
||||
self.assertRaises(ValueError, requests.Session().send, r)
|
||||
with pytest.raises(ValueError):
|
||||
requests.Session().send(r)
|
||||
|
||||
def test_http_error(self):
|
||||
error = requests.exceptions.HTTPError()
|
||||
self.assertEqual(error.response, None)
|
||||
assert not error.response
|
||||
response = requests.Response()
|
||||
error = requests.exceptions.HTTPError(response=response)
|
||||
self.assertEqual(error.response, response)
|
||||
assert error.response == response
|
||||
error = requests.exceptions.HTTPError('message', response=response)
|
||||
self.assertEqual(str(error), 'message')
|
||||
self.assertEqual(error.response, response)
|
||||
assert str(error) == 'message'
|
||||
assert error.response == response
|
||||
|
||||
def test_session_pickling(self):
|
||||
r = requests.Request('GET', httpbin('get'))
|
||||
@@ -574,7 +569,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
s.proxies = getproxies()
|
||||
|
||||
r = s.send(r.prepare())
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert r.status_code == 200
|
||||
|
||||
def test_fixes_1329(self):
|
||||
"""
|
||||
@@ -585,30 +580,21 @@ class RequestsTestCase(unittest.TestCase):
|
||||
s.headers.update({'accept': 'application/json'})
|
||||
r = s.get(httpbin('get'))
|
||||
headers = r.request.headers
|
||||
self.assertEqual(
|
||||
headers['accept'],
|
||||
'application/json'
|
||||
)
|
||||
self.assertEqual(
|
||||
headers['Accept'],
|
||||
'application/json'
|
||||
)
|
||||
self.assertEqual(
|
||||
headers['ACCEPT'],
|
||||
'application/json'
|
||||
)
|
||||
assert headers['accept'] == 'application/json'
|
||||
assert headers['Accept'] == 'application/json'
|
||||
assert headers['ACCEPT'] == 'application/json'
|
||||
|
||||
def test_uppercase_scheme_redirect(self):
|
||||
parts = urlparse(httpbin('html'))
|
||||
url = "HTTP://" + parts.netloc + parts.path
|
||||
r = requests.get(httpbin('redirect-to'), params={'url': url})
|
||||
self.assertEqual(r.status_code, 200)
|
||||
self.assertEqual(r.url.lower(), url.lower())
|
||||
assert r.status_code == 200
|
||||
assert r.url.lower() == url.lower()
|
||||
|
||||
def test_transport_adapter_ordering(self):
|
||||
s = requests.Session()
|
||||
order = ['https://', 'http://']
|
||||
self.assertEqual(order, list(s.adapters))
|
||||
assert order == list(s.adapters)
|
||||
s.mount('http://git', HTTPAdapter())
|
||||
s.mount('http://github', HTTPAdapter())
|
||||
s.mount('http://github.com', HTTPAdapter())
|
||||
@@ -621,7 +607,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
'https://',
|
||||
'http://',
|
||||
]
|
||||
self.assertEqual(order, list(s.adapters))
|
||||
assert order == list(s.adapters)
|
||||
s.mount('http://gittip', HTTPAdapter())
|
||||
s.mount('http://gittip.com', HTTPAdapter())
|
||||
s.mount('http://gittip.com/about/', HTTPAdapter())
|
||||
@@ -636,12 +622,12 @@ class RequestsTestCase(unittest.TestCase):
|
||||
'https://',
|
||||
'http://',
|
||||
]
|
||||
self.assertEqual(order, list(s.adapters))
|
||||
assert order == list(s.adapters)
|
||||
s2 = requests.Session()
|
||||
s2.adapters = {'http://': HTTPAdapter()}
|
||||
s2.mount('https://', HTTPAdapter())
|
||||
self.assertTrue('http://' in s2.adapters)
|
||||
self.assertTrue('https://' in s2.adapters)
|
||||
assert 'http://' in s2.adapters
|
||||
assert 'https://' in s2.adapters
|
||||
|
||||
def test_header_remove_is_case_insensitive(self):
|
||||
# From issue #1321
|
||||
@@ -664,7 +650,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
'exactly-------------sixty-----------three------------characters',
|
||||
)
|
||||
r = requests.Request('GET', url).prepare()
|
||||
self.assertEqual(r.url, url)
|
||||
assert r.url == url
|
||||
|
||||
def test_header_keys_are_native(self):
|
||||
headers = {u'unicode': 'blah', 'byte'.encode('ascii'): 'blah'}
|
||||
@@ -673,8 +659,8 @@ class RequestsTestCase(unittest.TestCase):
|
||||
|
||||
# This is testing that they are builtin strings. A bit weird, but there
|
||||
# we go.
|
||||
self.assertTrue('unicode' in p.headers.keys())
|
||||
self.assertTrue('byte' in p.headers.keys())
|
||||
assert 'unicode' in p.headers.keys()
|
||||
assert 'byte' in p.headers.keys()
|
||||
|
||||
def test_can_send_nonstring_objects_with_files(self):
|
||||
data = {'a': 0.0}
|
||||
@@ -682,7 +668,7 @@ class RequestsTestCase(unittest.TestCase):
|
||||
r = requests.Request('POST', httpbin('post'), data=data, files=files)
|
||||
p = r.prepare()
|
||||
|
||||
self.assertTrue('multipart/form-data' in p.headers['Content-Type'])
|
||||
assert 'multipart/form-data' in p.headers['Content-Type']
|
||||
|
||||
def test_autoset_header_values_are_native(self):
|
||||
data = 'this is a string'
|
||||
@@ -690,42 +676,41 @@ class RequestsTestCase(unittest.TestCase):
|
||||
req = requests.Request('POST', httpbin('post'), data=data)
|
||||
p = req.prepare()
|
||||
|
||||
self.assertEqual(p.headers['Content-Length'], length)
|
||||
|
||||
assert p.headers['Content-Length'] == length
|
||||
|
||||
class TestContentEncodingDetection(unittest.TestCase):
|
||||
|
||||
def test_none(self):
|
||||
encodings = requests.utils.get_encodings_from_content('')
|
||||
self.assertEqual(len(encodings), 0)
|
||||
assert not len(encodings)
|
||||
|
||||
def test_html_charset(self):
|
||||
"""HTML5 meta charset attribute"""
|
||||
content = '<meta charset="UTF-8">'
|
||||
encodings = requests.utils.get_encodings_from_content(content)
|
||||
self.assertEqual(len(encodings), 1)
|
||||
self.assertEqual(encodings[0], 'UTF-8')
|
||||
assert len(encodings) == 1
|
||||
assert encodings[0] == 'UTF-8'
|
||||
|
||||
def test_html4_pragma(self):
|
||||
"""HTML4 pragma directive"""
|
||||
content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">'
|
||||
encodings = requests.utils.get_encodings_from_content(content)
|
||||
self.assertEqual(len(encodings), 1)
|
||||
self.assertEqual(encodings[0], 'UTF-8')
|
||||
assert len(encodings) == 1
|
||||
assert encodings[0] == 'UTF-8'
|
||||
|
||||
def test_xhtml_pragma(self):
|
||||
"""XHTML 1.x served with text/html MIME type"""
|
||||
content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />'
|
||||
encodings = requests.utils.get_encodings_from_content(content)
|
||||
self.assertEqual(len(encodings), 1)
|
||||
self.assertEqual(encodings[0], 'UTF-8')
|
||||
assert len(encodings) == 1
|
||||
assert encodings[0] == 'UTF-8'
|
||||
|
||||
def test_xml(self):
|
||||
"""XHTML 1.x served as XML"""
|
||||
content = '<?xml version="1.0" encoding="UTF-8"?>'
|
||||
encodings = requests.utils.get_encodings_from_content(content)
|
||||
self.assertEqual(len(encodings), 1)
|
||||
self.assertEqual(encodings[0], 'UTF-8')
|
||||
assert len(encodings) == 1
|
||||
assert encodings[0] == 'UTF-8'
|
||||
|
||||
def test_precedence(self):
|
||||
content = '''
|
||||
@@ -734,44 +719,44 @@ class TestContentEncodingDetection(unittest.TestCase):
|
||||
<meta http-equiv="Content-type" content="text/html;charset=HTML4" />
|
||||
'''.strip()
|
||||
encodings = requests.utils.get_encodings_from_content(content)
|
||||
self.assertEqual(encodings, ['HTML5', 'HTML4', 'XML'])
|
||||
assert encodings == ['HTML5', 'HTML4', 'XML']
|
||||
|
||||
|
||||
class TestCaseInsensitiveDict(unittest.TestCase):
|
||||
|
||||
def test_mapping_init(self):
|
||||
cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'})
|
||||
self.assertEqual(len(cid), 2)
|
||||
self.assertTrue('foo' in cid)
|
||||
self.assertTrue('bar' in cid)
|
||||
assert len(cid) == 2
|
||||
assert 'foo' in cid
|
||||
assert 'bar' in cid
|
||||
|
||||
def test_iterable_init(self):
|
||||
cid = CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')])
|
||||
self.assertEqual(len(cid), 2)
|
||||
self.assertTrue('foo' in cid)
|
||||
self.assertTrue('bar' in cid)
|
||||
assert len(cid) == 2
|
||||
assert 'foo' in cid
|
||||
assert 'bar' in cid
|
||||
|
||||
def test_kwargs_init(self):
|
||||
cid = CaseInsensitiveDict(FOO='foo', BAr='bar')
|
||||
self.assertEqual(len(cid), 2)
|
||||
self.assertTrue('foo' in cid)
|
||||
self.assertTrue('bar' in cid)
|
||||
assert len(cid) == 2
|
||||
assert 'foo' in cid
|
||||
assert 'bar' in cid
|
||||
|
||||
def test_docstring_example(self):
|
||||
cid = CaseInsensitiveDict()
|
||||
cid['Accept'] = 'application/json'
|
||||
self.assertEqual(cid['aCCEPT'], 'application/json')
|
||||
self.assertEqual(list(cid), ['Accept'])
|
||||
assert cid['aCCEPT'] == 'application/json'
|
||||
assert list(cid) == ['Accept']
|
||||
|
||||
def test_len(self):
|
||||
cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
|
||||
cid['A'] = 'a'
|
||||
self.assertEqual(len(cid), 2)
|
||||
assert len(cid) == 2
|
||||
|
||||
def test_getitem(self):
|
||||
cid = CaseInsensitiveDict({'Spam': 'blueval'})
|
||||
self.assertEqual(cid['spam'], 'blueval')
|
||||
self.assertEqual(cid['SPAM'], 'blueval')
|
||||
assert cid['spam'] == 'blueval'
|
||||
assert cid['SPAM'] == 'blueval'
|
||||
|
||||
def test_fixes_649(self):
|
||||
"""__setitem__ should behave case-insensitively."""
|
||||
@@ -780,74 +765,68 @@ class TestCaseInsensitiveDict(unittest.TestCase):
|
||||
cid['Spam'] = 'twoval'
|
||||
cid['sPAM'] = 'redval'
|
||||
cid['SPAM'] = 'blueval'
|
||||
self.assertEqual(cid['spam'], 'blueval')
|
||||
self.assertEqual(cid['SPAM'], 'blueval')
|
||||
self.assertEqual(list(cid.keys()), ['SPAM'])
|
||||
assert cid['spam'] == 'blueval'
|
||||
assert cid['SPAM'] == 'blueval'
|
||||
assert list(cid.keys()) == ['SPAM']
|
||||
|
||||
def test_delitem(self):
|
||||
cid = CaseInsensitiveDict()
|
||||
cid['Spam'] = 'someval'
|
||||
del cid['sPam']
|
||||
self.assertFalse('spam' in cid)
|
||||
self.assertEqual(len(cid), 0)
|
||||
assert 'spam' not in cid
|
||||
assert len(cid) == 0
|
||||
|
||||
def test_contains(self):
|
||||
cid = CaseInsensitiveDict()
|
||||
cid['Spam'] = 'someval'
|
||||
self.assertTrue('Spam' in cid)
|
||||
self.assertTrue('spam' in cid)
|
||||
self.assertTrue('SPAM' in cid)
|
||||
self.assertTrue('sPam' in cid)
|
||||
self.assertFalse('notspam' in cid)
|
||||
assert 'Spam' in cid
|
||||
assert 'spam' in cid
|
||||
assert 'SPAM' in cid
|
||||
assert 'sPam' in cid
|
||||
assert 'notspam' not in cid
|
||||
|
||||
def test_get(self):
|
||||
cid = CaseInsensitiveDict()
|
||||
cid['spam'] = 'oneval'
|
||||
cid['SPAM'] = 'blueval'
|
||||
self.assertEqual(cid.get('spam'), 'blueval')
|
||||
self.assertEqual(cid.get('SPAM'), 'blueval')
|
||||
self.assertEqual(cid.get('sPam'), 'blueval')
|
||||
self.assertEqual(cid.get('notspam', 'default'), 'default')
|
||||
assert cid.get('spam') == 'blueval'
|
||||
assert cid.get('SPAM') == 'blueval'
|
||||
assert cid.get('sPam') == 'blueval'
|
||||
assert cid.get('notspam', 'default') == 'default'
|
||||
|
||||
def test_update(self):
|
||||
cid = CaseInsensitiveDict()
|
||||
cid['spam'] = 'blueval'
|
||||
cid.update({'sPam': 'notblueval'})
|
||||
self.assertEqual(cid['spam'], 'notblueval')
|
||||
assert cid['spam'] == 'notblueval'
|
||||
cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'})
|
||||
cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
|
||||
self.assertEqual(len(cid), 2)
|
||||
self.assertEqual(cid['foo'], 'anotherfoo')
|
||||
self.assertEqual(cid['bar'], 'anotherbar')
|
||||
assert len(cid) == 2
|
||||
assert cid['foo'] == 'anotherfoo'
|
||||
assert cid['bar'] == 'anotherbar'
|
||||
|
||||
def test_update_retains_unchanged(self):
|
||||
cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
|
||||
cid.update({'foo': 'newfoo'})
|
||||
self.assertEquals(cid['bar'], 'bar')
|
||||
assert cid['bar'] == 'bar'
|
||||
|
||||
def test_iter(self):
|
||||
cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
|
||||
keys = frozenset(['Spam', 'Eggs'])
|
||||
self.assertEqual(frozenset(iter(cid)), keys)
|
||||
assert frozenset(iter(cid)) == keys
|
||||
|
||||
def test_equality(self):
|
||||
cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
|
||||
othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
|
||||
self.assertEqual(cid, othercid)
|
||||
assert cid == othercid
|
||||
del othercid['spam']
|
||||
self.assertNotEqual(cid, othercid)
|
||||
self.assertEqual(cid, {'spam': 'blueval', 'eggs': 'redval'})
|
||||
assert cid != othercid
|
||||
assert cid == {'spam': 'blueval', 'eggs': 'redval'}
|
||||
|
||||
def test_setdefault(self):
|
||||
cid = CaseInsensitiveDict({'Spam': 'blueval'})
|
||||
self.assertEqual(
|
||||
cid.setdefault('spam', 'notblueval'),
|
||||
'blueval'
|
||||
)
|
||||
self.assertEqual(
|
||||
cid.setdefault('notspam', 'notblueval'),
|
||||
'notblueval'
|
||||
)
|
||||
assert cid.setdefault('spam', 'notblueval') == 'blueval'
|
||||
assert cid.setdefault('notspam', 'notblueval') == 'notblueval'
|
||||
|
||||
def test_lower_items(self):
|
||||
cid = CaseInsensitiveDict({
|
||||
@@ -856,7 +835,7 @@ class TestCaseInsensitiveDict(unittest.TestCase):
|
||||
})
|
||||
keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
|
||||
lowerkeyset = frozenset(['accept', 'user-agent'])
|
||||
self.assertEqual(keyset, lowerkeyset)
|
||||
assert keyset == lowerkeyset
|
||||
|
||||
def test_preserve_key_case(self):
|
||||
cid = CaseInsensitiveDict({
|
||||
@@ -864,9 +843,9 @@ class TestCaseInsensitiveDict(unittest.TestCase):
|
||||
'user-Agent': 'requests',
|
||||
})
|
||||
keyset = frozenset(['Accept', 'user-Agent'])
|
||||
self.assertEqual(frozenset(i[0] for i in cid.items()), keyset)
|
||||
self.assertEqual(frozenset(cid.keys()), keyset)
|
||||
self.assertEqual(frozenset(cid), keyset)
|
||||
assert frozenset(i[0] for i in cid.items()) == keyset
|
||||
assert frozenset(cid.keys()) == keyset
|
||||
assert frozenset(cid) == keyset
|
||||
|
||||
def test_preserve_last_key_case(self):
|
||||
cid = CaseInsensitiveDict({
|
||||
@@ -876,9 +855,9 @@ class TestCaseInsensitiveDict(unittest.TestCase):
|
||||
cid.update({'ACCEPT': 'application/json'})
|
||||
cid['USER-AGENT'] = 'requests'
|
||||
keyset = frozenset(['ACCEPT', 'USER-AGENT'])
|
||||
self.assertEqual(frozenset(i[0] for i in cid.items()), keyset)
|
||||
self.assertEqual(frozenset(cid.keys()), keyset)
|
||||
self.assertEqual(frozenset(cid), keyset)
|
||||
assert frozenset(i[0] for i in cid.items()) == keyset
|
||||
assert frozenset(cid.keys()) == keyset
|
||||
assert frozenset(cid) == keyset
|
||||
|
||||
|
||||
class UtilsTestCase(unittest.TestCase):
|
||||
@@ -889,18 +868,18 @@ class UtilsTestCase(unittest.TestCase):
|
||||
from io import BytesIO
|
||||
from requests.utils import super_len
|
||||
|
||||
self.assertEqual(super_len(StringIO.StringIO()), 0)
|
||||
self.assertEqual(super_len(StringIO.StringIO('with so much drama in the LBC')), 29)
|
||||
assert super_len(StringIO.StringIO()) == 0
|
||||
assert super_len(StringIO.StringIO('with so much drama in the LBC')) == 29
|
||||
|
||||
self.assertEqual(super_len(BytesIO()), 0)
|
||||
self.assertEqual(super_len(BytesIO(b"it's kinda hard bein' snoop d-o-double-g")), 40)
|
||||
assert super_len(BytesIO()) == 0
|
||||
assert super_len(BytesIO(b"it's kinda hard bein' snoop d-o-double-g")) == 40
|
||||
|
||||
try:
|
||||
import cStringIO
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
self.assertEqual(super_len(cStringIO.StringIO('but some how, some way...')), 25)
|
||||
assert super_len(cStringIO.StringIO('but some how, some way...')) == 25
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user