This commit is contained in:
2018-03-15 06:56:20 -04:00
parent 4bd079f871
commit 8a1baaf47b
23 changed files with 472 additions and 142 deletions
+1 -1
View File
@@ -22,7 +22,7 @@ tox = "*"
detox = "*"
httpbin = "==0.5.0"
pytest-mypy = "*"
black = {version="*", python_version="=='3.6'"}
white = {version="*"}
"e1839a8" = {path = ".", editable = true, extras=["socks"]}
mypy = "==0.540"
+3 -1
View File
@@ -46,7 +46,9 @@ from .exceptions import RequestsDependencyWarning
def check_compatibility(urllib3_version: str, chardet_version: str) -> None:
urllib3_version = urllib3_version.split('.')
assert urllib3_version != ['dev'] # Verify urllib3 isn't installed from git.
assert urllib3_version != [
'dev'
] # Verify urllib3 isn't installed from git.
# Sometimes, urllib3 only reports its version as 16.1.
if len(urllib3_version) == 2:
urllib3_version.append('0')
+36 -9
View File
@@ -117,7 +117,8 @@ def _pool_kwargs(verify, cert):
if key_file and not os.path.exists(key_file):
raise IOError(
"Could not find the TLS key file, " "invalid path: {0}".format(key_file)
"Could not find the TLS key file, "
"invalid path: {0}".format(key_file)
)
return pool_kwargs
@@ -130,7 +131,13 @@ class BaseAdapter(object):
super(BaseAdapter, self).__init__()
def send(
self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None
self,
request,
stream=False,
timeout=None,
verify=True,
cert=None,
proxies=None,
):
"""Sends PreparedRequest object. Returns Response object.
@@ -180,7 +187,11 @@ class HTTPAdapter(BaseAdapter):
>>> s.mount('http://', a)
"""
__attrs__ = [
'max_retries', 'config', '_pool_connections', '_pool_maxsize', '_pool_block'
'max_retries',
'config',
'_pool_connections',
'_pool_maxsize',
'_pool_block',
]
def __init__(
@@ -323,12 +334,16 @@ class HTTPAdapter(BaseAdapter):
if proxy:
proxy = prepend_scheme_if_needed(proxy, 'http')
proxy_manager = self.proxy_manager_for(proxy)
conn = proxy_manager.connection_from_url(url, pool_kwargs=pool_kwargs)
conn = proxy_manager.connection_from_url(
url, pool_kwargs=pool_kwargs
)
else:
# Only scheme should be lower case
parsed = urlparse(url)
url = parsed.geturl()
conn = self.poolmanager.connection_from_url(url, pool_kwargs=pool_kwargs)
conn = self.poolmanager.connection_from_url(
url, pool_kwargs=pool_kwargs
)
return conn
def close(self):
@@ -397,11 +412,19 @@ class HTTPAdapter(BaseAdapter):
headers = {}
username, password = get_auth_from_url(proxy)
if username:
headers['Proxy-Authorization'] = _basic_auth_str(username, password)
headers['Proxy-Authorization'] = _basic_auth_str(
username, password
)
return headers
def send(
self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None
self,
request,
stream=False,
timeout=None,
verify=True,
cert=None,
proxies=None,
):
"""Sends PreparedRequest object. Returns Response object.
@@ -421,7 +444,9 @@ class HTTPAdapter(BaseAdapter):
conn = self.get_connection(request.url, proxies, verify, cert)
url = self.request_url(request, proxies)
self.add_headers(request)
chunked = not (request.body is None or 'Content-Length' in request.headers)
chunked = not (
request.body is None or 'Content-Length' in request.headers
)
if isinstance(timeout, tuple):
try:
connect, read = timeout
@@ -460,7 +485,9 @@ class HTTPAdapter(BaseAdapter):
conn = conn.proxy_pool
low_conn = conn._get_conn(timeout=DEFAULT_POOL_TIMEOUT)
try:
low_conn.putrequest(request.method, url, skip_accept_encoding=True)
low_conn.putrequest(
request.method, url, skip_accept_encoding=True
)
for header, value in request.headers.items():
low_conn.putheader(header, value)
low_conn.endheaders()
+19 -5
View File
@@ -14,7 +14,11 @@ from .import types
def request(
method: types.Method, url: types.URL, *, session: types.Session = None, **kwargs
method: types.Method,
url: types.URL,
*,
session: types.Session = None,
**kwargs,
) -> types.Response:
"""Constructs and sends a :class:`Request <Request>`.
@@ -61,7 +65,9 @@ def request(
return session.request(method=method, url=url, **kwargs)
def get(url: types.URL, *, params: types.Params = None, **kwargs) -> types.Response:
def get(
url: types.URL, *, params: types.Params = None, **kwargs
) -> types.Response:
r"""Sends a GET request.
:param url: URL for the new :class:`Request` object.
@@ -99,7 +105,11 @@ def head(url: types.URL, **kwargs) -> types.Response:
def post(
url: types.URL, *, data: types.Data = None, json: types.JSON = None, **kwargs
url: types.URL,
*,
data: types.Data = None,
json: types.JSON = None,
**kwargs,
) -> types.Response:
r"""Sends a POST request.
@@ -113,7 +123,9 @@ def post(
return request('post', url, data=data, json=json, **kwargs)
def put(url: types.URL, *, data: types.Data = None, **kwargs) -> types.Response:
def put(
url: types.URL, *, data: types.Data = None, **kwargs
) -> types.Response:
r"""Sends a PUT request.
:param url: URL for the new :class:`Request` object.
@@ -126,7 +138,9 @@ def put(url: types.URL, *, data: types.Data = None, **kwargs) -> types.Response:
return request('put', url, data=data, **kwargs)
def patch(url: types.URL, *, data: types.Data = None, **kwargs) -> types.Response:
def patch(
url: types.URL, *, data: types.Data = None, **kwargs
) -> types.Response:
r"""Sends a PATCH request.
:param url: URL for the new :class:`Request` object.
+9 -3
View File
@@ -73,7 +73,9 @@ class HTTPBasicAuth(AuthBase):
return not self == other
def __call__(self, r):
r.headers['Authorization'] = _basic_auth_str(self.username, self.password)
r.headers['Authorization'] = _basic_auth_str(
self.username, self.password
)
return r
@@ -203,7 +205,9 @@ class HTTPDigestAuth(AuthBase):
if 'digest' in s_auth.lower() and self._thread_local.num_401_calls < 2:
self._thread_local.num_401_calls += 1
pat = re.compile(r'digest ', flags=re.IGNORECASE)
self._thread_local.chal = parse_dict_header(pat.sub('', s_auth, count=1))
self._thread_local.chal = parse_dict_header(
pat.sub('', s_auth, count=1)
)
# Consume content and release the original connection
# to allow our new request to reuse the same one.
r.content
@@ -227,7 +231,9 @@ class HTTPDigestAuth(AuthBase):
self.init_per_thread_state()
# If we have a saved nonce, skip the 401
if self._thread_local.last_nonce:
r.headers['Authorization'] = self.build_digest_header(r.method, r.url)
r.headers['Authorization'] = self.build_digest_header(
r.method, r.url
)
try:
self._thread_local.pos = r.body.tell()
except AttributeError:
+18 -6
View File
@@ -131,7 +131,9 @@ def extract_cookies_to_jar(jar, request, response):
:param request: our own requests.Request object
:param response: urllib3.HTTPResponse object
"""
if not (hasattr(response, '_original_response') and response._original_response):
if not (
hasattr(response, '_original_response') and response._original_response
):
return
# the _original_response field is the wrapped httplib.HTTPResponse object,
@@ -218,7 +220,10 @@ class RequestsCookieJar(cookielib.CookieJar, collections.MutableMapping):
# support client code that unsets cookies by assignment of a None value:
if value is None:
remove_cookie_by_name(
self, name, domain=kwargs.get('domain'), path=kwargs.get('path')
self,
name,
domain=kwargs.get('domain'),
path=kwargs.get('path'),
)
return
@@ -363,7 +368,9 @@ class RequestsCookieJar(cookielib.CookieJar, collections.MutableMapping):
'"'
):
cookie.value = cookie.value.replace('\\"', '')
return super(RequestsCookieJar, self).set_cookie(cookie, *args, **kwargs)
return super(RequestsCookieJar, self).set_cookie(
cookie, *args, **kwargs
)
def update(self, other):
"""Updates this jar with cookies from another CookieJar or dict-like"""
@@ -412,7 +419,8 @@ class RequestsCookieJar(cookielib.CookieJar, collections.MutableMapping):
if path is None or cookie.path == path:
if toReturn is not None: # if there are multiple cookies that meet passed in criteria
raise CookieConflictError(
'There are multiple cookies with name, %r' % (name)
'There are multiple cookies with name, %r' %
(name)
)
toReturn = cookie.value # we will eventually return this as long as no cookie conflict
@@ -502,7 +510,9 @@ def morsel_to_cookie(morsel):
elif morsel['expires']:
time_template = '%a, %d-%b-%Y %H:%M:%S GMT'
expires = calendar.timegm(time.strptime(morsel['expires'], time_template))
expires = calendar.timegm(
time.strptime(morsel['expires'], time_template)
)
return create_cookie(
comment=morsel['comment'],
comment_url=bool(morsel['comment']),
@@ -548,7 +558,9 @@ def merge_cookies(cookiejar, cookies):
raise ValueError('You can only merge into CookieJar')
if isinstance(cookies, dict):
cookiejar = cookiejar_from_dict(cookies, cookiejar=cookiejar, overwrite=False)
cookiejar = cookiejar_from_dict(
cookies, cookiejar=cookiejar, overwrite=False
)
elif isinstance(cookies, cookielib.CookieJar):
try:
cookiejar.update(cookies)
+5 -1
View File
@@ -18,7 +18,11 @@ class RequestException(IOError):
response = kwargs.pop('response', None)
self.response = response
self.request = kwargs.pop('request', None)
if (response is not None and not self.request and hasattr(response, 'request')):
if (
response is not None and
not self.request and
hasattr(response, 'request')
):
self.request = self.response.request
super(RequestException, self).__init__(*args, **kwargs)
+6 -2
View File
@@ -61,7 +61,9 @@ def _implementation() -> types.Help:
def info() -> types.Help:
"""Generate information for a bug report."""
try:
platform_info = {'system': platform.system(), 'release': platform.release()}
platform_info = {
'system': platform.system(), 'release': platform.release()
}
except IOError:
platform_info = {'system': 'Unknown', 'release': 'Unknown'}
implementation_info = _implementation()
@@ -77,7 +79,9 @@ def info() -> types.Help:
idna_info = {'version': getattr(idna, '__version__', '')}
# OPENSSL_VERSION_NUMBER doesn't exist in the Python 2.6 ssl module.
system_ssl = getattr(ssl, 'OPENSSL_VERSION_NUMBER', None)
system_ssl_info = {'version': '%x' % system_ssl if system_ssl is not None else ''}
system_ssl_info = {
'version': '%x' % system_ssl if system_ssl is not None else ''
}
return {
'platform': platform_info,
'implementation': implementation_info,
+22 -8
View File
@@ -458,7 +458,9 @@ class PreparedRequest(RequestEncodingMixin, RequestHooksMixin):
query = '%s&%s' % (query, enc_params)
else:
query = enc_params
url = requote_uri(urlunparse([scheme, netloc, path, None, query, fragment]))
url = requote_uri(
urlunparse([scheme, netloc, path, None, query, fragment])
)
self.url = url
def prepare_headers(self, headers):
@@ -707,14 +709,18 @@ class Response(object):
"""True if this Response is a well-formed HTTP redirect that could have
been processed automatically (by :meth:`Session.resolve_redirects`).
"""
return ('location' in self.headers and self.status_code in REDIRECT_STATI)
return (
'location' in self.headers and self.status_code in REDIRECT_STATI
)
@property
def is_permanent_redirect(self):
"""True if this Response one of the permanent versions of redirect."""
return (
'location' in self.headers and
self.status_code in (codes.moved_permanently, codes.permanent_redirect)
self.status_code in (
codes.moved_permanently, codes.permanent_redirect
)
)
@property
@@ -748,7 +754,9 @@ class Response(object):
# Special case for urllib3.
if hasattr(self.raw, 'stream'):
try:
for chunk in self.raw.stream(chunk_size, decode_content=True):
for chunk in self.raw.stream(
chunk_size, decode_content=True
):
yield chunk
except ProtocolError as e:
@@ -780,7 +788,8 @@ class Response(object):
elif chunk_size is not None and not isinstance(chunk_size, int):
raise TypeError(
"chunk_size must be an int, it is instead a %s." % type(chunk_size)
"chunk_size must be an int, it is instead a %s." %
type(chunk_size)
)
# simulate reading small chunks of the content
@@ -790,7 +799,8 @@ class Response(object):
if decode_unicode:
if self.encoding is None:
raise TypeError(
'encoding must be set before consuming streaming ' 'responses'
'encoding must be set before consuming streaming '
'responses'
)
# check encoding value here, don't wait for the generator to be
@@ -880,7 +890,9 @@ class Response(object):
if self._content is False:
# Read the contents.
if self._content_consumed:
raise RuntimeError('The content for this response was already consumed')
raise RuntimeError(
'The content for this response was already consumed'
)
if self.status_code == 0 or self.raw is None:
self._content = None
@@ -942,7 +954,9 @@ class Response(object):
encoding = guess_json_utf(self.content)
if encoding is not None:
try:
return complexjson.loads(self.content.decode(encoding), **kwargs)
return complexjson.loads(
self.content.decode(encoding), **kwargs
)
except UnicodeDecodeError:
# Wrong UTF codec detected; usually because it's not UTF-8
+28 -9
View File
@@ -75,7 +75,8 @@ def merge_setting(request_setting, session_setting, dict_class=OrderedDict):
# Bypass if not a dictionary (e.g. verify)
if not (
isinstance(session_setting, Mapping) and isinstance(request_setting, Mapping)
isinstance(session_setting, Mapping) and
isinstance(request_setting, Mapping)
):
return request_setting
@@ -166,7 +167,8 @@ class SessionRedirectMixin(object):
response.raw.read(decode_content=False)
if len(response.history) >= self.max_redirects:
raise TooManyRedirects(
'Exceeded %s redirects.' % self.max_redirects, response=response
'Exceeded %s redirects.' % self.max_redirects,
response=response,
)
# Release the connection back into the pool.
@@ -193,7 +195,9 @@ class SessionRedirectMixin(object):
# If method is changed to GET we need to remove body and associated headers.
if method_changed and prepared_request.method == 'GET':
# https://github.com/requests/requests/issues/3490
purged_headers = ('Content-Length', 'Content-Type', 'Transfer-Encoding')
purged_headers = (
'Content-Length', 'Content-Type', 'Transfer-Encoding'
)
for header in purged_headers:
prepared_request.headers.pop(header, None)
prepared_request.body = None
@@ -205,7 +209,9 @@ class SessionRedirectMixin(object):
# Extract any cookies sent on the response to the cookiejar
# in the new request. Because we've mutated our copied prepared
# request, use the old one that we haven't yet touched.
extract_cookies_to_jar(prepared_request._cookies, request, response.raw)
extract_cookies_to_jar(
prepared_request._cookies, request, response.raw
)
merge_cookies(prepared_request._cookies, self.cookies)
prepared_request.prepare_cookies(prepared_request._cookies)
# Rebuild auth and proxy information.
@@ -241,7 +247,9 @@ class SessionRedirectMixin(object):
response.history = history[:]
# append the new response to the history tracker for the next iteration
history.append(response)
extract_cookies_to_jar(self.cookies, prepared_request, response.raw)
extract_cookies_to_jar(
self.cookies, prepared_request, response.raw
)
# extract redirect url, if any, for the next loop
location_url = self.get_redirect_target(response)
yield response
@@ -298,7 +306,9 @@ class SessionRedirectMixin(object):
except KeyError:
username, password = None, None
if username and password:
headers['Proxy-Authorization'] = _basic_auth_str(username, password)
headers['Proxy-Authorization'] = _basic_auth_str(
username, password
)
return new_proxies
def rebuild_method(self, prepared_request, response):
@@ -317,7 +327,9 @@ class SessionRedirectMixin(object):
# of HTTP RFCs. While some browsers transform other methods to GET, little of
# that has been standardized. For that reason, we're using curl as a model
# which only supports POST->GET.
if response.status_code in (codes.found, codes.moved) and method == 'POST':
if response.status_code in (
codes.found, codes.moved
) and method == 'POST':
method = 'GET'
prepared_request.method = method
return method != original_method
@@ -642,7 +654,9 @@ class Session(SessionRedirectMixin):
if not allow_redirects:
try:
r._next = next(
self.resolve_redirects(r, request, yield_requests=True, **kwargs)
self.resolve_redirects(
r, request, yield_requests=True, **kwargs
)
)
except StopIteration:
pass
@@ -685,7 +699,12 @@ class Session(SessionRedirectMixin):
env_proxies = get_environ_proxies(url, no_proxy=no_proxy) or {}
new_proxies = merge_setting(self.proxies, env_proxies)
proxies = merge_setting(proxies, new_proxies)
return {'verify': verify, 'proxies': proxies, 'stream': stream, 'cert': cert}
return {
'verify': verify,
'proxies': proxies,
'stream': stream,
'cert': cert,
}
def get_adapter(self, url):
"""
+11 -3
View File
@@ -37,7 +37,9 @@ _codes = {
404: ('not_found', '-o-'),
405: ('method_not_allowed', 'not_allowed'),
406: ('not_acceptable',),
407: ('proxy_authentication_required', 'proxy_auth', 'proxy_authentication'),
407: (
'proxy_authentication_required', 'proxy_auth', 'proxy_authentication'
),
408: ('request_timeout', 'timeout'),
409: ('conflict',),
410: ('gone',),
@@ -47,7 +49,9 @@ _codes = {
414: ('request_uri_too_large',),
415: ('unsupported_media_type', 'unsupported_media', 'media_type'),
416: (
'requested_range_not_satisfiable', 'requested_range', 'range_not_satisfiable'
'requested_range_not_satisfiable',
'requested_range',
'range_not_satisfiable',
),
417: ('expectation_failed',),
418: ('im_a_teapot', 'teapot', 'i_am_a_teapot'),
@@ -76,7 +80,11 @@ _codes = {
507: ('insufficient_storage',),
509: ('bandwidth_limit_exceeded', 'bandwidth'),
510: ('not_extended',),
511: ('network_authentication_required', 'network_auth', 'network_authentication'),
511: (
'network_authentication_required',
'network_auth',
'network_authentication',
),
}
codes = LookupDict(name='status_codes')
for code, titles in _codes.items():
+3 -1
View File
@@ -61,7 +61,9 @@ class CaseInsensitiveDict(collections.MutableMapping):
def lower_items(self):
"""Like iteritems(), but with all lowercase keys."""
return ((lowerkey, keyval[1]) for (lowerkey, keyval) in self._store.items())
return (
(lowerkey, keyval[1]) for (lowerkey, keyval) in self._store.items()
)
def __eq__(self, other):
if isinstance(other, collections.Mapping):
+4 -1
View File
@@ -50,7 +50,10 @@ Headers = Optional[Union[None, MutableMapping[Text, Text]]]
Cookies = Optional[Union[None, RequestsCookieJar, MutableMapping[Text, Text]]]
Files = Optional[MutableMapping[Text, IO]]
Auth = Union[
None, Tuple[Text, Text], auth.AuthBase, Callable[[PreparedRequest], PreparedRequest]
None,
Tuple[Text, Text],
auth.AuthBase,
Callable[[PreparedRequest], PreparedRequest],
]
Timeout = Union[None, float, Tuple[float, float]]
AllowRedirects = Optional[bool]
+36 -11
View File
@@ -58,8 +58,14 @@ if platform.system() == 'Windows':
winreg.HKEY_CURRENT_USER,
r'Software\Microsoft\Windows\CurrentVersion\Internet Settings',
)
proxyEnable = winreg.QueryValueEx(internetSettings, 'ProxyEnable')[0]
proxyOverride = winreg.QueryValueEx(internetSettings, 'ProxyOverride')[0]
proxyEnable = winreg.QueryValueEx(internetSettings, 'ProxyEnable')[
0
]
proxyOverride = winreg.QueryValueEx(
internetSettings, 'ProxyOverride'
)[
0
]
except OSError:
return False
@@ -210,7 +216,12 @@ def get_netrc_auth(url, raise_errors=False):
def guess_filename(obj):
"""Tries to guess the filename of the given object."""
name = getattr(obj, 'name', None)
if (name and isinstance(name, basestring) and name[0] != '<' and name[-1] != '>'):
if (
name and
isinstance(name, basestring) and
name[0] != '<' and
name[-1] != '>'
):
return os.path.basename(name)
@@ -401,7 +412,9 @@ def get_encodings_from_content(content):
DeprecationWarning,
)
charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
pragma_re = re.compile(
r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I
)
xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')
return (
charset_re.findall(content) +
@@ -570,7 +583,9 @@ def address_in_network(ip, net):
"""
ipaddr = struct.unpack('=L', socket.inet_aton(ip))[0]
netaddr, bits = net.split('/')
netmask = struct.unpack('=L', socket.inet_aton(dotted_netmask(int(bits))))[0]
netmask = struct.unpack('=L', socket.inet_aton(dotted_netmask(int(bits))))[
0
]
network = struct.unpack('=L', socket.inet_aton(netaddr))[0] & netmask
return ( ipaddr & netmask) == ( network & netmask)
@@ -663,7 +678,9 @@ def should_bypass_proxies(url, no_proxy):
if no_proxy:
# We need to check whether we match here. We need to see if we match
# the end of the netloc, both with and without the port.
no_proxy = (host for host in no_proxy.replace(' ', '').split(',') if host)
no_proxy = (
host for host in no_proxy.replace(' ', '').split(',') if host
)
ip = netloc.split(':')[0]
if is_ipv4_address(ip):
for proxy_ip in no_proxy:
@@ -678,7 +695,9 @@ def should_bypass_proxies(url, no_proxy):
else:
for host in no_proxy:
if netloc.endswith(host) or netloc.split(':')[0].endswith(host):
if netloc.endswith(host) or netloc.split(':')[0].endswith(
host
):
# The URL does match something in no_proxy, so we don't want
# to apply the proxies on this URL.
return True
@@ -888,7 +907,8 @@ def check_header_validity(header):
try:
if not pat.match(value):
raise InvalidHeader(
"Invalid return character or leading space in header: %s" % name
"Invalid return character or leading space in header: %s" %
name
)
except TypeError:
@@ -924,15 +944,20 @@ def rewind_body(prepared_request):
body_seek(prepared_request._body_position)
except (IOError, OSError):
raise UnrewindableBodyError(
"An error occurred when rewinding request " "body for redirect."
"An error occurred when rewinding request "
"body for redirect."
)
else:
raise UnrewindableBodyError("Unable to rewind request body for redirect.")
raise UnrewindableBodyError(
"Unable to rewind request body for redirect."
)
def is_stream(data):
"""Given data, determines if it should be sent as a stream."""
is_iterable = getattr(data, '__iter__', False)
is_io_type = not isinstance(data, (basestring, list, tuple, collections.Mapping))
is_io_type = not isinstance(
data, (basestring, list, tuple, collections.Mapping)
)
return is_iterable and is_io_type
+3 -1
View File
@@ -111,6 +111,8 @@ setup(
extras_require={
'security': ['pyOpenSSL>=0.14', 'cryptography>=1.3.4', 'idna>=2.0.0'],
'socks': ['PySocks>=1.5.6, !=1.5.7'],
'socks:sys_platform == "win32" and python_version == "2.7"': ['win_inet_pton'],
'socks:sys_platform == "win32" and python_version == "2.7"': [
'win_inet_pton'
],
},
)
+6 -2
View File
@@ -6,7 +6,9 @@ import pytest
from requests.help import info
@pytest.mark.skipif(sys.version_info[:2] != (2, 6), reason="Only run on Python 2.6")
@pytest.mark.skipif(
sys.version_info[:2] != (2, 6), reason="Only run on Python 2.6"
)
def test_system_ssl_py26():
"""OPENSSL_VERSION_NUMBER isn't provided in Python 2.6, verify we don't
blow up in this case.
@@ -14,7 +16,9 @@ def test_system_ssl_py26():
assert info()['system_ssl'] == {'version': ''}
@pytest.mark.skipif(sys.version_info < (2, 7), reason="Only run on Python 2.7+")
@pytest.mark.skipif(
sys.version_info < (2, 7), reason="Only run on Python 2.7+"
)
def test_system_ssl():
"""Verify we're actually setting system_ssl when it should be available."""
assert info()['system_ssl']['version'] != ''
+3 -1
View File
@@ -12,7 +12,9 @@ def hook(value):
'hooks_list, result', ((hook, 'ata'), ([hook, lambda x: None, hook], 'ta'))
)
def test_hooks(hooks_list, result):
assert hooks.dispatch_hook('response', {'response': hooks_list}, 'Data') == result
assert hooks.dispatch_hook(
'response', {'response': hooks_list}, 'Data'
) == result
def test_default_hooks():
+11 -4
View File
@@ -51,7 +51,9 @@ def test_digestauth_401_count_reset_on_redirect():
b'realm="me@kennethreitz.com", qop=auth\r\n\r\n'
)
text_302 = (
b'HTTP/1.1 302 FOUND\r\n' b'Content-Length: 0\r\n' b'Location: /\r\n\r\n'
b'HTTP/1.1 302 FOUND\r\n'
b'Content-Length: 0\r\n'
b'Location: /\r\n\r\n'
)
text_200 = (b'HTTP/1.1 200 OK\r\n' b'Content-Length: 0\r\n\r\n')
expected_digest = (
@@ -130,7 +132,9 @@ def test_digestauth_401_only_sent_once():
return request_content
close_server = threading.Event()
server = Server(digest_failed_response_handler, wait_to_close_event=close_server)
server = Server(
digest_failed_response_handler, wait_to_close_event=close_server
)
with server as (host, port):
url = 'http://{0}:{1}/'.format(host, port)
r = requests.get(url, auth=auth)
@@ -188,7 +192,8 @@ _proxy_combos += [(var.upper(), scheme) for var, scheme in _proxy_combos]
@pytest.mark.parametrize("var,scheme", _proxy_combos)
def test_use_proxy_from_environment(httpbin, var, scheme):
url = "{0}://httpbin.org".format(scheme)
fake_proxy = Server() # do nothing with the requests; just close the socket
fake_proxy = Server(
) # do nothing with the requests; just close the socket
with fake_proxy as (host, port):
proxy_url = "socks5://{0}:{1}".format(host, port)
kwargs = {var: proxy_url}
@@ -227,6 +232,8 @@ def test_redirect_rfc1808_to_non_ascii_location():
assert r.status_code == 200
assert len(r.history) == 1
assert r.history[0].status_code == 301
assert redirect_request[0].startswith(b'GET /' + expected_path + b' HTTP/1.1')
assert redirect_request[0].startswith(
b'GET /' + expected_path + b' HTTP/1.1'
)
assert r.url == u'{0}/{1}'.format(url, expected_path.decode('ascii'))
close_server.set()
+178 -54
View File
@@ -15,7 +15,9 @@ import pytest
import pytest_httpbin
from requests.adapters import HTTPAdapter
from requests.auth import HTTPDigestAuth, _basic_auth_str
from requests.basics import ( Morsel, cookielib, getproxies, str, urlparse, builtin_str)
from requests.basics import (
Morsel, cookielib, getproxies, str, urlparse, builtin_str
)
from requests.cookies import ( cookiejar_from_dict, morsel_to_cookie)
from requests.exceptions import (
ConnectionError,
@@ -123,17 +125,24 @@ class TestRequests:
@pytest.mark.parametrize('method', ('POST', 'PUT', 'PATCH', 'OPTIONS'))
def test_empty_content_length(self, httpbin, method):
req = requests.Request(method, httpbin(method.lower()), data='').prepare()
req = requests.Request(
method, httpbin(method.lower()), data=''
).prepare(
)
assert req.headers['Content-Length'] == '0'
def test_override_content_length(self, httpbin):
headers = {'Content-Length': 'not zero'}
r = requests.Request('POST', httpbin('post'), headers=headers).prepare()
r = requests.Request('POST', httpbin('post'), headers=headers).prepare(
)
assert 'Content-Length' in r.headers
assert r.headers['Content-Length'] == 'not zero'
def test_path_is_not_double_encoded(self):
request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare()
request = requests.Request(
'GET', "http://0.0.0.0/get/test case"
).prepare(
)
assert request.path_url == '/get/test%20case'
@pytest.mark.parametrize(
@@ -183,7 +192,9 @@ class TestRequests:
request = requests.Request('GET', ' http://example.com').prepare()
assert request.url == 'http://example.com/'
@pytest.mark.parametrize('scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://'))
@pytest.mark.parametrize(
'scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://')
)
def test_mixed_case_scheme_acceptable(self, httpbin, scheme):
s = requests.Session()
s.proxies = getproxies()
@@ -238,7 +249,9 @@ class TestRequests:
assert e.response.url == url
assert len(e.response.history) == 30
else:
pytest.fail('Expected redirect to raise TooManyRedirects but it did not')
pytest.fail(
'Expected redirect to raise TooManyRedirects but it did not'
)
def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, httpbin):
s = requests.session()
@@ -266,7 +279,9 @@ class TestRequests:
('DELETE', '', 'DELETE'),
),
)
def test_http_301_for_redirectable_methods(self, httpbin, method, body, expected):
def test_http_301_for_redirectable_methods(
self, httpbin, method, body, expected
):
"""Tests all methods except OPTIONS for expected redirect behaviour.
OPTIONS responses can behave differently depending on the server, so
@@ -274,7 +289,9 @@ class TestRequests:
to them. For that reason they aren't included here.
"""
params = {'url': '/%s' % expected.lower(), 'status_code': '301'}
r = requests.request(method, httpbin('redirect-to'), data=body, params=params)
r = requests.request(
method, httpbin('redirect-to'), data=body, params=params
)
assert r.request.url == httpbin(expected.lower())
assert r.request.method == expected
assert r.history[0].status_code == 301
@@ -295,7 +312,9 @@ class TestRequests:
('DELETE', '', 'DELETE'),
),
)
def test_http_302_for_redirectable_methods(self, httpbin, method, body, expected):
def test_http_302_for_redirectable_methods(
self, httpbin, method, body, expected
):
"""Tests all methods except OPTIONS for expected redirect behaviour.
OPTIONS responses can behave differently depending on the server, so
@@ -303,7 +322,9 @@ class TestRequests:
to them. For that reason they aren't included here.
"""
params = {'url': '/%s' % expected.lower()}
r = requests.request(method, httpbin('redirect-to'), data=body, params=params)
r = requests.request(
method, httpbin('redirect-to'), data=body, params=params
)
assert r.request.url == httpbin(expected.lower())
assert r.request.method == expected
assert r.history[0].status_code == 302
@@ -324,7 +345,9 @@ class TestRequests:
('DELETE', '', 'GET'),
),
)
def test_http_303_for_redirectable_methods(self, httpbin, method, body, expected):
def test_http_303_for_redirectable_methods(
self, httpbin, method, body, expected
):
"""Tests all methods except OPTIONS for expected redirect behaviour.
OPTIONS responses can behave differently depending on the server, so
@@ -332,7 +355,9 @@ class TestRequests:
to them. For that reason they aren't included here.
"""
params = {'url': '/%s' % expected.lower(), 'status_code': '303'}
r = requests.request(method, httpbin('redirect-to'), data=body, params=params)
r = requests.request(
method, httpbin('redirect-to'), data=body, params=params
)
assert r.request.url == httpbin(expected.lower())
assert r.request.method == expected
assert r.history[0].status_code == 303
@@ -341,7 +366,8 @@ class TestRequests:
def test_multiple_location_headers(self, httpbin):
headers = [
('Location', 'http://example.com'), ('Location', 'https://example.com/1')
('Location', 'http://example.com'),
('Location', 'https://example.com/1'),
]
params = '&'.join(['%s=%s' % (k, v) for k, v in headers])
ses = requests.Session()
@@ -372,7 +398,9 @@ class TestRequests:
def test_transfer_enc_removal_on_redirect(self, httpbin):
purged_headers = ('Transfer-Encoding', 'Content-Type')
ses = requests.Session()
req = requests.Request('POST', httpbin('post'), data=(b'x' for x in range(1)))
req = requests.Request(
'POST', httpbin('post'), data=(b'x' for x in range(1))
)
prep = ses.prepare_request(req)
assert 'Transfer-Encoding' in prep.headers
# Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33
@@ -420,7 +448,9 @@ class TestRequests:
assert s.cookies['foo'] == 'bar'
s.get(
httpbin('response-headers'),
params={'Set-Cookie': 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'},
params={
'Set-Cookie': 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT'
},
)
assert 'foo' not in s.cookies
@@ -489,7 +519,9 @@ class TestRequests:
# Verify CookieJar isn't being converted to RequestsCookieJar
assert isinstance(prep_req._cookies, cookielib.CookieJar)
assert isinstance(resp.request._cookies, cookielib.CookieJar)
assert not isinstance(resp.request._cookies, requests.cookies.RequestsCookieJar)
assert not isinstance(
resp.request._cookies, requests.cookies.RequestsCookieJar
)
cookies = {}
for c in resp.request._cookies:
cookies[c.name] = c.value
@@ -594,16 +626,23 @@ class TestRequests:
@pytest.mark.parametrize(
'username, password',
(('user', 'pass'), (u'имя'.encode('utf-8'), u'пароль'.encode('utf-8'))),
(
('user', 'pass'),
(u'имя'.encode('utf-8'), u'пароль'.encode('utf-8')),
),
)
def test_set_basicauth(self, httpbin, username, password):
auth = (username, password)
url = httpbin('get')
r = requests.Request('GET', url, auth=auth)
p = r.prepare()
assert p.headers['Authorization'] == _basic_auth_str(username, password)
assert p.headers['Authorization'] == _basic_auth_str(
username, password
)
@pytest.mark.parametrize('username, password', (('user', 1234), (None, 'test')))
@pytest.mark.parametrize(
'username, password', (('user', 1234), (None, 'test'))
)
def test_non_str_basicauth(self, username, password):
"""Ensure we only allow string or bytes values for basicauth"""
with pytest.raises(TypeError) as e:
@@ -634,7 +673,8 @@ class TestRequests:
# any proxy related error (address resolution, no route to host, etc) should result in a ProxyError
with pytest.raises(ProxyError):
requests.get(
'http://localhost:1', proxies={'http': 'non-resolvable-address'}
'http://localhost:1',
proxies={'http': 'non-resolvable-address'},
)
def test_basicauth_with_netrc(self, httpbin):
@@ -779,7 +819,9 @@ class TestRequests:
post1 = requests.post(url, data={'some': 'data'})
assert post1.status_code == 200
with open('Pipfile') as f:
post2 = requests.post(url, data={'some': 'data'}, files={'some': f})
post2 = requests.post(
url, data={'some': 'data'}, files={'some': f}
)
assert post2.status_code == 200
post4 = requests.post(url, data='[{"some": "json"}]')
assert post4.status_code == 200
@@ -904,17 +946,23 @@ class TestRequests:
warnings_expected = ('SubjectAltNameWarning',)
else:
warnings_expected = (
'SNIMissingWarning', 'InsecurePlatformWarning', 'SubjectAltNameWarning'
'SNIMissingWarning',
'InsecurePlatformWarning',
'SubjectAltNameWarning',
)
with pytest.warns(None) as warning_records:
warnings.simplefilter('always')
requests.get(httpbin_secure('status', '200'), verify=httpbin_ca_bundle)
requests.get(
httpbin_secure('status', '200'), verify=httpbin_ca_bundle
)
warning_records = [
item
for item in warning_records
if item.category.__name__ != 'ResourceWarning'
]
warnings_category = tuple(item.category.__name__ for item in warning_records)
warnings_category = tuple(
item.category.__name__ for item in warning_records
)
assert warnings_category == warnings_expected
def test_certificate_failure(self, httpbin_secure):
@@ -971,7 +1019,9 @@ class TestRequests:
def test_unicode_method_name(self, httpbin):
files = {'file': open(__file__, 'rb')}
r = requests.request(method=u('POST'), url=httpbin('post'), files=files)
r = requests.request(
method=u('POST'), url=httpbin('post'), files=files
)
assert r.status_code == 200
def test_unicode_method_name_with_request_object(self, httpbin):
@@ -998,7 +1048,9 @@ class TestRequests:
files={
'file1': ('test_requests.py', open(__file__, 'rb')),
'file2': (
'test_requests', open(__file__, 'rb'), 'text/py-content-type'
'test_requests',
open(__file__, 'rb'),
'text/py-content-type',
),
},
)
@@ -1062,7 +1114,9 @@ class TestRequests:
s.auth = DummyAuth()
prep = s.prepare_request(req)
resp = s.send(prep)
assert resp.json()['headers']['Dummy-Auth-Test'] == 'dummy-auth-test-ok'
assert resp.json()['headers'][
'Dummy-Auth-Test'
] == 'dummy-auth-test-ok'
def test_prepare_request_with_bytestring_url(self):
req = requests.Request('GET', b'https://httpbin.org/')
@@ -1223,7 +1277,9 @@ class TestRequests:
r = requests.get(httpbin('get'))
td = r.elapsed
total_seconds = (
(td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6
(td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) /
10 **
6
)
assert total_seconds > 0.0
@@ -1262,7 +1318,8 @@ class TestRequests:
assert all(isinstance(chunk, str) for chunk in chunks)
@pytest.mark.parametrize(
'encoding, exception', ((None, TypeError), ('invalid encoding', LookupError))
'encoding, exception',
((None, TypeError), ('invalid encoding', LookupError)),
)
def test_decode_unicode_encoding(self, encoding, exception):
# raise an exception if encoding isn't set
@@ -1352,7 +1409,9 @@ class TestRequests:
r._content_consumed = True
r.iter_content = mock_iter_content
# decode_unicode=None, output raw bytes
assert list(r.iter_lines(delimiter=b'\r\n')) == mock_data.split(b'\r\n')
assert list(r.iter_lines(delimiter=b'\r\n')) == mock_data.split(
b'\r\n'
)
# decode_unicode=True, output unicode strings
assert list(
r.iter_lines(decode_unicode=True, delimiter=u'\r\n')
@@ -1375,7 +1434,10 @@ class TestRequests:
assert list(r.iter_lines()) == mock_data.splitlines()
# decode_unicode=True, output unicode strings
unicode_mock_data = mock_data.decode('utf-8')
assert list(r.iter_lines(decode_unicode=True)) == unicode_mock_data.splitlines()
assert list(
r.iter_lines(decode_unicode=True)
) == unicode_mock_data.splitlines(
)
@pytest.mark.parametrize(
'content, expected_no_delimiter, expected_delimiter',
@@ -1568,7 +1630,10 @@ class TestRequests:
def test_header_validation(self, httpbin):
"""Ensure prepare_headers regex isn't flagging valid header contents."""
headers_ok = {
'foo': 'bar baz qux', 'bar': u'fbbq'.encode('utf8'), 'baz': '', 'qux': '1'
'foo': 'bar baz qux',
'bar': u'fbbq'.encode('utf8'),
'baz': '',
'qux': '1',
}
r = requests.get(httpbin('get'), headers=headers_ok)
assert r.request.headers['foo'] == headers_ok['foo']
@@ -1693,13 +1758,19 @@ class TestRequests:
def test_prepare_body_position_non_stream(self):
data = b'the data'
s = requests.Session()
prep = requests.Request('GET', 'http://example.com', data=data).prepare()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
)
assert prep._body_position is None
def test_rewind_body(self):
data = io.BytesIO(b'the data')
s = requests.Session()
prep = requests.Request('GET', 'http://example.com', data=data).prepare()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
)
assert prep._body_position == 0
assert prep.body.read() == b'the data'
# the data has all been read
@@ -1712,7 +1783,10 @@ class TestRequests:
data = io.BytesIO(b'the data')
s = requests.Session()
data.read(4) # read some data
prep = requests.Request('GET', 'http://example.com', data=data).prepare()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
)
assert prep._body_position == 4
assert prep.body.read() == b'data'
# the data has all been read
@@ -1736,7 +1810,10 @@ class TestRequests:
data = BadFileObj('the data')
s = requests.Session()
prep = requests.Request('GET', 'http://example.com', data=data).prepare()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
)
assert prep._body_position == 0
with pytest.raises(UnrewindableBodyError) as e:
requests.utils.rewind_body(prep)
@@ -1760,7 +1837,10 @@ class TestRequests:
data = BadFileObj('the data')
s = requests.Session()
prep = requests.Request('GET', 'http://example.com', data=data).prepare()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
)
assert prep._body_position == 0
with pytest.raises(UnrewindableBodyError) as e:
requests.utils.rewind_body(prep)
@@ -1781,7 +1861,10 @@ class TestRequests:
data = BadFileObj('the data')
s = requests.Session()
prep = requests.Request('GET', 'http://example.com', data=data).prepare()
prep = requests.Request(
'GET', 'http://example.com', data=data
).prepare(
)
assert prep._body_position is not None
with pytest.raises(UnrewindableBodyError) as e:
requests.utils.rewind_body(prep)
@@ -1818,7 +1901,9 @@ class TestRequests:
),
),
)
def test_basic_auth_str_is_always_native(self, username, password, auth_str):
def test_basic_auth_str_is_always_native(
self, username, password, auth_str
):
s = _basic_auth_str(username, password)
assert isinstance(s, builtin_str)
assert s == auth_str
@@ -1863,7 +1948,9 @@ class TestRequests:
def test_unconsumed_session_response_closes_connection(self, httpbin):
s = requests.session()
with contextlib.closing(s.get(httpbin('stream/4'), stream=True)) as response:
with contextlib.closing(
s.get(httpbin('stream/4'), stream=True)
) as response:
pass
assert response._content_consumed is False
assert response.raw.closed
@@ -1987,7 +2074,9 @@ class TestRequests:
assert resp_with_cert.raw._pool.key_file == key
assert resp.raw._pool is not resp_with_cert.raw._pool
def test_empty_stream_with_auth_does_not_set_content_length_header(self, httpbin):
def test_empty_stream_with_auth_does_not_set_content_length_header(
self, httpbin
):
"""Ensure that a byte stream with size 0 will not set both a Content-Length
and Transfer-Encoding header.
"""
@@ -1999,7 +2088,9 @@ class TestRequests:
assert 'Transfer-Encoding' in prepared_request.headers
assert 'Content-Length' not in prepared_request.headers
def test_stream_with_auth_does_not_set_transfer_encoding_header(self, httpbin):
def test_stream_with_auth_does_not_set_transfer_encoding_header(
self, httpbin
):
"""Ensure that a byte stream with size > 0 will not set both a Content-Length
and Transfer-Encoding header.
"""
@@ -2031,7 +2122,9 @@ class TestRequests:
data = (i for i in [b'a', b'b', b'c'])
url = httpbin('post')
with pytest.raises(InvalidHeader):
r = requests.post(url, data=data, headers={'Content-Length': 'foo'})
r = requests.post(
url, data=data, headers={'Content-Length': 'foo'}
)
def test_content_length_with_manually_set_transfer_encoding_raises_error(
self, httpbin
@@ -2042,7 +2135,9 @@ class TestRequests:
data = 'test data'
url = httpbin('post')
with pytest.raises(InvalidHeader):
r = requests.post(url, data=data, headers={'Transfer-Encoding': 'chunked'})
r = requests.post(
url, data=data, headers={'Transfer-Encoding': 'chunked'}
)
def test_null_body_does_not_raise_error(self, httpbin):
url = httpbin('post')
@@ -2095,7 +2190,9 @@ class TestRequests:
"""
url_final = httpbin('html')
querystring_malformed = urlencode({'location': url_final})
url_redirect_malformed = httpbin('response-headers?%s' % querystring_malformed)
url_redirect_malformed = httpbin(
'response-headers?%s' % querystring_malformed
)
querystring_redirect = urlencode({'url': url_redirect_malformed})
url_redirect = httpbin('redirect-to?%s' % querystring_redirect)
urls_test = [url_redirect, url_redirect_malformed, url_final]
@@ -2333,14 +2430,19 @@ class TestTimeout:
@pytest.mark.parametrize(
'timeout, error_text',
(((3, 4, 5), '(connect, read)'), ('foo', 'must be an int, float or None')),
(
((3, 4, 5), '(connect, read)'),
('foo', 'must be an int, float or None'),
),
)
def test_invalid_timeout(self, httpbin, timeout, error_text):
with pytest.raises(ValueError) as e:
requests.get(httpbin('get'), timeout=timeout)
assert error_text in str(e)
@pytest.mark.parametrize('timeout', (None, Urllib3Timeout(connect=None, read=None)))
@pytest.mark.parametrize(
'timeout', (None, Urllib3Timeout(connect=None, read=None))
)
def test_none_timeout(self, httpbin, timeout):
"""Check that you can set None as a valid timeout value.
@@ -2489,7 +2591,10 @@ def test_data_argument_accepts_tuples(data):
"""
p = PreparedRequest()
p.prepare(
method='GET', url='http://www.example.com', data=data, hooks=default_hooks()
method='GET',
url='http://www.example.com',
data=data,
hooks=default_hooks(),
)
assert p.body == urlencode(data)
@@ -2536,7 +2641,10 @@ def test_urllib3_retries(httpbin):
from urllib3.util import Retry
s = requests.Session()
s.mount('http://', HTTPAdapter(max_retries=Retry(total=2, status_forcelist=[500])))
s.mount(
'http://',
HTTPAdapter(max_retries=Retry(total=2, status_forcelist=[500])),
)
with pytest.raises(RetryError):
s.get(httpbin('status/500'))
@@ -2558,8 +2666,14 @@ class TestPreparingURLs(object):
('http://google.com', 'http://google.com/'),
(u'http://ジェーピーニック.jp', u'http://xn--hckqz9bzb1cyrb.jp/'),
(u'http://xn--n3h.net/', u'http://xn--n3h.net/'),
(u'http://ジェーピーニック.jp'.encode('utf-8'), u'http://xn--hckqz9bzb1cyrb.jp/'),
(u'http://straße.de/straße', u'http://xn--strae-oqa.de/stra%C3%9Fe'),
(
u'http://ジェーピーニック.jp'.encode('utf-8'),
u'http://xn--hckqz9bzb1cyrb.jp/',
),
(
u'http://straße.de/straße',
u'http://xn--strae-oqa.de/stra%C3%9Fe',
),
(
u'http://straße.de/straße'.encode('utf-8'),
u'http://xn--strae-oqa.de/stra%C3%9Fe',
@@ -2643,8 +2757,16 @@ class TestPreparingURLs(object):
{"key": "value"},
u"http+unix://%2Fvar%2Frun%2Fsocket/path?key=value",
),
(b"mailto:user@example.org", {"key": "value"}, u"mailto:user@example.org"),
(u"mailto:user@example.org", {"key": "value"}, u"mailto:user@example.org"),
(
b"mailto:user@example.org",
{"key": "value"},
u"mailto:user@example.org",
),
(
u"mailto:user@example.org",
{"key": "value"},
u"mailto:user@example.org",
),
),
)
def test_parameters_for_nonstandard_schemes(self, input, params, expected):
@@ -2867,5 +2989,7 @@ class TestGetConnection(object):
"""
adapter = requests.adapters.HTTPAdapter()
with pytest.raises(IOError) as excinfo:
adapter.get_connection('https://example.com', verify=verify, cert=cert)
adapter.get_connection(
'https://example.com', verify=verify, cert=cert
)
excinfo.match('invalid path: a/path/that/does/not/exist')
+3 -1
View File
@@ -34,7 +34,9 @@ class TestCaseInsensitiveDict:
]
def test_repr(self):
assert repr(self.case_insensitive_dict) == "{'Accept': 'application/json'}"
assert repr(
self.case_insensitive_dict
) == "{'Accept': 'application/json'}"
def test_copy(self):
copy = self.case_insensitive_dict.copy()
+9 -3
View File
@@ -60,7 +60,9 @@ class TestTestServer:
def test_basic_waiting_server(self):
"""the server waits for the block_server event to be set before closing"""
block_server = threading.Event()
with Server.basic_response_server(wait_to_close_event=block_server) as (
with Server.basic_response_server(
wait_to_close_event=block_server
) as (
host, port
):
sock = socket.socket()
@@ -73,7 +75,9 @@ class TestTestServer:
def test_multiple_requests(self):
"""multiple requests can be served"""
requests_to_handle = 5
server = Server.basic_response_server(requests_to_handle=requests_to_handle)
server = Server.basic_response_server(
requests_to_handle=requests_to_handle
)
with server as (host, port):
server_url = 'http://{0}:{1}'.format(host, port)
for _ in range(requests_to_handle):
@@ -83,7 +87,9 @@ class TestTestServer:
with pytest.raises(requests.exceptions.ConnectionError):
r = requests.get(server_url)
@pytest.mark.skip(reason="this fails non-deterministically under pytest-xdist")
@pytest.mark.skip(
reason="this fails non-deterministically under pytest-xdist"
)
def test_request_recovery(self):
"""can check the requests content"""
# TODO: figure out why this sometimes fails when using pytest-xdist.
+55 -14
View File
@@ -171,7 +171,8 @@ class TestGetEnvironProxies:
@pytest.fixture(autouse=True, params=['no_proxy', 'NO_PROXY'])
def no_proxy(self, request, monkeypatch):
monkeypatch.setenv(
request.param, '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1'
request.param,
'192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1',
)
@pytest.mark.parametrize(
@@ -189,14 +190,22 @@ class TestGetEnvironProxies:
@pytest.mark.parametrize(
'url',
('http://192.168.1.1:5000/', 'http://192.168.1.1/', 'http://www.requests.com/'),
(
'http://192.168.1.1:5000/',
'http://192.168.1.1/',
'http://www.requests.com/',
),
)
def test_not_bypass(self, url):
assert get_environ_proxies(url, no_proxy=None) != {}
@pytest.mark.parametrize(
'url',
('http://192.168.1.1:5000/', 'http://192.168.1.1/', 'http://www.requests.com/'),
(
'http://192.168.1.1:5000/',
'http://192.168.1.1/',
'http://www.requests.com/',
),
)
def test_bypass_no_proxy_keyword(self, url):
no_proxy = '192.168.1.1,requests.com'
@@ -260,7 +269,9 @@ class TestAddressInNetwork:
class TestGuessFilename:
@pytest.mark.parametrize('value', (1, type('Fake', (object,), {'name': 1})()))
@pytest.mark.parametrize(
'value', (1, type('Fake', (object,), {'name': 1})())
)
def test_guess_filename_invalid(self, value):
assert guess_filename(value) is None
@@ -362,7 +373,10 @@ ENCODED_PASSWORD = basics.quote(PASSWORD, '')
'http://user:pass%20pass@complex.url.com/path?query=yes',
('user', 'pass pass'),
),
('http://user:pass pass@complex.url.com/path?query=yes', ('user', 'pass pass')),
(
'http://user:pass pass@complex.url.com/path?query=yes',
('user', 'pass pass'),
),
(
'http://user%25user:pass@complex.url.com/path?query=yes',
('user%user', 'pass'),
@@ -400,7 +414,8 @@ def test_unquote_unreserved(uri, expected):
@pytest.mark.parametrize(
'mask, expected', ((8, '255.0.0.0'), (24, '255.255.255.0'), (25, '255.255.255.128'))
'mask, expected',
((8, '255.0.0.0'), (24, '255.255.255.0'), (25, '255.255.255.128')),
)
def test_dotted_netmask(mask, expected):
assert dotted_netmask(mask) == expected
@@ -432,7 +447,10 @@ def test_select_proxies(url, expected, proxies):
@pytest.mark.parametrize(
'value, expected',
(
('foo="is a fish", bar="as well"', {'foo': 'is a fish', 'bar': 'as well'}),
(
'foo="is a fish", bar="as well"',
{'foo': 'is a fish', 'bar': 'as well'},
),
('key_without_value', {'key_without_value': None}),
),
)
@@ -445,7 +463,9 @@ def test_parse_dict_header(value, expected):
(
(CaseInsensitiveDict(), None),
(
CaseInsensitiveDict({'content-type': 'application/json; charset=utf-8'}),
CaseInsensitiveDict(
{'content-type': 'application/json; charset=utf-8'}
),
'utf-8',
),
(CaseInsensitiveDict({'content-type': 'text/plain'}), 'ISO-8859-1'),
@@ -457,7 +477,14 @@ def test_get_encoding_from_headers(value, expected):
@pytest.mark.parametrize(
'value, length',
(('', 0), ('T', 1), ('Test', 4), ('Cont', 0), ('Other', -5), ('Content', None)),
(
('', 0),
('T', 1),
('Test', 4),
('Cont', 0),
('Other', -5),
('Content', None),
),
)
def test_iter_slices(value, length):
if length is None or (length <= 0 and len(value) > 0):
@@ -472,7 +499,13 @@ def test_iter_slices(value, length):
(
(
'<http:/.../front.jpeg>; rel=front; type="image/jpeg"',
[{'url': 'http:/.../front.jpeg', 'rel': 'front', 'type': 'image/jpeg'}],
[
{
'url': 'http:/.../front.jpeg',
'rel': 'front',
'type': 'image/jpeg',
}
],
),
('<http:/.../front.jpeg>', [{'url': 'http:/.../front.jpeg'}]),
('<http:/.../front.jpeg>;', [{'url': 'http:/.../front.jpeg'}]),
@@ -501,7 +534,9 @@ def test_prepend_scheme_if_needed(value, expected):
assert prepend_scheme_if_needed(value, 'http') == expected
@pytest.mark.parametrize('value, expected', (('T', 'T'), (b'T', 'T'), (u'T', 'T')))
@pytest.mark.parametrize(
'value, expected', (('T', 'T'), (b'T', 'T'), (u'T', 'T'))
)
def test_to_native_string(value, expected):
assert to_native_string(value) == expected
@@ -509,7 +544,10 @@ def test_to_native_string(value, expected):
@pytest.mark.parametrize(
'url, expected',
(
('http://u:p@example.com/path?a=1#test', 'http://example.com/path?a=1'),
(
'http://u:p@example.com/path?a=1#test',
'http://example.com/path?a=1',
),
('http://example.com/path', 'http://example.com/path'),
('//u:p@example.com/path', '//example.com/path'),
('//example.com/path', '//example.com/path'),
@@ -561,7 +599,8 @@ def test_add_dict_to_cookiejar(cookiejar):
@pytest.mark.parametrize(
'value, expected', ((u'test', True), (u'æíöû', False), (u'ジェーピーニック', False))
'value, expected',
((u'test', True), (u'æíöû', False), (u'ジェーピーニック', False)),
)
def test_unicode_is_ascii(value, expected):
assert unicode_is_ascii(value) is expected
@@ -605,7 +644,9 @@ def test_should_bypass_proxies_no_proxy(url, expected, monkeypatch):
('http://192.168.0.1/', False, ''),
),
)
def test_should_bypass_proxies_win_registry(url, expected, override, monkeypatch):
def test_should_bypass_proxies_win_registry(
url, expected, override, monkeypatch
):
"""Tests for function should_bypass_proxies to check if proxy
can be bypassed or not with Windows registry settings
"""
+3 -1
View File
@@ -46,7 +46,9 @@ class Server(threading.Thread):
def text_response_server(cls, text, request_timeout=0.5, **kwargs):
def text_response_handler(sock):
request_content = consume_socket_content(sock, timeout=request_timeout)
request_content = consume_socket_content(
sock, timeout=request_timeout
)
sock.send(text.encode('utf-8'))
return request_content