From 5a1da4e32604c33e74f6444a402fff6b7394caef Mon Sep 17 00:00:00 2001 From: Kenneth Reitz Date: Sun, 21 Apr 2019 13:04:46 -0400 Subject: [PATCH] that approach works --- requests3/http_utils.py | 8 +- setup.py | 5 +- tests/test_help.py | 4 +- tests/test_lowlevel.py | 232 --- tests/test_requests.py | 3026 -------------------------------------- tests/test_testserver.py | 155 -- tests/test_utils.py | 1 + 7 files changed, 10 insertions(+), 3421 deletions(-) delete mode 100644 tests/test_lowlevel.py delete mode 100644 tests/test_requests.py delete mode 100644 tests/test_testserver.py diff --git a/requests3/http_utils.py b/requests3/http_utils.py index f30a2ce6..26d20442 100644 --- a/requests3/http_utils.py +++ b/requests3/http_utils.py @@ -657,12 +657,12 @@ def should_bypass_proxies(url: str, no_proxy: typing.Optional[str]) -> bool: no_proxy_arg = no_proxy if no_proxy is None: no_proxy = get_proxy("no_proxy") - netloc = urlparse(url).netloc + parsed = urlparse(url) if no_proxy: # We need to check whether we match here. We need to see if we match # the end of the hostname, both with and without the port. no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host) - ip = netloc.split(":")[0] + ip = parsed.netloc.split(":")[0] if is_ipv4_address(ip): for proxy_ip in no_proxy: if is_valid_cidr(proxy_ip): @@ -680,13 +680,13 @@ def should_bypass_proxies(url: str, no_proxy: typing.Optional[str]) -> bool: host_with_port += ":{}".format(parsed.port) for host in no_proxy: - if netloc.endswith(host) or netloc.split(":")[0].endswith(host): + if parsed.netloc.endswith(host) or parsed.netloc.split(":")[0].endswith(host): # The URL does match something in no_proxy, so we don't want # to apply the proxies on this URL. return True with set_environ("no_proxy", no_proxy_arg): - return bool(proxy_bypass(netloc)) + return bool(proxy_bypass(parsed.netloc)) def get_environ_proxies(url: str, no_proxy: typing.Optional[bool] = None) -> dict: diff --git a/setup.py b/setup.py index 2a09ce25..3b24d035 100644 --- a/setup.py +++ b/setup.py @@ -37,8 +37,9 @@ class PyTest(Command): def run(self): import pytest - errno = pytest.main(["-n", "auto"]) - # errno = pytest.main([]) + # errno = pytest.main(["-n", "auto"]) + + errno = pytest.main([]) sys.exit(errno) diff --git a/tests/test_help.py b/tests/test_help.py index d45fba72..8d41dca0 100644 --- a/tests/test_help.py +++ b/tests/test_help.py @@ -29,11 +29,11 @@ def test_idna_without_version_attribute(mocker): """Older versions of IDNA don't provide a __version__ attribute, verify that if we have such a package, we don't blow up. """ - mocker.patch("requests.help.idna", new=None) + mocker.patch("requests3.help.idna", new=None) assert info()["idna"] == {"version": ""} def test_idna_with_version_attribute(mocker): """Verify we're actually setting idna version when it should be available.""" - mocker.patch("requests.help.idna", new=VersionedPackage("2.6")) + mocker.patch("requests3.help.idna", new=VersionedPackage("2.6")) assert info()["idna"] == {"version": "2.6"} diff --git a/tests/test_lowlevel.py b/tests/test_lowlevel.py deleted file mode 100644 index d4c221ca..00000000 --- a/tests/test_lowlevel.py +++ /dev/null @@ -1,232 +0,0 @@ -# -*- coding: utf-8 -*- -import pytest -import threading -import requests3 as requests - -from tests.testserver.server import Server, consume_socket_content - -from .utils import override_environ - - -def test_chunked_upload(): - """can safely send generators""" - close_server = threading.Event() - server = Server.basic_response_server(wait_to_close_event=close_server) - data = iter([b"a", b"b", b"c"]) - with server as (host, port): - url = "http://{}:{}/".format(host, port) - r = requests.post(url, data=data, stream=True) - close_server.set() # release server block - assert r.status_code == 200 - assert r.request.headers["Transfer-Encoding"] == "chunked" - - -def test_incorrect_content_length(): - """Test ConnectionError raised for incomplete responses""" - close_server = threading.Event() - server = Server.text_response_server( - "HTTP/1.1 200 OK\r\n" + "Content-Length: 50\r\n\r\n" + "Hello World." - ) - with server as (host, port): - url = "http://{0}:{1}/".format(host, port) - r = requests.Request("GET", url).prepare() - s = requests.Session() - with pytest.raises(requests.exceptions.ConnectionError) as e: - resp = s.send(r) - assert "12 bytes read, 38 more expected" in str(e) - close_server.set() # release server block - - -def test_digestauth_401_count_reset_on_redirect(): - """Ensure we correctly reset num_401_calls after a successful digest auth, - followed by a 302 redirect to another digest auth prompt. - - See https://github.com/requests/requests/issues/1979. - """ - text_401 = ( - b"HTTP/1.1 401 UNAUTHORIZED\r\n" - b"Content-Length: 0\r\n" - b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"' - b', opaque="372825293d1c26955496c80ed6426e9e", ' - b'realm="me@kennethreitz.com", qop=auth\r\n\r\n' - ) - text_302 = b"HTTP/1.1 302 FOUND\r\n" b"Content-Length: 0\r\n" b"Location: /\r\n\r\n" - text_200 = b"HTTP/1.1 200 OK\r\n" b"Content-Length: 0\r\n\r\n" - expected_digest = ( - b'Authorization: Digest username="user", ' - b'realm="me@kennethreitz.com", ' - b'nonce="6bf5d6e4da1ce66918800195d6b9130d", uri="/"' - ) - auth = requests.auth.HTTPDigestAuth("user", "pass") - - def digest_response_handler(sock): - # Respond to initial GET with a challenge. - request_content = consume_socket_content(sock, timeout=0.5) - assert request_content.startswith(b"GET / HTTP/1.1") - sock.send(text_401) - # Verify we receive an Authorization header in response, then redirect. - request_content = consume_socket_content(sock, timeout=0.5) - assert expected_digest in request_content - sock.send(text_302) - # Verify Authorization isn't sent to the redirected host, - # then send another challenge. - request_content = consume_socket_content(sock, timeout=0.5) - assert b"Authorization:" not in request_content - sock.send(text_401) - # Verify Authorization is sent correctly again, and return 200 OK. - request_content = consume_socket_content(sock, timeout=0.5) - assert expected_digest in request_content - sock.send(text_200) - return request_content - - close_server = threading.Event() - server = Server(digest_response_handler, wait_to_close_event=close_server) - with server as (host, port): - url = "http://{}:{}/".format(host, port) - r = requests.get(url, auth=auth) - # Verify server succeeded in authenticating. - assert r.status_code == 200 - # Verify Authorization was sent in final request. - assert "Authorization" in r.request.headers - assert r.request.headers["Authorization"].startswith("Digest ") - # Verify redirect happened as we expected. - assert r.history[0].status_code == 302 - close_server.set() - - -def test_digestauth_401_only_sent_once(): - """Ensure we correctly respond to a 401 challenge once, and then - stop responding if challenged again. - """ - text_401 = ( - b"HTTP/1.1 401 UNAUTHORIZED\r\n" - b"Content-Length: 0\r\n" - b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"' - b', opaque="372825293d1c26955496c80ed6426e9e", ' - b'realm="me@kennethreitz.com", qop=auth\r\n\r\n' - ) - expected_digest = ( - b'Authorization: Digest username="user", ' - b'realm="me@kennethreitz.com", ' - b'nonce="6bf5d6e4da1ce66918800195d6b9130d", uri="/"' - ) - auth = requests.auth.HTTPDigestAuth("user", "pass") - - def digest_failed_response_handler(sock): - # Respond to initial GET with a challenge. - request_content = consume_socket_content(sock, timeout=0.5) - assert request_content.startswith(b"GET / HTTP/1.1") - sock.send(text_401) - # Verify we receive an Authorization header in response, then - # challenge again. - request_content = consume_socket_content(sock, timeout=0.5) - assert expected_digest in request_content - sock.send(text_401) - # Verify the client didn't respond to second challenge. - request_content = consume_socket_content(sock, timeout=0.5) - assert request_content == b"" - return request_content - - close_server = threading.Event() - server = Server(digest_failed_response_handler, wait_to_close_event=close_server) - with server as (host, port): - url = "http://{}:{}/".format(host, port) - r = requests.get(url, auth=auth) - # Verify server didn't authenticate us. - assert r.status_code == 401 - assert r.history[0].status_code == 401 - close_server.set() - - -def test_digestauth_only_on_4xx(): - """Ensure we only send digestauth on 4xx challenges. - - See https://github.com/requests/requests/issues/3772. - """ - text_200_chal = ( - b"HTTP/1.1 200 OK\r\n" - b"Content-Length: 0\r\n" - b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"' - b', opaque="372825293d1c26955496c80ed6426e9e", ' - b'realm="me@kennethreitz.com", qop=auth\r\n\r\n' - ) - auth = requests.auth.HTTPDigestAuth("user", "pass") - - def digest_response_handler(sock): - # Respond to GET with a 200 containing www-authenticate header. - request_content = consume_socket_content(sock, timeout=0.5) - assert request_content.startswith(b"GET / HTTP/1.1") - sock.send(text_200_chal) - # Verify the client didn't respond with auth. - request_content = consume_socket_content(sock, timeout=0.5) - assert request_content == b"" - return request_content - - close_server = threading.Event() - server = Server(digest_response_handler, wait_to_close_event=close_server) - with server as (host, port): - url = "http://{}:{}/".format(host, port) - r = requests.get(url, auth=auth) - # Verify server didn't receive auth from us. - assert r.status_code == 200 - assert len(r.history) == 0 - close_server.set() - - -_schemes_by_var_prefix = [ - ("http", ["http"]), - ("https", ["https"]), - ("all", ["http", "https"]), -] -_proxy_combos = [] -for prefix, schemes in _schemes_by_var_prefix: - for scheme in schemes: - _proxy_combos.append(("{0}_proxy".format(prefix), scheme)) -_proxy_combos += [(var.upper(), scheme) for var, scheme in _proxy_combos] - - -@pytest.mark.parametrize("var,scheme", _proxy_combos) -def test_use_proxy_from_environment(httpbin, var, scheme): - url = "{0}://httpbin.org".format(scheme) - fake_proxy = Server() # do nothing with the requests; just close the socket - with fake_proxy as (host, port): - proxy_url = "socks5://{}:{}".format(host, port) - kwargs = {var: proxy_url} - with override_environ(**kwargs): - # fake proxy's lack of response will cause a ConnectionError - with pytest.raises(requests.exceptions.ConnectionError): - requests.get(url) - # the fake proxy received a request - assert len(fake_proxy.handler_results) == 1 - # it had actual content (not checking for SOCKS protocol for now) - assert len(fake_proxy.handler_results[0]) > 0 - - -def test_redirect_rfc1808_to_non_ascii_location(): - path = u"š" - expected_path = b"%C5%A1" - redirect_request = [] # stores the second request to the server - - def redirect_resp_handler(sock): - consume_socket_content(sock, timeout=0.5) - location = u"//{}:{}/{}".format(host, port, path) - sock.send( - b"HTTP/1.1 301 Moved Permanently\r\n" - b"Content-Length: 0\r\n" - b"Location: " + location.encode("utf8") + b"\r\n" - b"\r\n" - ) - redirect_request.append(consume_socket_content(sock, timeout=0.5)) - sock.send(b"HTTP/1.1 200 OK\r\n\r\n") - - close_server = threading.Event() - server = Server(redirect_resp_handler, wait_to_close_event=close_server) - with server as (host, port): - url = u"http://{}:{}".format(host, port) - r = requests.get(url=url, allow_redirects=True) - assert r.status_code == 200 - assert len(r.history) == 1 - assert r.history[0].status_code == 301 - assert redirect_request[0].startswith(b"GET /" + expected_path + b" HTTP/1.1") - assert r.url == u"{0}/{1}".format(url, expected_path.decode("ascii")) - close_server.set() diff --git a/tests/test_requests.py b/tests/test_requests.py deleted file mode 100644 index 786ac884..00000000 --- a/tests/test_requests.py +++ /dev/null @@ -1,3026 +0,0 @@ -# -*- coding: utf-8 -*- -"""Tests for Requests.""" - -from __future__ import division -import itertools -import json -import os -import pickle -import collections -import contextlib -import warnings -import re - -import io -import requests3 -import pytest -import pytest_httpbin -from requests3.http_adapters import HTTPAdapter -from requests3.http_auth import HTTPDigestAuth, _basic_auth_str -from requests3._basics import ( - Morsel, cookielib, getproxies, str, urlparse, builtin_str -) -from requests3.http_cookies import (cookiejar_from_dict, morsel_to_cookie) -from requests3.exceptions import ( - ConnectionError, - ConnectTimeout, - InvalidScheme, - InvalidURL, - MissingScheme, - ReadTimeout, - Timeout, - RetryError, - TooManyRedirects, - ProxyError, - InvalidHeader, - UnrewindableBodyError, - InvalidBodyError, - SSLError, -) -from requests3.http_models import PreparedRequest -from requests3._structures import CaseInsensitiveDict -from requests3.http_sessions import SessionRedirectMixin -from requests3.http_models import urlencode -from requests3._hooks import default_hooks -from requests3.http_utils import DEFAULT_CA_BUNDLE_PATH - -from .compat import StringIO, u -from .utils import override_environ -from urllib3.util import Timeout as Urllib3Timeout - - -class SendRecordingAdapter(HTTPAdapter): - """ - A basic subclass of the HTTPAdapter that records the arguments used to - ``send``. - """ - - def __init__(self, *args, **kwargs): - super(SendRecordingAdapter, self).__init__(*args, **kwargs) - self.send_calls = [] - - def send(self, *args, **kwargs): - self.send_calls.append((args, kwargs)) - return super(SendRecordingAdapter, self).send(*args, **kwargs) - - -# Requests to this URL should always fail with a connection timeout (nothing -# listening on that port) -TARPIT = 'http://10.255.255.1' -try: - from ssl import SSLContext - - del SSLContext - HAS_MODERN_SSL = True -except ImportError: - HAS_MODERN_SSL = False -try: - requests3.pyopenssl - HAS_PYOPENSSL = True -except AttributeError: - HAS_PYOPENSSL = False - -@pytest.fixture -def s(request, *args, **kwargs): - return requests.Session() - - -class TestRequests: - - digest_auth_algo = ('MD5', 'SHA-256', 'SHA-512') - - def test_entry_points(self): - requests.Session().get - requests.Session().head - requests.get - requests.head - requests.put - requests.patch - requests.post - - @pytest.mark.parametrize( - 'exception, url', - ( - (MissingScheme, 'hiwpefhipowhefopw'), - (InvalidScheme, 'localhost:3128'), - (InvalidScheme, 'localhost.localdomain:3128/'), - (InvalidScheme, '10.122.1.1:3128/'), - (InvalidURL, 'http://'), - ), - ) - def test_invalid_url(self, exception, url): - with pytest.raises(exception): - requests.get(url) - - def test_basic_building(self): - req = requests.Request(method='GET') - req.url = 'http://kennethreitz.org/' - req.data = {'life': '42'} - pr = req.prepare() - assert pr.url == req.url - assert pr.body == 'life=42' - - @pytest.mark.parametrize('method', ('GET', 'HEAD')) - def test_no_content_length(self, httpbin, method): - req = requests.Request(method, httpbin(method.lower())).prepare() - assert 'Content-Length' not in req.headers - - @pytest.mark.parametrize('method', ('POST', 'PUT', 'PATCH', 'OPTIONS')) - def test_no_body_content_length(self, httpbin, method): - req = requests.Request(method, httpbin(method.lower())).prepare() - assert req.headers['Content-Length'] == '0' - - @pytest.mark.parametrize('method', ('POST', 'PUT', 'PATCH', 'OPTIONS')) - def test_empty_content_length(self, httpbin, method): - req = requests.Request( - method, httpbin(method.lower()), data='' - ).prepare( - ) - assert req.headers['Content-Length'] == '0' - - def test_override_content_length(self, httpbin): - headers = {'Content-Length': 'not zero'} - r = requests.Request('POST', httpbin('post'), headers=headers).prepare( - ) - assert 'Content-Length' in r.headers - assert r.headers['Content-Length'] == 'not zero' - - def test_path_is_not_double_encoded(self): - request = requests.Request( - 'GET', "http://0.0.0.0/get/test case" - ).prepare( - ) - assert request.path_url == '/get/test%20case' - - @pytest.mark.parametrize( - 'url, expected', - ( - ( - 'http://example.com/path#fragment', - 'http://example.com/path?a=b#fragment', - ), - ( - 'http://example.com/path?key=value#fragment', - 'http://example.com/path?key=value&a=b#fragment', - ), - ), - ) - def test_params_are_added_before_fragment(self, url, expected): - request = requests.Request('GET', url, params={"a": "b"}).prepare() - assert request.url == expected - - def test_params_original_order_is_preserved_by_default(self, s): - param_ordered_dict = collections.OrderedDict( - (('z', 1), ('a', 1), ('k', 1), ('d', 1)) - ) - request = requests.Request( - 'GET', 'http://example.com/', params=param_ordered_dict - ) - prep = s.prepare_request(request) - assert prep.url == 'http://example.com/?z=1&a=1&k=1&d=1' - - - # def test_params_bytes_are_encoded(self): - # request = requests.Request( - # 'GET', 'http://example.com', params=b'test=foo' - # ).prepare( - # ) - # assert request.url == 'http://example.com/?test=foo' - def test_binary_put(self): - request = requests.Request( - 'PUT', 'http://example.com', data=u"ööö".encode("utf-8") - ).prepare( - ) - assert isinstance(request.body, bytes) - - def test_whitespaces_are_removed_from_url(self): - # Test for issue #3696 - request = requests.Request('GET', ' http://example.com').prepare() - assert request.url == 'http://example.com/' - - @pytest.mark.parametrize( - 'scheme', ('http://', 'HTTP://', 'hTTp://', 'HttP://') - ) - def test_mixed_case_scheme_acceptable(self, s, httpbin, scheme): - s.proxies = getproxies() - parts = urlparse(httpbin('get')) - url = scheme + parts.netloc + parts.path - r = requests.Request('GET', url) - r = s.send(r.prepare()) - assert r.status_code == 200, 'failed for scheme {}'.format(scheme) - - def test_HTTP_200_OK_GET_ALTERNATIVE(self, s, httpbin): - r = requests.Request('GET', httpbin('get')) - s.proxies = getproxies() - r = s.send(r.prepare()) - assert r.status_code == 200 - - def test_HTTP_302_ALLOW_REDIRECT_GET(self, httpbin): - r = requests.get(httpbin('redirect', '1')) - assert r.status_code == 200 - assert r.history[0].status_code == 302 - assert r.history[0].is_redirect - - def test_HTTP_307_ALLOW_REDIRECT_POST(self, httpbin): - r = requests.post( - httpbin('redirect-to'), - data='test', - params={'url': 'post', 'status_code': 307}, - ) - assert r.status_code == 200 - assert r.history[0].status_code == 307 - assert r.history[0].is_redirect - assert r.json()['data'] == 'test' - - def test_HTTP_307_ALLOW_REDIRECT_POST_WITH_SEEKABLE(self, httpbin): - byte_str = b'test' - r = requests.post( - httpbin('redirect-to'), - data=io.BytesIO(byte_str), - params={'url': 'post', 'status_code': 307}, - ) - assert r.status_code == 200 - assert r.history[0].status_code == 307 - assert r.history[0].is_redirect - assert r.json()['data'] == byte_str.decode('utf-8') - - def test_HTTP_302_TOO_MANY_REDIRECTS(self, httpbin): - try: - requests.get(httpbin('relative-redirect', '50')) - except TooManyRedirects as e: - url = httpbin('relative-redirect', '20') - assert e.request.url == url - assert e.response.url == url - assert len(e.response.history) == 30 - else: - pytest.fail( - 'Expected redirect to raise TooManyRedirects but it did not' - ) - - def test_HTTP_302_TOO_MANY_REDIRECTS_WITH_PARAMS(self, s, httpbin): - s.max_redirects = 5 - try: - s.get(httpbin('relative-redirect', '50')) - except TooManyRedirects as e: - url = httpbin('relative-redirect', '45') - assert e.request.url == url - assert e.response.url == url - assert len(e.response.history) == 5 - else: - pytest.fail( - 'Expected custom max number of redirects to be respected but was not' - ) - - @pytest.mark.parametrize( - 'method, body, expected', - ( - ('GET', None, 'GET'), - ('HEAD', None, 'HEAD'), - ('POST', 'test', 'GET'), - ('PUT', 'put test', 'PUT'), - ('PATCH', 'patch test', 'PATCH'), - ('DELETE', '', 'DELETE'), - ), - ) - def test_http_301_for_redirectable_methods( - self, httpbin, method, body, expected - ): - """Tests all methods except OPTIONS for expected redirect behaviour. - - OPTIONS responses can behave differently depending on the server, so - we don't have anything uniform to test except how httpbin responds - to them. For that reason they aren't included here. - """ - params = {'url': '/%s' % expected.lower(), 'status_code': '301'} - r = requests.request( - method, httpbin('redirect-to'), data=body, params=params - ) - assert r.request.url == httpbin(expected.lower()) - assert r.request.method == expected - assert r.history[0].status_code == 301 - assert r.history[0].is_redirect - if expected in ('GET', 'HEAD'): - assert r.request.body is None - else: - assert r.json()['data'] == body - - @pytest.mark.parametrize( - 'method, body, expected', - ( - ('GET', None, 'GET'), - ('HEAD', None, 'HEAD'), - ('POST', 'test', 'GET'), - ('PUT', 'put test', 'PUT'), - ('PATCH', 'patch test', 'PATCH'), - ('DELETE', '', 'DELETE'), - ), - ) - def test_http_302_for_redirectable_methods( - self, httpbin, method, body, expected - ): - """Tests all methods except OPTIONS for expected redirect behaviour. - - OPTIONS responses can behave differently depending on the server, so - we don't have anything uniform to test except how httpbin responds - to them. For that reason they aren't included here. - """ - params = {'url': '/%s' % expected.lower()} - r = requests.request( - method, httpbin('redirect-to'), data=body, params=params - ) - assert r.request.url == httpbin(expected.lower()) - assert r.request.method == expected - assert r.history[0].status_code == 302 - assert r.history[0].is_redirect - if expected in ('GET', 'HEAD'): - assert r.request.body is None - else: - assert r.json()['data'] == body - - @pytest.mark.parametrize( - 'method, body, expected', - ( - ('GET', None, 'GET'), - ('HEAD', None, 'HEAD'), - ('POST', 'test', 'GET'), - ('PUT', 'put test', 'GET'), - ('PATCH', 'patch test', 'GET'), - ('DELETE', '', 'GET'), - ), - ) - def test_http_303_for_redirectable_methods( - self, httpbin, method, body, expected - ): - """Tests all methods except OPTIONS for expected redirect behaviour. - - OPTIONS responses can behave differently depending on the server, so - we don't have anything uniform to test except how httpbin responds - to them. For that reason they aren't included here. - """ - params = {'url': '/%s' % expected.lower(), 'status_code': '303'} - r = requests.request( - method, httpbin('redirect-to'), data=body, params=params - ) - assert r.request.url == httpbin(expected.lower()) - assert r.request.method == expected - assert r.history[0].status_code == 303 - assert r.history[0].is_redirect - assert r.request.body is None - - def test_multiple_location_headers(self, s, httpbin): - headers = [ - ('Location', 'http://example.com'), - ('Location', 'https://example.com/1'), - ] - params = '&'.join(['%s=%s' % (k, v) for k, v in headers]) - - req = requests.Request('GET', httpbin('response-headers?%s' % params)) - prep = s.prepare_request(req) - resp = s.send(prep) - # change response to redirect - resp.status_code = 302 - with pytest.raises(InvalidHeader): - # next triggers yield on generator - next(s.resolve_redirects(resp, prep)) - - def test_header_and_body_removal_on_redirect(self, s, httpbin): - purged_headers = ('Content-Length', 'Content-Type') - - req = requests.Request('POST', httpbin('post'), data={'test': 'data'}) - prep = s.prepare_request(req) - resp = s.send(prep) - # Mimic a redirect response - resp.status_code = 302 - resp.headers['location'] = 'get' - # Run request through resolve_redirects - next_resp = next(s.resolve_redirects(resp, prep)) - assert next_resp.request.body is None - for header in purged_headers: - assert header not in next_resp.request.headers - - def test_transfer_enc_removal_on_redirect(self, s, httpbin): - purged_headers = ('Transfer-Encoding', 'Content-Type') - - req = requests.Request( - 'POST', httpbin('post'), data=(b'x' for x in range(1)) - ) - prep = s.prepare_request(req) - assert 'Transfer-Encoding' in prep.headers - # Create Response to avoid https://github.com/kevin1024/pytest-httpbin/issues/33 - resp = requests.Response() - resp.raw = io.BytesIO(b'the content') - resp.request = prep - setattr(resp.raw, 'release_conn', lambda *args: args) - # Mimic a redirect response - resp.status_code = 302 - resp.headers['location'] = httpbin('get') - # Run request through resolve_redirect - next_resp = next(s.resolve_redirects(resp, prep)) - assert next_resp.request.body is None - for header in purged_headers: - assert header not in next_resp.request.headers - - def test_fragment_maintained_on_redirect(self, httpbin): - fragment = "#view=edit&token=hunter2" - r = requests.get(httpbin('redirect-to?url=get')+fragment) - - assert len(r.history) > 0 - assert r.history[0].request.url == httpbin('redirect-to?url=get')+fragment - assert r.url == httpbin('get')+fragment - - def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin): - heads = {'User-agent': 'Mozilla/5.0'} - r = requests.get(httpbin('user-agent'), headers=heads) - assert heads['User-agent'] in r.text - assert r.status_code == 200 - - def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self, httpbin): - heads = {'User-agent': 'Mozilla/5.0'} - r = requests.get( - httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads - ) - assert r.status_code == 200 - - def test_set_cookie_on_301(self, s, httpbin): - url = httpbin('cookies/set?foo=bar') - s.get(url) - assert s.cookies['foo'] == 'bar' - - def test_cookie_sent_on_redirect(self, s, httpbin): - s.get(httpbin('cookies/set?foo=bar')) - r = s.get(httpbin('redirect/1')) # redirects to httpbin('get') - assert 'Cookie' in r.json()['headers'] - - def test_cookie_removed_on_expire(self, s, httpbin): - s.get(httpbin('cookies/set?foo=bar')) - assert s.cookies['foo'] == 'bar' - s.get( - httpbin('response-headers'), - params={ - 'Set-Cookie': 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT' - }, - ) - assert 'foo' not in s.cookies - - def test_cookie_quote_wrapped(self, s, httpbin): - s.get(httpbin('cookies/set?foo="bar:baz"')) - assert s.cookies['foo'] == '"bar:baz"' - - def test_cookie_persists_via_api(self, s, httpbin): - r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'}) - assert 'foo' in r.request.headers['Cookie'] - assert 'foo' in r.history[0].request.headers['Cookie'] - - def test_request_cookie_overrides_session_cookie(self, s, httpbin): - s.cookies['foo'] = 'bar' - r = s.get(httpbin('cookies'), cookies={'foo': 'baz'}) - assert r.json()['cookies']['foo'] == 'baz' - # Session cookie should not be modified - assert s.cookies['foo'] == 'bar' - - def test_request_cookies_not_persisted(self, s, httpbin): - s.get(httpbin('cookies'), cookies={'foo': 'baz'}) - # Sending a request with cookies should not add cookies to the session - assert not s.cookies - - def test_generic_cookiejar_works(self, s, httpbin): - cj = cookielib.CookieJar() - cookiejar_from_dict({'foo': 'bar'}, cj) - s.cookies = cj - r = s.get(httpbin('cookies')) - # Make sure the cookie was sent - assert r.json()['cookies']['foo'] == 'bar' - # Make sure the session cj is still the custom one - assert s.cookies is cj - - def test_param_cookiejar_works(self, s, httpbin): - cj = cookielib.CookieJar() - cookiejar_from_dict({'foo': 'bar'}, cj) - r = s.get(httpbin('cookies'), cookies=cj) - # Make sure the cookie was sent - assert r.json()['cookies']['foo'] == 'bar' - - def test_cookielib_cookiejar_on_redirect(self, s, httpbin): - """Tests resolve_redirect doesn't fail when merging cookies - with non-RequestsCookieJar cookiejar. - - See GH #3579 - """ - cj = cookiejar_from_dict({'foo': 'bar'}, cookielib.CookieJar()) - s.cookies = cookiejar_from_dict({'cookie': 'tasty'}) - # Prepare request without using Session - req = requests3.Request('GET', httpbin('headers'), cookies=cj) - prep_req = req.prepare() - # Send request and simulate redirect - resp = s.send(prep_req) - resp.status_code = 302 - resp.headers['location'] = httpbin('get') - redirects = s.resolve_redirects(resp, prep_req) - resp = next(redirects) - # Verify CookieJar isn't being converted to RequestsCookieJar - assert isinstance(prep_req._cookies, cookielib.CookieJar) - assert isinstance(resp.request._cookies, cookielib.CookieJar) - assert not isinstance( - resp.request._cookies, requests3.cookies.RequestsCookieJar - ) - cookies = {} - for c in resp.request._cookies: - cookies[c.name] = c.value - assert cookies['foo'] == 'bar' - assert cookies['cookie'] == 'tasty' - - @pytest.mark.parametrize( - 'jar', (requests3.http_cookies.RequestsCookieJar(), cookielib.CookieJar()) - ) - def test_custom_cookie_policy_persistence(self, s, httpbin, jar): - """Verify a custom CookiePolicy is propagated on each session request.""" - - class TestCookiePolicy(cookielib.DefaultCookiePolicy): - """Policy to restrict all cookies from localhost (127.0.0.1).""" - - def __init__(self): - cookielib.DefaultCookiePolicy.__init__( - self, blocked_domains=['127.0.0.1'] - ) - - # Establish session with jar and set some cookies. - s.cookies = jar - s.get(httpbin('cookies/set?k1=v1&k2=v2')) - assert len(s.cookies) == 2 - # Set different policy. - s.cookies.set_policy(TestCookiePolicy()) - assert isinstance(s.cookies._policy, TestCookiePolicy) - # No cookies were sent to our blocked domain and none were set. - resp = s.get(httpbin('cookies/set?k3=v3')) - assert 'Cookie' not in resp.request.headers - assert len(s.cookies) == 2 - assert 'k3' not in s.cookies - - def test_requests_in_history_are_not_overridden(self, httpbin): - resp = requests.get(httpbin('redirect/3')) - urls = [r.url for r in resp.history] - req_urls = [r.request.url for r in resp.history] - assert urls == req_urls - - def test_history_is_always_a_list(self, httpbin): - """Show that even with redirects, Response.history is always a list.""" - resp = requests.get(httpbin('get')) - assert isinstance(resp.history, list) - resp = requests.get(httpbin('redirect/1')) - assert isinstance(resp.history, list) - assert not isinstance(resp.history, tuple) - - def test_headers_on_session_with_None_are_not_sent(self, httpbin, s): - """Do not send headers in Session.headers with None values.""" - s.headers['Accept-Encoding'] = None - req = requests.Request('GET', httpbin('get')) - prep = s.prepare_request(req) - assert 'Accept-Encoding' not in prep.headers - - def test_headers_preserve_order(self, s, httpbin): - """Preserve order when headers provided as OrderedDict.""" - s.headers = collections.OrderedDict() - s.headers['Accept-Encoding'] = 'identity' - s.headers['First'] = '1' - s.headers['Second'] = '2' - headers = collections.OrderedDict([('Third', '3'), ('Fourth', '4')]) - headers['Fifth'] = '5' - headers['Second'] = '222' - req = requests.Request('GET', httpbin('get'), headers=headers) - prep = s.prepare_request(req) - items = list(prep.headers.items()) - assert items[0] == ('Accept-Encoding', 'identity') - assert items[1] == ('First', '1') - assert items[2] == ('Second', '222') - assert items[3] == ('Third', '3') - assert items[4] == ('Fourth', '4') - assert items[5] == ('Fifth', '5') - - @pytest.mark.parametrize('key', ('User-agent', 'user-agent')) - def test_user_agent_transfers(self, httpbin, key): - heads = {key: 'Mozilla/5.0 (github.com/requests/requests)'} - r = requests.get(httpbin('user-agent'), headers=heads) - assert heads[key] in r.text - - def test_HTTP_200_OK_HEAD(self, httpbin): - r = requests.head(httpbin('get')) - assert r.status_code == 200 - - def test_HTTP_200_OK_PUT(self, httpbin): - r = requests.put(httpbin('put')) - assert r.status_code == 200 - - def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self, httpbin, s): - auth = ('user', 'pass') - url = httpbin('basic-auth', 'user', 'pass') - r = requests.get(url, auth=auth) - assert r.status_code == 200 - r = requests.get(url) - assert r.status_code == 401 - s.auth = auth - r = s.get(url) - assert r.status_code == 200 - - @pytest.mark.parametrize( - 'username, password', - ( - ('user', 'pass'), - (u'имя'.encode('utf-8'), u'пароль'.encode('utf-8')), - ), - ) - def test_set_basicauth(self, httpbin, username, password): - auth = (username, password) - url = httpbin('get') - r = requests.Request('GET', url, auth=auth) - p = r.prepare() - assert p.headers['Authorization'] == _basic_auth_str( - username, password - ) - - @pytest.mark.parametrize( - 'username, password', (('user', 1234), (None, 'test')) - ) - def test_non_str_basicauth(self, username, password): - """Ensure we only allow string or bytes values for basicauth""" - with pytest.raises(TypeError) as e: - requests.auth._basic_auth_str(username, password) - assert 'must be of type str or bytes' in str(e) - - - # def test_basicauth_encodes_byte_strings(self): - # """Ensure b'test' formats as the byte string "test" rather - # than the unicode string "b'test'" in Python 3. - # """ - # auth = (b'\xc5\xafsername', b'test\xc6\xb6') - # r = requests.Request('GET', 'http://localhost', auth=auth) - # p = r.prepare() - # assert p.headers['Authorization'] == 'Basic xa9zZXJuYW1lOnRlc3TGtg==' - @pytest.mark.parametrize( - 'url, exception', - (('http://doesnotexist.google.com', ConnectionError), ('http://localhost:1', ConnectionError), ('http://fe80::5054:ff:fe5a:fc0', InvalidURL)), - # Connecting to an unknown domain should raise a ConnectionError - # Connecting to an invalid port should raise a ConnectionError - # Inputing a URL that cannot be parsed should raise an InvalidURL error - ) - def test_errors(self, url, exception): - with pytest.raises(exception): - requests.get(url, timeout=1) - - def test_proxy_error(self): - # any proxy related error (address resolution, no route to host, etc) should result in a ProxyError - with pytest.raises(ProxyError): - requests.get( - 'http://localhost:1', - proxies={'http': 'non-resolvable-address'}, - ) - - def test_basicauth_with_netrc(self, httpbin, s): - auth = ('user', 'pass') - wrong_auth = ('wronguser', 'wrongpass') - url = httpbin('basic-auth', 'user', 'pass') - old_auth = requests.sessions.get_netrc_auth - try: - - def get_netrc_auth_mock(url): - return auth - - requests.sessions.get_netrc_auth = get_netrc_auth_mock - # Should use netrc and work. - r = requests.get(url) - assert r.status_code == 200 - # Given auth should override and fail. - r = requests.get(url, auth=wrong_auth) - assert r.status_code == 401 - - # Should use netrc and work. - r = s.get(url) - assert r.status_code == 200 - # Given auth should override and fail. - s.auth = wrong_auth - r = s.get(url) - assert r.status_code == 401 - finally: - requests.sessions.get_netrc_auth = old_auth - - def test_DIGEST_HTTP_200_OK_GET(self, httpbin, s): - auth = HTTPDigestAuth('user', 'pass') - url = httpbin('digest-auth', 'auth', 'user', 'pass') - r = requests.get(url, auth=auth) - assert r.status_code == 200 - r = requests.get(url) - assert r.status_code == 401 - - s.auth = HTTPDigestAuth('user', 'pass') - r = s.get(url) - assert r.status_code == 200 - - def test_DIGEST_AUTH_RETURNS_COOKIE(self, httpbin): - url = httpbin('digest-auth', 'auth', 'user', 'pass') - auth = HTTPDigestAuth('user', 'pass') - r = requests.get(url) - assert r.cookies['fake'] == 'fake_value' - r = requests.get(url, auth=auth) - assert r.status_code == 200 - - def test_DIGEST_AUTH_SETS_SESSION_COOKIES(self, httpbin, s): - url = httpbin('digest-auth', 'auth', 'user', 'pass') - auth = HTTPDigestAuth('user', 'pass') - - s.get(url, auth=auth) - assert s.cookies['fake'] == 'fake_value' - - def test_DIGEST_STREAM(self, httpbin): - auth = HTTPDigestAuth('user', 'pass') - url = httpbin('digest-auth', 'auth', 'user', 'pass') - r = requests.get(url, auth=auth, stream=True) - assert r.raw.read() != b'' - r = requests.get(url, auth=auth, stream=False) - assert r.raw.read() == b'' - - def test_DIGESTAUTH_WRONG_HTTP_401_GET(self, httpbin, s): - auth = HTTPDigestAuth('user', 'wrongpass') - url = httpbin('digest-auth', 'auth', 'user', 'pass') - r = requests.get(url, auth=auth) - assert r.status_code == 401 - r = requests.get(url) - assert r.status_code == 401 - s.auth = auth - r = s.get(url) - assert r.status_code == 401 - - def test_DIGESTAUTH_QUOTES_QOP_VALUE(self, httpbin): - auth = HTTPDigestAuth('user', 'pass') - url = httpbin('digest-auth', 'auth', 'user', 'pass') - r = requests.get(url, auth=auth) - assert '"auth"' in r.request.headers['Authorization'] - - def test_POSTBIN_GET_POST_FILES(self, httpbin): - url = httpbin('post') - requests.post(url).raise_for_status() - post1 = requests.post(url, data={'some': 'data'}) - assert post1.status_code == 200 - with open('Pipfile') as f: - post2 = requests.post(url, files={'some': f}) - assert post2.status_code == 200 - post4 = requests.post(url, data='[{"some": "json"}]') - assert post4.status_code == 200 - with pytest.raises(ValueError): - requests.post(url, files=['bad file data']) - - def test_invalid_files_input(self, httpbin): - - url = httpbin('post') - post = requests.post(url, - files={"random-file-1": None, "random-file-2": 1}) - assert b'name="random-file-1"' not in post.request.body - assert b'name="random-file-2"' in post.request.body - - def test_POSTBIN_SEEKED_OBJECT_WITH_NO_ITER(self, httpbin): - - class TestStream(object): - - def __init__(self, data): - self.data = data.encode() - self.length = len(self.data) - self.index = 0 - - def __len__(self): - return self.length - - def read(self, size=None): - if size: - ret = self.data[self.index: self.index + size] - self.index += size - else: - ret = self.data[self.index:] - self.index = self.length - return ret - - def tell(self): - return self.index - - def seek(self, offset, where=0): - if where == 0: - self.index = offset - elif where == 1: - self.index += offset - elif where == 2: - self.index = self.length + offset - - test = TestStream('test') - post1 = requests.post(httpbin('post'), data=test) - assert post1.status_code == 200 - assert post1.json()['data'] == 'test' - test = TestStream('test') - test.seek(2) - post2 = requests.post(httpbin('post'), data=test) - assert post2.status_code == 200 - assert post2.json()['data'] == 'st' - - def test_POSTBIN_GET_POST_FILES_WITH_DATA(self, httpbin): - url = httpbin('post') - requests.post(url).raise_for_status() - post1 = requests.post(url, data={'some': 'data'}) - assert post1.status_code == 200 - with open('Pipfile') as f: - post2 = requests.post( - url, data={'some': 'data'}, files={'some': f} - ) - assert post2.status_code == 200 - post4 = requests.post(url, data='[{"some": "json"}]') - assert post4.status_code == 200 - with pytest.raises(ValueError): - requests.post(url, files=['bad file data']) - - def test_post_with_custom_mapping(self, httpbin): - - class CustomMapping(collections.MutableMapping): - - def __init__(self, *args, **kwargs): - self.data = dict(*args, **kwargs) - - def __delitem__(self, key): - del self.data[key] - - def __getitem__(self, key): - return self.data[key] - - def __setitem__(self, key, value): - self.data[key] = value - - def __iter__(self): - return iter(self.data) - - def __len__(self): - return len(self.data) - - data = CustomMapping({'some': 'data'}) - url = httpbin('post') - found_json = requests.post(url, data=data).json().get('form') - assert found_json == {'some': 'data'} - - def test_conflicting_post_params(self, httpbin): - url = httpbin('post') - with open('Pipfile') as f: - pytest.raises( - ValueError, - "requests.post(url, data='[{\"some\": \"data\"}]', files={'some': f})", - ) - pytest.raises( - ValueError, - "requests.post(url, data=u('[{\"some\": \"data\"}]'), files={'some': f})", - ) - - def test_request_ok_set(self, httpbin): - r = requests.get(httpbin('status', '404')) - assert not r.ok - - def test_status_raising(self, httpbin): - r = requests.get(httpbin('status', '404')) - with pytest.raises(requests.exceptions.HTTPError): - r.raise_for_status() - r = requests.get(httpbin('status', '500')) - assert not r.ok - - def test_raise_for_status_returns_self(self, httpbin): - r = requests.get(httpbin('status', '200')) - assert r.raise_for_status() is r - - def test_decompress_gzip(self, httpbin): - r = requests.get(httpbin('gzip')) - r.content.decode('ascii') - - @pytest.mark.parametrize( - 'url, params', - ( - ('/get', {'foo': 'føø'}), - ('/get', {'føø': 'føø'}), - ('/get', {'føø': 'føø'}), - ('/get', {'foo': 'foo'}), - ('ø', {'foo': 'foo'}), - ), - ) - def test_unicode_get(self, httpbin, url, params): - requests.get(httpbin(url), params=params) - - def test_unicode_header_name(self, httpbin): - requests.put( - httpbin('put'), - headers={str('Content-Type'): 'application/octet-stream'}, - data='\xff', - ) # compat.str is unicode. - - def test_pyopenssl_redirect(self, httpbin_secure, httpbin_ca_bundle): - requests.get(httpbin_secure('status', '301'), verify=httpbin_ca_bundle) - - def test_invalid_ca_certificate_path(self, httpbin_secure): - INVALID_PATH = '/garbage' - with pytest.raises(IOError) as e: - requests.get(httpbin_secure(), verify=INVALID_PATH) - assert str( - e.value - ) == 'Could not find a suitable TLS CA certificate bundle, invalid path: {0}'.format( - INVALID_PATH - ) - - def test_invalid_ssl_certificate_files(self, httpbin_secure): - INVALID_PATH = '/garbage' - with pytest.raises(IOError) as e: - requests.get(httpbin_secure(), cert=INVALID_PATH) - assert str( - e.value - ) == 'Could not find the TLS certificate file, invalid path: {0}'.format( - INVALID_PATH - ) - with pytest.raises(IOError) as e: - requests.get(httpbin_secure(), cert=('.', INVALID_PATH)) - assert str( - e.value - ) == 'Could not find the TLS key file, invalid path: {0}'.format( - INVALID_PATH - ) - - def test_http_with_certificate(self, httpbin): - r = requests.get(httpbin(), cert='.') - assert r.status_code == 200 - - def test_https_warnings(self, httpbin_secure, httpbin_ca_bundle): - """warnings are emitted with requests.get""" - if HAS_MODERN_SSL or HAS_PYOPENSSL: - warnings_expected = ('SubjectAltNameWarning',) - else: - warnings_expected = ( - 'SNIMissingWarning', - 'InsecurePlatformWarning', - 'SubjectAltNameWarning', - ) - with pytest.warns(None) as warning_records: - warnings.simplefilter('always') - requests.get( - httpbin_secure('status', '200'), verify=httpbin_ca_bundle - ) - warning_records = [ - item - for item in warning_records - if item.category.__name__ != 'ResourceWarning' - ] - warnings_category = tuple( - item.category.__name__ for item in warning_records - ) - assert warnings_category == warnings_expected - - def test_certificate_failure(self, httpbin_secure): - """ - When underlying SSL problems occur, an SSLError is raised. - """ - with pytest.raises(SSLError): - # Our local httpbin does not have a trusted CA, so this call will - # fail if we use our default trust bundle. - requests.get(httpbin_secure('status', '200')) - - def test_urlencoded_get_query_multivalued_param(self, httpbin): - r = requests.get(httpbin('get'), params={'test': ['foo', 'baz']}) - assert r.status_code == 200 - assert r.url == httpbin('get?test=foo&test=baz') - - def test_form_encoded_post_query_multivalued_element(self, httpbin): - r = requests.Request(method='POST', url=httpbin('post'), - data=dict(test=['foo', 'baz'])) - prep = r.prepare() - assert prep.body == 'test=foo&test=baz' - - def test_different_encodings_dont_break_post(self, httpbin): - r = requests.post( - httpbin('post'), - data={'stuff': json.dumps({'a': 123})}, - params={'blah': 'asdf1234'}, - files={'file': ('test_requests.py', open(__file__, 'rb'))}, - ) - assert r.status_code == 200 - - @pytest.mark.parametrize( - 'data', - ( - {'stuff': u('ëlïxr')}, - {'stuff': u('ëlïxr').encode('utf-8')}, - {'stuff': 'elixr'}, - {'stuff': 'elixr'.encode('utf-8')}, - ), - ) - def test_unicode_multipart_post(self, httpbin, data): - r = requests.post( - httpbin('post'), - data=data, - files={'file': ('test_requests.py', open(__file__, 'rb'))}, - ) - assert r.status_code == 200 - - def test_unicode_multipart_post_fieldnames(self, httpbin): - filename = os.path.splitext(__file__)[0] + '.py' - r = requests.Request( - method='POST', - url=httpbin('post'), - data={'stuff'.encode('utf-8'): 'elixr'}, - files={'file': ('test_requests.py', open(filename, 'rb'))}, - ) - prep = r.prepare() - assert b'name="stuff"' in prep.body - assert b'name="b\'stuff\'"' not in prep.body - - def test_unicode_method_name(self, httpbin): - files = {'file': open(__file__, 'rb')} - r = requests.request( - method=u('POST'), url=httpbin('post'), files=files - ) - assert r.status_code == 200 - - def test_unicode_method_name_with_request_object(self, httpbin, s): - files = {'file': open(__file__, 'rb')} - - req = requests.Request(u('POST'), httpbin('post'), files=files) - prep = s.prepare_request(req) - assert isinstance(prep.method, builtin_str) - assert prep.method == 'POST' - resp = s.send(prep) - assert resp.status_code == 200 - - def test_non_prepared_request_error(self, s): - req = requests.Request(u('POST'), '/') - with pytest.raises(ValueError) as e: - s.send(req) - assert str(e.value) == 'You can only send PreparedRequests.' - - def test_custom_content_type(self, httpbin): - r = requests.post( - httpbin('post'), - data={'stuff': json.dumps({'a': 123})}, - files={ - 'file1': ('test_requests.py', open(__file__, 'rb')), - 'file2': ( - 'test_requests', - open(__file__, 'rb'), - 'text/py-content-type', - ), - }, - ) - assert r.status_code == 200 - assert b"text/py-content-type" in r.request.body - - def test_hook_receives_request_arguments(self, httpbin, s): - - def hook(resp, **kwargs): - assert resp is not None - assert kwargs != {} - - r = requests.Request('GET', httpbin(), hooks={'response': hook}) - prep = s.prepare_request(r) - s.send(prep) - - def test_session_hooks_are_used_with_no_request_hooks(self, httpbin, s): - hook = lambda x, *args, **kwargs: x - - s.hooks['response'].append(hook) - r = requests.Request('GET', httpbin()) - prep = s.prepare_request(r) - assert prep.hooks['response'] != [] - assert prep.hooks['response'] == [hook] - - def test_session_hooks_are_overridden_by_request_hooks(self, httpbin, s): - hook1 = lambda x, *args, **kwargs: x - hook2 = lambda x, *args, **kwargs: x - assert hook1 is not hook2 - - s.hooks['response'].append(hook2) - r = requests.Request('GET', httpbin(), hooks={'response': [hook1]}) - prep = s.prepare_request(r) - assert prep.hooks['response'] == [hook1] - - def test_prepared_request_hook(self, httpbin, s): - - def hook(resp, **kwargs): - resp.headers['hook-working'] = 'True' - return resp - - req = requests.Request('GET', httpbin(), hooks={'response': hook}) - prep = req.prepare() - s.proxies = getproxies() - resp = s.send(prep) - assert resp.headers['hook-working'] - - def test_prepared_from_session(self, httpbin, s): - - class DummyAuth(requests.auth.AuthBase): - - def __call__(self, r): - r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok' - return r - - req = requests.Request('GET', httpbin('headers')) - assert not req.auth - - s.auth = DummyAuth() - prep = s.prepare_request(req) - resp = s.send(prep) - assert resp.json()['headers'][ - 'Dummy-Auth-Test' - ] == 'dummy-auth-test-ok' - - - # def test_prepare_request_with_bytestring_url(self): - # req = requests.Request('GET', b'https://httpbin.org/') - # s = requests.Session() - # prep = s.prepare_request(req) - # assert prep.url == "https://httpbin.org/" - # def test_request_with_bytestring_host(self, httpbin): - # s = requests.Session() - # resp = s.request( - # 'GET', - # httpbin('cookies/set?cookie=value'), - # allow_redirects=False, - # headers={'Host': b'httpbin.org'}, - # ) - # assert resp.cookies.get('cookie') == 'value' - def test_links(self): - r = requests.Response() - r.headers = { - 'cache-control': 'public, max-age=60, s-maxage=60', - 'connection': 'keep-alive', - 'content-encoding': 'gzip', - 'content-type': 'application/json; charset=utf-8', - 'date': 'Sat, 26 Jan 2013 16:47:56 GMT', - 'etag': '"6ff6a73c0e446c1f61614769e3ceb778"', - 'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT', - 'link': ( - '; rel="next", ; ' - ' rel="last"' - ), - 'server': 'GitHub.com', - 'status': '200 OK', - 'vary': 'Accept', - 'x-content-type-options': 'nosniff', - 'x-github-media-type': 'github.beta', - 'x-ratelimit-limit': '60', - 'x-ratelimit-remaining': '57', - } - assert r.links['next']['rel'] == 'next' - - def test_cookie_parameters(self): - key = 'some_cookie' - value = 'some_value' - secure = True - domain = 'test.com' - rest = {'HttpOnly': True} - jar = requests.cookies.RequestsCookieJar() - jar.set(key, value, secure=secure, domain=domain, rest=rest) - assert len(jar) == 1 - assert 'some_cookie' in jar - cookie = list(jar)[0] - assert cookie.secure == secure - assert cookie.domain == domain - assert cookie._rest['HttpOnly'] == rest['HttpOnly'] - - def test_cookie_as_dict_keeps_len(self): - key = 'some_cookie' - value = 'some_value' - key1 = 'some_cookie1' - value1 = 'some_value1' - jar = requests.cookies.RequestsCookieJar() - jar.set(key, value) - jar.set(key1, value1) - d1 = dict(jar) - d2 = dict(jar.iteritems()) - d3 = dict(jar.items()) - assert len(jar) == 2 - assert len(d1) == 2 - assert len(d2) == 2 - assert len(d3) == 2 - - def test_cookie_as_dict_keeps_items(self): - key = 'some_cookie' - value = 'some_value' - key1 = 'some_cookie1' - value1 = 'some_value1' - jar = requests.cookies.RequestsCookieJar() - jar.set(key, value) - jar.set(key1, value1) - d1 = dict(jar) - d2 = dict(jar.iteritems()) - d3 = dict(jar.items()) - assert d1['some_cookie'] == 'some_value' - assert d2['some_cookie'] == 'some_value' - assert d3['some_cookie1'] == 'some_value1' - - def test_cookie_as_dict_keys(self): - key = 'some_cookie' - value = 'some_value' - key1 = 'some_cookie1' - value1 = 'some_value1' - jar = requests.cookies.RequestsCookieJar() - jar.set(key, value) - jar.set(key1, value1) - keys = jar.keys() - assert keys == list(keys) - # make sure one can use keys multiple times - assert list(keys) == list(keys) - - def test_cookie_as_dict_values(self): - key = 'some_cookie' - value = 'some_value' - key1 = 'some_cookie1' - value1 = 'some_value1' - jar = requests.cookies.RequestsCookieJar() - jar.set(key, value) - jar.set(key1, value1) - values = jar.values() - assert values == list(values) - # make sure one can use values multiple times - assert list(values) == list(values) - - def test_cookie_as_dict_items(self): - key = 'some_cookie' - value = 'some_value' - key1 = 'some_cookie1' - value1 = 'some_value1' - jar = requests.cookies.RequestsCookieJar() - jar.set(key, value) - jar.set(key1, value1) - items = jar.items() - assert items == list(items) - # make sure one can use items multiple times - assert list(items) == list(items) - - def test_cookie_duplicate_names_different_domains(self): - key = 'some_cookie' - value = 'some_value' - domain1 = 'test1.com' - domain2 = 'test2.com' - jar = requests.cookies.RequestsCookieJar() - jar.set(key, value, domain=domain1) - jar.set(key, value, domain=domain2) - assert key in jar - items = jar.items() - assert len(items) == 2 - # Verify that CookieConflictError is raised if domain is not specified - with pytest.raises(requests.cookies.CookieConflictError): - jar.get(key) - # Verify that CookieConflictError is not raised if domain is specified - cookie = jar.get(key, domain=domain1) - assert cookie == value - - def test_cookie_duplicate_names_raises_cookie_conflict_error(self): - key = 'some_cookie' - value = 'some_value' - path = 'some_path' - jar = requests.cookies.RequestsCookieJar() - jar.set(key, value, path=path) - jar.set(key, value) - with pytest.raises(requests.cookies.CookieConflictError): - jar.get(key) - - def test_cookie_policy_copy(self): - class MyCookiePolicy(cookielib.DefaultCookiePolicy): - pass - - jar = requests.cookies.RequestsCookieJar() - jar.set_policy(MyCookiePolicy()) - assert isinstance(jar.copy().get_policy(), MyCookiePolicy) - - def test_time_elapsed_blank(self, httpbin): - r = requests.get(httpbin('get')) - td = r.elapsed - total_seconds = ( - (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / - 10 ** - 6 - ) - assert total_seconds > 0.0 - - def test_empty_response_has_content_none(self): - r = requests.Response() - assert r.content is None - - def test_response_is_iterable(self): - r = requests.Response() - io = StringIO.StringIO('abc') - read_ = io.read - - def read_mock(amt, decode_content=None): - return read_(amt) - - setattr(io, 'read', read_mock) - r.raw = io - assert next(iter(r)) - io.close() - - def test_response_decode_unicode(self): - """When called with decode_unicode, Response.iter_content should always - return unicode. - """ - r = requests.Response() - r._content_consumed = True - r._content = b'the content' - r.encoding = 'ascii' - chunks = r.iter_content(decode_unicode=True) - assert all(isinstance(chunk, str) for chunk in chunks) - # also for streaming - r = requests.Response() - r.raw = io.BytesIO(b'the content') - r.encoding = 'ascii' - chunks = r.iter_content(decode_unicode=True) - assert all(isinstance(chunk, str) for chunk in chunks) - - @pytest.mark.parametrize( - 'encoding, exception', - ((None, TypeError), ('invalid encoding', LookupError)), - ) - def test_decode_unicode_encoding(self, encoding, exception): - # raise an exception if encoding isn't set - r = requests.Response() - r.raw = io.BytesIO(b'the content') - r.encoding = encoding - with pytest.raises(exception): - chunks = r.iter_content(decode_unicode=True) - - def test_response_reason_unicode(self): - # check for unicode HTTP status - r = requests.Response() - r.url = u'unicode URL' - r.reason = u'Komponenttia ei löydy'.encode('utf-8') - r.status_code = 404 - r.encoding = None - assert not r.ok # old behaviour - crashes here - - def test_response_reason_unicode_fallback(self): - # check raise_status falls back to ISO-8859-1 - r = requests.Response() - r.url = 'some url' - reason = u'Komponenttia ei löydy' - r.reason = reason.encode('latin-1') - r.status_code = 500 - r.encoding = None - with pytest.raises(requests.exceptions.HTTPError) as e: - r.raise_for_status() - assert reason in e.value.args[0] - - def test_response_chunk_size_type(self): - """Ensure that chunk_size is passed as None or an integer, otherwise - raise a TypeError. - """ - r = requests.Response() - r.raw = io.BytesIO(b'the content') - chunks = r.iter_content(1) - assert all(len(chunk) == 1 for chunk in chunks) - r = requests.Response() - r.raw = io.BytesIO(b'the content') - chunks = r.iter_content(None) - assert list(chunks) == [b'the content'] - r = requests.Response() - r.raw = io.BytesIO(b'the content') - with pytest.raises(TypeError): - chunks = r.iter_content("1024") - - def test_request_and_response_are_pickleable(self, httpbin): - r = requests.get(httpbin('get')) - # verify we can pickle the original request - assert pickle.loads(pickle.dumps(r.request)) - # verify we can pickle the response and that we have access to - # the original request. - pr = pickle.loads(pickle.dumps(r)) - assert r.request.url == pr.request.url - assert r.request.headers == pr.request.headers - - @pytest.mark.skip(reason="TODO: Doesn't work with __slots__.") - def test_response_lines(self): - """ - iter_lines should be able to handle data dribbling in which delimiters - might not be lined up ideally. - """ - mock_chunks = [ - b'This \r\n', - b'', - b'is\r', - b'\n', - b'a', - b' ', - b'', - b'', - b'test.', - b'\r', - b'\n', - b'end.', - ] - mock_data = b''.join(mock_chunks) - unicode_mock_data = mock_data.decode('utf-8') - - def mock_iter_content(*args, **kwargs): - if kwargs.get("decode_unicode"): - return (e.decode('utf-8') for e in mock_chunks) - - return (e for e in mock_chunks) - - r = requests.Response() - r._content_consumed = True - r.iter_content = mock_iter_content - # decode_unicode=None, output raw bytes - assert list(r.iter_lines(delimiter=b'\r\n')) == mock_data.split( - b'\r\n' - ) - # decode_unicode=True, output unicode strings - assert list( - r.iter_lines(decode_unicode=True, delimiter=u'\r\n') - ) == unicode_mock_data.split( - u'\r\n' - ) - # When delimiter is None, we should yield the same result as splitlines() - # which supports the universal newline. - # '\r', '\n', and '\r\n' are all treated as one line break. - # decode_unicode=None, output raw bytes - result = list(r.iter_lines()) - assert result == mock_data.splitlines() - # decode_unicode=True, output unicode strings - result = list(r.iter_lines(decode_unicode=True)) - assert result == unicode_mock_data.splitlines() - # If we change all the line breaks to `\r`, we should be okay. - # decode_unicode=None, output raw bytes - mock_chunks = [chunk.replace(b'\n', b'\r') for chunk in mock_chunks] - mock_data = b''.join(mock_chunks) - assert list(r.iter_lines()) == mock_data.splitlines() - # decode_unicode=True, output unicode strings - unicode_mock_data = mock_data.decode('utf-8') - assert list( - r.iter_lines(decode_unicode=True) - ) == unicode_mock_data.splitlines( - ) - - @pytest.mark.parametrize( - 'content, expected_no_delimiter, expected_delimiter', - (([b''], [], []), ([b'line\n'], [u'line'], [u'line\n']), ([b'line', b'\n'], [u'line'], [u'line\n']), ([b'line\r\n'], [u'line'], [u'line', u'']), ([b'line\r\n', b''], [u'line'], [u'line', u'']), ([b'line', b'\r\n'], [u'line'], [u'line', u'']), ([b'a\r', b'\nb\r'], [u'a', u'b'], [u'a', u'b\r']), ([b'a\r', b'\n', b'\nb'], [u'a', u'', u'b'], [u'a', u'\nb']), ([b'a\n', b'\nb'], [u'a', u'', u'b'], [u'a\n\nb']), ([b'a\r\n', b'\rb\n'], [u'a', u'', u'b'], [u'a', u'\rb\n']), ([b'a\nb', b'c'], [u'a', u'bc'], [u'a\nbc']), ([b'a\n', b'\rb', b'\r\nc'], [u'a', u'', u'b', u'c'], [u'a\n\rb', u'c']), ([b'a\r\nb', b'', b'c'], [u'a', u'bc'], [u'a', u'bc'])), - # Empty chunk in the end of stream, same behavior as the previous # Empty chunk with pending data - ) - @pytest.mark.skip(reason="TODO: Doesn't work with __slots__") - def test_response_lines_parametrized( - self, content, expected_no_delimiter, expected_delimiter - ): - """ - Test a lot of potential chunk splits to ensure consistency of - iter_lines(delimiter=x), as well as the legacy behavior of - iter_lines() without delimiter - https://github.com/kennethreitz/requests/pull/2431#issuecomment-72333964 - """ - mock_chunks = content - - def mock_iter_content(*args, **kwargs): - if kwargs.get("decode_unicode"): - return (e.decode('utf-8') for e in mock_chunks) - - return (e for e in mock_chunks) - - r = requests.Response() - r._content_consumed = True - r.iter_content = mock_iter_content - # decode_unicode=True, output unicode strings - assert list(r.iter_lines(decode_unicode=True)) == expected_no_delimiter - assert list( - r.iter_lines(decode_unicode=True, delimiter='\r\n') - ) == expected_delimiter - # decode_unicode=None, output raw bytes - assert list(r.iter_lines()) == [ - line.encode('utf-8') for line in expected_no_delimiter - ] - assert list(r.iter_lines(delimiter=b'\r\n')) == [ - line.encode('utf-8') for line in expected_delimiter - ] - - def test_prepared_request_is_pickleable(self, httpbin, s): - p = requests.Request('GET', httpbin('get')).prepare() - # Verify PreparedRequest can be pickled and unpickled - r = pickle.loads(pickle.dumps(p)) - assert r.url == p.url - assert r.headers == p.headers - assert r.body == p.body - # Verify unpickled PreparedRequest sends properly - - resp = s.send(r) - assert resp.status_code == 200 - - def test_prepared_request_with_file_is_pickleable(self, httpbin, s): - files = {'file': open(__file__, 'rb')} - r = requests.Request('POST', httpbin('post'), files=files) - p = r.prepare() - # Verify PreparedRequest can be pickled and unpickled - r = pickle.loads(pickle.dumps(p)) - assert r.url == p.url - assert r.headers == p.headers - assert r.body == p.body - # Verify unpickled PreparedRequest sends properly - - resp = s.send(r) - assert resp.status_code == 200 - - def test_prepared_request_with_hook_is_pickleable(self, httpbin, s): - r = requests.Request('GET', httpbin('get'), hooks=default_hooks()) - p = r.prepare() - # Verify PreparedRequest can be pickled - r = pickle.loads(pickle.dumps(p)) - assert r.url == p.url - assert r.headers == p.headers - assert r.body == p.body - assert r.hooks == p.hooks - # Verify unpickled PreparedRequest sends properly - - resp = s.send(r) - assert resp.status_code == 200 - - def test_cannot_send_unprepared_requests(self, httpbin): - r = requests.Request(url=httpbin()) - with pytest.raises(ValueError): - requests.Session().send(r) - - def test_http_error(self): - error = requests.exceptions.HTTPError() - assert not error.response - response = requests.Response() - error = requests.exceptions.HTTPError(response=response) - assert error.response == response - error = requests.exceptions.HTTPError('message', response=response) - assert str(error) == 'message' - assert error.response == response - - def test_session_pickling(self, httpbin, s): - r = requests.Request('GET', httpbin('get')) - - s = pickle.loads(pickle.dumps(s)) - s.proxies = getproxies() - r = s.send(r.prepare()) - assert r.status_code == 200 - - def test_fixes_1329(self, httpbin, s): - """Ensure that header updates are done case-insensitively.""" - - s.headers.update({'ACCEPT': 'BOGUS'}) - s.headers.update({'accept': 'application/json'}) - r = s.get(httpbin('get')) - headers = r.request.headers - assert headers['accept'] == 'application/json' - assert headers['Accept'] == 'application/json' - assert headers['ACCEPT'] == 'application/json' - - def test_uppercase_scheme_redirect(self, httpbin): - parts = urlparse(httpbin('html')) - url = "HTTP://" + parts.netloc + parts.path - r = requests.get(httpbin('redirect-to'), params={'url': url}) - assert r.status_code == 200 - assert r.url.lower() == url.lower() - - def test_transport_adapter_ordering(self, s): - - order = ['https://', 'http://'] - assert order == list(s.adapters) - s.mount('http://git', HTTPAdapter()) - s.mount('http://github', HTTPAdapter()) - s.mount('http://github.com', HTTPAdapter()) - s.mount('http://github.com/about/', HTTPAdapter()) - order = [ - 'http://github.com/about/', - 'http://github.com', - 'http://github', - 'http://git', - 'https://', - 'http://', - ] - assert order == list(s.adapters) - s.mount('http://gittip', HTTPAdapter()) - s.mount('http://gittip.com', HTTPAdapter()) - s.mount('http://gittip.com/about/', HTTPAdapter()) - order = [ - 'http://github.com/about/', - 'http://gittip.com/about/', - 'http://github.com', - 'http://gittip.com', - 'http://github', - 'http://gittip', - 'http://git', - 'https://', - 'http://', - ] - assert order == list(s.adapters) - s2 = requests.Session() - s2.adapters = {'http://': HTTPAdapter()} - s2.mount('https://', HTTPAdapter()) - assert 'http://' in s2.adapters - assert 'https://' in s2.adapters - - def test_header_remove_is_case_insensitive(self, httpbin, s): - # From issue #1321 - s.headers['foo'] = 'bar' - r = s.get(httpbin('get'), headers={'FOO': None}) - assert 'foo' not in r.request.headers - - def test_params_are_merged_case_sensitive(self, httpbin, s): - s.params['foo'] = 'bar' - r = s.get(httpbin('get'), params={'FOO': 'bar'}) - assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'} - - def test_long_authinfo_in_url(self): - url = 'http://{}:{}@{}:9000/path?query#frag'.format( - 'E8A3BE87-9E3F-4620-8858-95478E385B5B', - 'EA770032-DA4D-4D84-8CE9-29C6D910BF1E', - 'exactly-------------sixty-----------three------------characters', - ) - r = requests.Request('GET', url).prepare() - assert r.url == url - - def test_header_keys_are_native(self, httpbin): - headers = {u('unicode'): 'blah', 'byte'.encode('ascii'): 'blah'} - r = requests.Request('GET', httpbin('get'), headers=headers) - p = r.prepare() - # This is testing that they are builtin strings. A bit weird, but there - # we go. - assert 'unicode' in p.headers.keys() - assert 'byte' in p.headers.keys() - - def test_header_validation(self, httpbin): - """Ensure prepare_headers regex isn't flagging valid header contents.""" - headers_ok = { - 'foo': 'bar baz qux', 'bar': 'fbbq', 'baz': '', 'qux': '1' - } - r = requests.get(httpbin('get'), headers=headers_ok) - assert r.request.headers['foo'] == headers_ok['foo'] - - def test_header_value_not_str(self, httpbin): - """Ensure the header value is of type string or bytes as - per discussion in GH issue #3386 - """ - headers_int = {'foo': 3} - headers_dict = {'bar': {'foo': 'bar'}} - headers_list = {'baz': ['foo', 'bar']} - # Test for int - with pytest.raises(InvalidHeader) as excinfo: - r = requests.get(httpbin('get'), headers=headers_int) - assert 'foo' in str(excinfo.value) - # Test for dict - with pytest.raises(InvalidHeader) as excinfo: - r = requests.get(httpbin('get'), headers=headers_dict) - assert 'bar' in str(excinfo.value) - # Test for list - with pytest.raises(InvalidHeader) as excinfo: - r = requests.get(httpbin('get'), headers=headers_list) - assert 'baz' in str(excinfo.value) - - def test_header_no_return_chars(self, httpbin): - """Ensure that a header containing return character sequences raise an - exception. Otherwise, multiple headers are created from single string. - """ - headers_ret = {'foo': 'bar\r\nbaz: qux'} - headers_lf = {'foo': 'bar\nbaz: qux'} - headers_cr = {'foo': 'bar\rbaz: qux'} - # Test for newline - with pytest.raises(InvalidHeader): - r = requests.get(httpbin('get'), headers=headers_ret) - # Test for line feed - with pytest.raises(InvalidHeader): - r = requests.get(httpbin('get'), headers=headers_lf) - # Test for carriage return - with pytest.raises(InvalidHeader): - r = requests.get(httpbin('get'), headers=headers_cr) - - def test_header_no_leading_space(self, httpbin): - """Ensure headers containing leading whitespace raise - InvalidHeader Error before sending. - """ - headers_space = {'foo': ' bar'} - headers_tab = {'foo': ' bar'} - # Test for whitespace - with pytest.raises(InvalidHeader): - r = requests.get(httpbin('get'), headers=headers_space) - # Test for tab - with pytest.raises(InvalidHeader): - r = requests.get(httpbin('get'), headers=headers_tab) - - @pytest.mark.parametrize('files', ('foo', b'foo', bytearray(b'foo'))) - def test_can_send_objects_with_files(self, httpbin, files): - data = {'a': 'this is a string'} - files = {'b': files} - r = requests.Request('POST', httpbin('post'), data=data, files=files) - p = r.prepare() - assert 'multipart/form-data' in p.headers['Content-Type'] - - def test_can_send_file_object_with_non_string_filename(self, httpbin): - f = io.BytesIO() - f.name = 2 - r = requests.Request('POST', httpbin('post'), files={'f': f}) - p = r.prepare() - assert 'multipart/form-data' in p.headers['Content-Type'] - - def test_autoset_header_values_are_native(self, httpbin): - data = 'this is a string' - length = '16' - req = requests.Request('POST', httpbin('post'), data=data) - p = req.prepare() - assert p.headers['Content-Length'] == length - - def test_nonhttp_schemes_dont_check_URLs(self): - test_urls = ( - 'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==', - 'file:///etc/passwd', - 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431', - ) - for test_url in test_urls: - req = requests.Request('GET', test_url) - preq = req.prepare() - assert test_url == preq.url - - def test_auth_is_stripped_on_http_downgrade(self, httpbin, httpbin_secure, httpbin_ca_bundle): - r = requests.get( - httpbin_secure('redirect-to'), - params={'url': httpbin('get')}, - auth=('user', 'pass'), - verify=httpbin_ca_bundle - ) - assert r.history[0].request.headers['Authorization'] - assert 'Authorization' not in r.request.headers - - def test_auth_is_retained_for_redirect_on_host(self, httpbin): - r = requests.get(httpbin('redirect/1'), auth=('user', 'pass')) - h1 = r.history[0].request.headers['Authorization'] - h2 = r.request.headers['Authorization'] - assert h1 == h2 - - def test_manual_redirect_with_partial_body_read(self, httpbin, s): - - req = requests.Request('GET', httpbin('redirect/2')).prepare() - r1 = s.send(req, allow_redirects=False, stream=True) - assert r1.is_redirect - rg = s.resolve_redirects(r1, req, stream=True) - # read only the first eight bytes of the response body, - # then follow the redirect - r1.iter_content(8) - r2 = next(rg) - assert r2.is_redirect - # read all of the response via iter_content, - # then follow the redirect - for _ in r2.iter_content(): - pass - r3 = next(rg) - assert not r3.is_redirect - - def test_prepare_body_position_non_stream(self, s): - data = b'the data' - prep = requests.Request( - 'GET', 'http://example.com', data=data - ).prepare( - ) - assert prep._body_position is None - - def test_rewind_body(self, s): - data = io.BytesIO(b'the data') - prep = requests.Request( - 'GET', 'http://example.com', data=data - ).prepare( - ) - assert prep._body_position == 0 - assert prep.body.read() == b'the data' - # the data has all been read - assert prep.body.read() == b'' - # rewind it back - requests.utils.rewind_body(prep) - assert prep.body.read() == b'the data' - - def test_rewind_partially_read_body(self, s): - data = io.BytesIO(b'the data') - data.read(4) # read some data - prep = requests.Request( - 'GET', 'http://example.com', data=data - ).prepare( - ) - assert prep._body_position == 4 - assert prep.body.read() == b'data' - # the data has all been read - assert prep.body.read() == b'' - # rewind it back - requests.utils.rewind_body(prep) - assert prep.body.read() == b'data' - - def test_rewind_body_no_seek(self, s): - - class BadFileObj: - - def __init__(self, data): - self.data = data - - def tell(self): - return 0 - - def __iter__(self): - return - - data = BadFileObj('the data') - - prep = requests.Request( - 'GET', 'http://example.com', data=data - ).prepare( - ) - assert prep._body_position == 0 - with pytest.raises(UnrewindableBodyError) as e: - requests.utils.rewind_body(prep) - assert 'Unable to rewind request body' in str(e) - - def test_rewind_body_failed_seek(self, s): - - class BadFileObj: - - def __init__(self, data): - self.data = data - - def tell(self): - return 0 - - def seek(self, pos, whence=0): - raise OSError() - - def __iter__(self): - return - - data = BadFileObj('the data') - prep = requests.Request( - 'GET', 'http://example.com', data=data - ).prepare( - ) - assert prep._body_position == 0 - with pytest.raises(UnrewindableBodyError) as e: - requests.utils.rewind_body(prep) - assert 'error occurred when rewinding request body' in str(e) - - def test_rewind_body_failed_tell(self, s): - - class BadFileObj: - - def __init__(self, data): - self.data = data - - def tell(self): - raise OSError() - - def __iter__(self): - return - - data = BadFileObj('the data') - prep = requests.Request( - 'GET', 'http://example.com', data=data - ).prepare( - ) - assert prep._body_position is not None - with pytest.raises(UnrewindableBodyError) as e: - requests.utils.rewind_body(prep) - assert 'Unable to rewind request body' in str(e) - - def _patch_adapter_gzipped_redirect(self, session, url): - adapter = session.get_adapter(url=url) - org_build_response = adapter.build_response - self._patched_response = False - - def build_response(*args, **kwargs): - resp = org_build_response(*args, **kwargs) - if not self._patched_response: - resp.raw.headers['content-encoding'] = 'gzip' - self._patched_response = True - return resp - - adapter.build_response = build_response - - def test_redirect_with_wrong_gzipped_header(self, httpbin, s): - url = httpbin('redirect/1') - self._patch_adapter_gzipped_redirect(s, url) - s.get(url) - - @pytest.mark.parametrize( - 'username, password, auth_str', - ( - ('test', 'test', 'Basic dGVzdDp0ZXN0'), - ( - u'имя'.encode('utf-8'), - u'пароль'.encode('utf-8'), - 'Basic 0LjQvNGPOtC/0LDRgNC+0LvRjA==', - ), - ), - ) - def test_basic_auth_str_is_always_native( - self, username, password, auth_str - ): - s = _basic_auth_str(username, password) - assert isinstance(s, builtin_str) - assert s == auth_str - - def test_requests_history_is_saved(self, httpbin): - r = requests.get(httpbin('redirect/5')) - total = r.history[-1].history - i = 0 - for item in r.history: - assert item.history == total[0:i] - i += 1 - - def test_json_param_post_content_type_works(self, httpbin): - r = requests.post(httpbin('post'), json={'life': 42}) - assert r.status_code == 200 - assert 'application/json' in r.request.headers['Content-Type'] - assert {'life': 42} == r.json()['json'] - - def test_json_param_post_should_not_override_data_param(self, httpbin): - r = requests.Request( - method='POST', - url=httpbin('post'), - data={'stuff': 'elixr'}, - json={'music': 'flute'}, - ) - prep = r.prepare() - assert 'stuff=elixr' == prep.body - - @pytest.mark.parametrize('decode_unicode', (True, False)) - def test_response_iter_lines(self, httpbin, decode_unicode): - r = requests.get(httpbin('stream/4'), stream=True) - assert r.status_code == 200 - r.encoding = 'utf-8' - it = r.iter_lines(decode_unicode=decode_unicode) - next(it) - assert len(list(it)) == 3 - - def test_response_context_manager(self, httpbin): - with requests.get(httpbin('stream/4'), stream=True) as response: - assert isinstance(response, requests.Response) - assert response.raw.closed - - def test_unconsumed_session_response_closes_connection(self, httpbin, s): - with contextlib.closing( - s.get(httpbin('stream/4'), stream=True) - ) as response: - pass - assert response._content_consumed is False - assert response.raw.closed - - @pytest.mark.xfail - def test_response_iter_lines_reentrant(self, httpbin): - """Response.iter_lines() is not reentrant safe""" - r = requests.get(httpbin('stream/4'), stream=True) - assert r.status_code == 200 - next(r.iter_lines()) - assert len(list(r.iter_lines())) == 3 - - def test_environment_comes_after_session(self, httpbin, s): - """The Session arguments should come before environment arguments.""" - # We get proxies from the environment and verify from the argument. - a = SendRecordingAdapter() - s.mount('http://', a) - # Both of these arguments are safe fallbacks that we can easily - # detect, but which will allow the request to succeed. - s.verify = False - s.proxies = {'http': None} - old_proxy = os.environ.get('HTTP_PROXY') - old_bundle = os.environ.get('REQUESTS_CA_BUNDLE') - try: - os.environ['HTTP_PROXY'] = '10.10.10.10:3128' - os.environ['REQUESTS_CA_BUNDLE'] = '/path/to/nowhere' - s.get(httpbin('get'), timeout=5) - finally: - if old_proxy is not None: - os.environ['HTTP_PROXY'] = old_proxy - else: - del os.environ['HTTP_PROXY'] - if old_bundle is not None: - os.environ['REQUESTS_CA_BUNDLE'] = old_bundle - else: - del os.environ['REQUESTS_CA_BUNDLE'] - call = a.send_calls[0] - assert call[1]['verify'] == False - proxies = call[1]['proxies'] - with pytest.raises(KeyError): - proxies['http'] - - @pytest.fixture(autouse=True) - def test_merge_environment_settings_verify(self, monkeypatch, s): - """Assert CA environment settings are merged as expected when missing""" - monkeypatch.delenv('CURL_CA_BUNDLE', raising=False) - monkeypatch.delenv('REQUESTS_CA_BUNDLE', raising=False) - assert s.trust_env is True - assert s.verify is True - assert 'REQUESTS_CA_BUNDLE' not in os.environ - assert 'CURL_CA_BUNDLE' not in os.environ - merged_settings = s.merge_environment_settings( - 'http://example.com', {}, False, True, None - ) - assert merged_settings['verify'] is True - - def test_session_close_proxy_clear(self, mocker, s): - proxies = {'one': mocker.Mock(), 'two': mocker.Mock()} - - mocker.patch.dict(s.adapters['http://'].proxy_manager, proxies) - s.close() - proxies['one'].clear.assert_called_once_with() - proxies['two'].clear.assert_called_once_with() - - def test_proxy_auth(self): - adapter = HTTPAdapter() - headers = adapter.proxy_headers("http://user:pass@httpbin.org") - assert headers == {'Proxy-Authorization': 'Basic dXNlcjpwYXNz'} - - def test_proxy_auth_empty_pass(self): - adapter = HTTPAdapter() - headers = adapter.proxy_headers("http://user:@httpbin.org") - assert headers == {'Proxy-Authorization': 'Basic dXNlcjo='} - - def test_response_json_when_content_is_None(self, httpbin): - r = requests.get(httpbin('/status/204')) - # Make sure r.content is None - r.status_code = 0 - r._content = False - r._content_consumed = False - assert r.content is None - with pytest.raises(ValueError): - r.json() - - def test_response_without_release_conn(self): - """Test `close` call for non-urllib3-like raw objects. - Should work when `release_conn` attr doesn't exist on `response.raw`. - """ - resp = requests.Response() - resp.raw = StringIO.StringIO('test') - assert not resp.raw.closed - resp.close() - assert resp.raw.closed - - def test_updating_ca_cert(self, httpbin_secure, s): - """Assert that requests use the latest configured CA certificates.""" - s.verify = pytest_httpbin.certs.where() - s.get(httpbin_secure('/')) - s.verify = True - with pytest.raises(requests.exceptions.SSLError) as e: - s.get(httpbin_secure('/')) - assert 'certificate verify failed' in str(e) - - def test_updating_client_cert(self, httpbin_secure, s): - """Assert that requests use the latest configured client certificates.""" - ca_file = pytest_httpbin.certs.where() - cert_dir = os.path.dirname(ca_file) - # All we need is a valid certificate and key to make a request. httpbin_secure - # won't check the signature or subject name, so it's okay that these happen to - # be the server's certificate and key. - cert = os.path.join(cert_dir, 'cert.pem') - key = os.path.join(cert_dir, 'key.pem') - - s.verify = ca_file - resp = s.get(httpbin_secure('/')) - resp_with_cert = s.get(httpbin_secure('/'), cert=(cert, key)) - assert resp_with_cert.raw._pool.cert_file == cert - assert resp_with_cert.raw._pool.key_file == key - assert resp.raw._pool is not resp_with_cert.raw._pool - - def test_empty_stream_with_auth_does_not_set_content_length_header( - self, httpbin - ): - """Ensure that a byte stream with size 0 will not set both a Content-Length - and Transfer-Encoding header. - """ - auth = ('user', 'pass') - url = httpbin('post') - file_obj = io.BytesIO(b'') - r = requests.Request('POST', url, auth=auth, data=file_obj) - prepared_request = r.prepare() - assert 'Transfer-Encoding' in prepared_request.headers - assert 'Content-Length' not in prepared_request.headers - - def test_stream_with_auth_does_not_set_transfer_encoding_header( - self, httpbin - ): - """Ensure that a byte stream with size > 0 will not set both a Content-Length - and Transfer-Encoding header. - """ - auth = ('user', 'pass') - url = httpbin('post') - file_obj = io.BytesIO(b'test data') - r = requests.Request('POST', url, auth=auth, data=file_obj) - prepared_request = r.prepare() - assert 'Transfer-Encoding' not in prepared_request.headers - assert 'Content-Length' in prepared_request.headers - - def test_chunked_upload_does_not_set_content_length_header(self, httpbin): - """Ensure that requests with a generator body stream using - Transfer-Encoding: chunked, not a Content-Length header. - """ - data = (i for i in [b'a', b'b', b'c']) - url = httpbin('post') - r = requests.Request('POST', url, data=data) - prepared_request = r.prepare() - assert 'Transfer-Encoding' in prepared_request.headers - assert 'Content-Length' not in prepared_request.headers - - def test_chunked_upload_with_manually_set_content_length_header_raises_error( - self, httpbin - ): - """Ensure that if a user manually sets a content length header, when - the data is chunked, that an InvalidHeader error is raised. - """ - data = (i for i in [b'a', b'b', b'c']) - url = httpbin('post') - with pytest.raises(InvalidHeader): - r = requests.post( - url, data=data, headers={'Content-Length': 'foo'} - ) - - def test_content_length_with_manually_set_transfer_encoding_raises_error( - self, httpbin - ): - """Ensure that if a user manually sets a Transfer-Encoding header when - data is not chunked that an InvalidHeader error is raised. - """ - data = 'test data' - url = httpbin('post') - with pytest.raises(InvalidHeader): - r = requests.post( - url, data=data, headers={'Transfer-Encoding': 'chunked'} - ) - - def test_null_body_does_not_raise_error(self, httpbin): - url = httpbin('post') - try: - requests.post(url, data=None) - except InvalidHeader: - pytest.fail('InvalidHeader error raised unexpectedly.') - - @pytest.mark.parametrize( - 'body, expected', - ( - (None, ('Content-Length', '0')), - ('test_data', ('Content-Length', '9')), - (io.BytesIO(b'test_data'), ('Content-Length', '9')), - (StringIO.StringIO(''), ('Transfer-Encoding', 'chunked')), - ), - ) - def test_prepare_content_length(self, httpbin, body, expected): - """Test prepare_content_length creates expected header.""" - prep = requests.PreparedRequest() - prep.headers = {} - prep.method = 'POST' - # Ensure Content-Length is set appropriately. - key, value = expected - prep.prepare_content_length(body) - assert prep.headers[key] == value - - def test_prepare_content_length_with_bad_body(self, httpbin): - """Test prepare_content_length raises exception with unsendable body.""" - # Initialize minimum required PreparedRequest. - prep = requests.PreparedRequest() - prep.headers = {} - prep.method = 'POST' - with pytest.raises(InvalidBodyError) as e: - # Send object that isn't iterable and has no accessible content. - prep.prepare_content_length(object()) - assert "Non-null body must have length or be streamable." in str(e) - - def test_custom_redirect_mixin(self, httpbin): - """Tests a custom mixin to overwrite ``get_redirect_target``. - - Ensures a subclassed ``requests.Session`` can handle a certain type of - malformed redirect responses. - - 1. original request receives a proper response: 302 redirect - 2. following the redirect, a malformed response is given: - status code = HTTP 200 - location = alternate url - 3. the custom session catches the edge case and follows the redirect - """ - url_final = httpbin('html') - querystring_malformed = urlencode({'location': url_final}) - url_redirect_malformed = httpbin( - 'response-headers?%s' % querystring_malformed - ) - querystring_redirect = urlencode({'url': url_redirect_malformed}) - url_redirect = httpbin('redirect-to?%s' % querystring_redirect) - urls_test = [url_redirect, url_redirect_malformed, url_final] - - class CustomRedirectSession(requests.Session): - - def get_redirect_target(self, resp): - # default behavior - if resp.is_redirect: - return resp.headers['location'] - - # edge case - check to see if 'location' is in headers anyways - location = resp.headers.get('location') - if location and (location != resp.url): - return location - - return None - - session = CustomRedirectSession() - r = session.get(urls_test[0]) - assert len(r.history) == 2 - assert r.status_code == 200 - assert r.history[0].status_code == 302 - assert r.history[0].is_redirect - assert r.history[1].status_code == 200 - assert not r.history[1].is_redirect - assert r.url == urls_test[2] - - def test_multiple_response_headers_with_same_name_same_case(self, httpbin): - qs = 'Fruit=Apple&Fruit=Blood+Orange&Fruit=Banana&Fruit=Berry,+Blue' - resp = requests.get(httpbin('response-headers?' + qs)) - fruits = resp.headers['fruit'] - assert fruits == 'Apple, Blood Orange, Banana, Berry, Blue' - # As we are using HTTPHeaderDict, we should be able to extract the - # individual header values too. - assert resp.headers.getlist('fruit') == [ - 'Apple', 'Blood Orange', 'Banana', 'Berry, Blue' - ] - - def test_multiple_response_headers_with_same_name_diff_case(self, httpbin): - # urllib3 seems to have trouble guaranteeing the order of the items when - # the case is different, so we just need to make sure all of the items - # are there, rather than asserting a particular order. - qs = 'Fruit=Apple&Fruit=Blood+Orange&Fruit=Banana&Fruit=Berry,+Blue' - resp = requests.get(httpbin('response-headers?' + qs)) - # These are all possible acceptable combinations for the header. - fruit_choices = ['Apple', 'Blood Orange', 'Banana', 'Berry, Blue'] - fruit_permutations = itertools.permutations(fruit_choices) - fruit_multiheaders = [list(fp) for fp in fruit_permutations] - fruit_headers = set(', '.join(fp) for fp in fruit_multiheaders) - assert resp.headers['fruit'] in fruit_headers - # As we are using HTTPHeaderDict, we should be able to extract the - # individual header values too. - assert resp.headers.getlist('fruit') in fruit_multiheaders - - -class TestCaseInsensitiveDict: - - @pytest.mark.parametrize( - 'cid', - ( - CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}), - CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]), - CaseInsensitiveDict(FOO='foo', BAr='bar'), - ), - ) - def test_init(self, cid): - assert len(cid) == 2 - assert 'foo' in cid - assert 'bar' in cid - - def test_docstring_example(self): - cid = CaseInsensitiveDict() - cid['Accept'] = 'application/json' - assert cid['aCCEPT'] == 'application/json' - assert list(cid) == ['Accept'] - - def test_len(self): - cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'}) - cid['A'] = 'a' - assert len(cid) == 2 - - def test_getitem(self): - cid = CaseInsensitiveDict({'Spam': 'blueval'}) - assert cid['spam'] == 'blueval' - assert cid['SPAM'] == 'blueval' - - def test_fixes_649(self): - """__setitem__ should behave case-insensitively.""" - cid = CaseInsensitiveDict() - cid['spam'] = 'oneval' - cid['Spam'] = 'twoval' - cid['sPAM'] = 'redval' - cid['SPAM'] = 'blueval' - assert cid['spam'] == 'blueval' - assert cid['SPAM'] == 'blueval' - assert list(cid.keys()) == ['SPAM'] - - def test_delitem(self): - cid = CaseInsensitiveDict() - cid['Spam'] = 'someval' - del cid['sPam'] - assert 'spam' not in cid - assert len(cid) == 0 - - def test_contains(self): - cid = CaseInsensitiveDict() - cid['Spam'] = 'someval' - assert 'Spam' in cid - assert 'spam' in cid - assert 'SPAM' in cid - assert 'sPam' in cid - assert 'notspam' not in cid - - def test_get(self): - cid = CaseInsensitiveDict() - cid['spam'] = 'oneval' - cid['SPAM'] = 'blueval' - assert cid.get('spam') == 'blueval' - assert cid.get('SPAM') == 'blueval' - assert cid.get('sPam') == 'blueval' - assert cid.get('notspam', 'default') == 'default' - - def test_update(self): - cid = CaseInsensitiveDict() - cid['spam'] = 'blueval' - cid.update({'sPam': 'notblueval'}) - assert cid['spam'] == 'notblueval' - cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'}) - cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'}) - assert len(cid) == 2 - assert cid['foo'] == 'anotherfoo' - assert cid['bar'] == 'anotherbar' - - def test_update_retains_unchanged(self): - cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'}) - cid.update({'foo': 'newfoo'}) - assert cid['bar'] == 'bar' - - def test_iter(self): - cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'}) - keys = frozenset(['Spam', 'Eggs']) - assert frozenset(iter(cid)) == keys - - def test_equality(self): - cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'}) - othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'}) - assert cid == othercid - del othercid['spam'] - assert cid != othercid - assert cid == {'spam': 'blueval', 'eggs': 'redval'} - assert cid != object() - - def test_setdefault(self): - cid = CaseInsensitiveDict({'Spam': 'blueval'}) - assert cid.setdefault('spam', 'notblueval') == 'blueval' - assert cid.setdefault('notspam', 'notblueval') == 'notblueval' - - def test_lower_items(self): - cid = CaseInsensitiveDict( - {'Accept': 'application/json', 'user-Agent': 'requests'} - ) - keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items()) - lowerkeyset = frozenset(['accept', 'user-agent']) - assert keyset == lowerkeyset - - def test_preserve_key_case(self): - cid = CaseInsensitiveDict( - {'Accept': 'application/json', 'user-Agent': 'requests'} - ) - keyset = frozenset(['Accept', 'user-Agent']) - assert frozenset(i[0] for i in cid.items()) == keyset - assert frozenset(cid.keys()) == keyset - assert frozenset(cid) == keyset - - def test_preserve_last_key_case(self): - cid = CaseInsensitiveDict( - {'Accept': 'application/json', 'user-Agent': 'requests'} - ) - cid.update({'ACCEPT': 'application/json'}) - cid['USER-AGENT'] = 'requests' - keyset = frozenset(['ACCEPT', 'USER-AGENT']) - assert frozenset(i[0] for i in cid.items()) == keyset - assert frozenset(cid.keys()) == keyset - assert frozenset(cid) == keyset - - def test_copy(self): - cid = CaseInsensitiveDict( - {'Accept': 'application/json', 'user-Agent': 'requests'} - ) - cid_copy = cid.copy() - assert cid == cid_copy - cid['changed'] = True - assert cid != cid_copy - - def test_url_surrounding_whitespace(self, httpbin): - """Test case with URLs surrounded by whitespace characters.""" - get_url = httpbin('get') - # All surrounding whitespaces are supposed to be ignored: - assert requests.get(get_url + ' ').status_code == 200 - assert requests.get(' ' + get_url).status_code == 200 - assert requests.get(get_url + ' \t ').status_code == 200 - assert requests.get(' \t' + get_url).status_code == 200 - assert requests.get(get_url + '\n').status_code == 200 - # The whitespaces can't be in the middle of the URL though: - assert requests.get(get_url + ' abc').status_code == 404 - - -class TestMorselToCookieExpires: - """Tests for morsel_to_cookie when morsel contains expires.""" - - def test_expires_valid_str(self): - """Test case where we convert expires from string time.""" - morsel = Morsel() - morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT' - cookie = morsel_to_cookie(morsel) - assert cookie.expires == 1 - - @pytest.mark.parametrize( - 'value, exception', ((100, TypeError), ('woops', ValueError)) - ) - def test_expires_invalid_int(self, value, exception): - """Test case where an invalid type is passed for expires.""" - morsel = Morsel() - morsel['expires'] = value - with pytest.raises(exception): - morsel_to_cookie(morsel) - - def test_expires_none(self): - """Test case where expires is None.""" - morsel = Morsel() - morsel['expires'] = None - cookie = morsel_to_cookie(morsel) - assert cookie.expires is None - - -class TestMorselToCookieMaxAge: - """Tests for morsel_to_cookie when morsel contains max-age.""" - - def test_max_age_valid_int(self): - """Test case where a valid max age in seconds is passed.""" - morsel = Morsel() - morsel['max-age'] = 60 - cookie = morsel_to_cookie(morsel) - assert isinstance(cookie.expires, int) - - def test_max_age_invalid_str(self): - """Test case where a invalid max age is passed.""" - morsel = Morsel() - morsel['max-age'] = 'woops' - with pytest.raises(TypeError): - morsel_to_cookie(morsel) - - -class TestTimeout: - - def test_stream_timeout(self, httpbin): - try: - requests.get(httpbin('delay/10'), timeout=2.0) - except requests.exceptions.Timeout as e: - assert 'Read timed out' in e.args[0].args[0] - - @pytest.mark.parametrize( - 'timeout, error_text', - ( - ((3, 4, 5), '(connect, read)'), - ('foo', 'must be an int, float or None'), - ), - ) - def test_invalid_timeout(self, httpbin, timeout, error_text): - with pytest.raises(ValueError) as e: - requests.get(httpbin('get'), timeout=timeout) - assert error_text in str(e) - - @pytest.mark.parametrize( - 'timeout', (None, Urllib3Timeout(connect=None, read=None)) - ) - def test_none_timeout(self, httpbin, timeout): - """Check that you can set None as a valid timeout value. - - To actually test this behavior, we'd want to check that setting the - timeout to None actually lets the request block past the system default - timeout. However, this would make the test suite unbearably slow. - Instead we verify that setting the timeout to None does not prevent the - request from succeeding. - """ - r = requests.get(httpbin('get'), timeout=timeout) - assert r.status_code == 200 - - @pytest.mark.parametrize( - 'timeout', ((None, 0.1), Urllib3Timeout(connect=None, read=0.1)) - ) - def test_read_timeout(self, httpbin, timeout): - try: - requests.get(httpbin('delay/10'), timeout=timeout) - pytest.fail('The recv() request should time out.') - except ReadTimeout: - pass - - @pytest.mark.parametrize( - 'timeout', ((0.1, None), Urllib3Timeout(connect=0.1, read=None)) - ) - def test_connect_timeout(self, timeout): - try: - requests.get(TARPIT, timeout=timeout) - pytest.fail('The connect() request should time out.') - except ConnectTimeout as e: - assert isinstance(e, ConnectionError) - assert isinstance(e, Timeout) - - @pytest.mark.parametrize( - 'timeout', ((0.1, 0.1), Urllib3Timeout(connect=0.1, read=0.1)) - ) - def test_total_timeout_connect(self, timeout): - try: - requests.get(TARPIT, timeout=timeout) - pytest.fail('The connect() request should time out.') - except ConnectTimeout: - pass - - def test_encoded_methods(self, httpbin): - """See: https://github.com/requests/requests/issues/2316""" - r = requests.request(b'GET', httpbin('get')) - assert r.ok - - -SendCall = collections.namedtuple('SendCall', ('args', 'kwargs')) - - -class RedirectSession(SessionRedirectMixin): - - def __init__(self, order_of_redirects): - self.redirects = order_of_redirects - self.calls = [] - self.max_redirects = 30 - self.cookies = {} - self.trust_env = False - self.location = '/' - - def send(self, *args, **kwargs): - self.calls.append(SendCall(args, kwargs)) - return self.build_response() - - def build_response(self): - request = self.calls[-1].args[0] - r = requests.Response() - try: - r.status_code = int(self.redirects.pop(0)) - except IndexError: - r.status_code = 200 - r.headers = CaseInsensitiveDict({'Location': self.location}) - r.raw = self._build_raw() - r.request = request - return r - - def _build_raw(self): - string = StringIO.StringIO('') - setattr(string, 'release_conn', lambda *args: args) - return string - - -def test_json_encodes_as_bytes(): - # urllib3 expects bodies as bytes-like objects - body = {"key": "value"} - p = PreparedRequest() - p.prepare(method='GET', url='https://www.example.com/', json=body) - assert isinstance(p.body, bytes) - - -def test_requests_are_updated_each_time(httpbin): - session = RedirectSession([303, 307]) - prep = requests.Request('POST', httpbin('post')).prepare() - r0 = session.send(prep) - assert r0.request.method == 'POST' - assert session.calls[-1] == SendCall((r0.request,), {}) - redirect_generator = session.resolve_redirects(r0, prep) - default_keyword_args = { - 'stream': False, - 'verify': True, - 'cert': None, - 'timeout': None, - 'allow_redirects': False, - 'proxies': {}, - } - for response in redirect_generator: - assert response.request.method == 'GET' - send_call = SendCall((response.request,), default_keyword_args) - assert session.calls[-1] == send_call - - -@pytest.mark.parametrize( - "var,url,proxy", - [ - ('http_proxy', 'http://example.com', 'socks5://proxy.com:9876'), - ('https_proxy', 'https://example.com', 'socks5://proxy.com:9876'), - ('all_proxy', 'http://example.com', 'socks5://proxy.com:9876'), - ('all_proxy', 'https://example.com', 'socks5://proxy.com:9876'), - ], -) -def test_proxy_env_vars_override_default(var, url, proxy, s): - - prep = PreparedRequest() - prep.prepare(method='GET', url=url) - kwargs = {var: proxy} - scheme = urlparse(url).scheme - with override_environ(**kwargs): - proxies = s.rebuild_proxies(prep, {}) - assert scheme in proxies - assert proxies[scheme] == proxy - - -@pytest.mark.parametrize( - 'data', - ( - (('a', 'b'), ('c', 'd')), - (('c', 'd'), ('a', 'b')), - (('a', 'b'), ('c', 'd'), ('e', 'f')), - ), -) -def test_data_argument_accepts_tuples(data): - """Ensure that the data argument will accept tuples of strings - and properly encode them. - """ - p = PreparedRequest() - p.prepare( - method='GET', - url='http://www.example.com', - data=data, - hooks=default_hooks(), - ) - assert p.body == urlencode(data) - - -@pytest.mark.parametrize( - 'kwargs', - ( - None, - { - 'method': 'GET', - 'url': 'http://www.example.com', - 'data': 'foo=bar', - 'hooks': default_hooks(), - }, - { - 'method': 'GET', - 'url': 'http://www.example.com', - 'data': 'foo=bar', - 'hooks': default_hooks(), - 'cookies': {'foo': 'bar'}, - }, - {'method': 'GET', 'url': u('http://www.example.com/üniçø∂é')}, - ), -) -def test_prepared_copy(kwargs): - p = PreparedRequest() - if kwargs: - p.prepare(**kwargs) - copy = p.copy() - for attr in ('method', 'url', 'headers', '_cookies', 'body', 'hooks'): - assert getattr(p, attr) == getattr(copy, attr) - - -def test_prepare_requires_a_request_method(): - req = requests.Request() - with pytest.raises(ValueError): - req.prepare() - prepped = PreparedRequest() - with pytest.raises(ValueError): - prepped.prepare() - - -def test_urllib3_retries(httpbin, s): - from urllib3.util import Retry - - s.mount( - 'http://', - HTTPAdapter(max_retries=Retry(total=2, status_forcelist=[500])), - ) - with pytest.raises(RetryError): - s.get(httpbin('status/500')) - - -def test_urllib3_pool_connection_closed(httpbin, s): - s.mount('http://', HTTPAdapter(pool_connections=0, pool_maxsize=0)) - try: - s.get(httpbin('status/200')) - except ConnectionError as e: - assert u"Pool is closed." in str(e) - - -class TestPreparingURLs(object): - - @pytest.mark.parametrize( - 'url,expected', - ( - ('http://google.com', 'http://google.com/'), - (u'http://ジェーピーニック.jp', u'http://xn--hckqz9bzb1cyrb.jp/'), - (u'http://xn--n3h.net/', u'http://xn--n3h.net/'), - ( - u'http://ジェーピーニック.jp'.encode('utf-8'), - u'http://xn--hckqz9bzb1cyrb.jp/', - ), - ( - u'http://straße.de/straße', - u'http://xn--strae-oqa.de/stra%C3%9Fe', - ), - ( - u'http://straße.de/straße'.encode('utf-8'), - u'http://xn--strae-oqa.de/stra%C3%9Fe', - ), - ( - u'http://Königsgäßchen.de/straße', - u'http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe', - ), - ( - u'http://Königsgäßchen.de/straße'.encode('utf-8'), - u'http://xn--knigsgchen-b4a3dun.de/stra%C3%9Fe', - ), - (b'http://xn--n3h.net/', u'http://xn--n3h.net/'), - ( - b'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/', - u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/', - ), - ( - u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/', - u'http://[1200:0000:ab00:1234:0000:2552:7777:1313]:12345/', - ), - ), - ) - def test_preparing_url(self, url, expected): - - def normalize_percent_encode(x): - # Helper function that normalizes equivalent - # percent-encoded bytes before comparisons - for c in re.findall(r'%[a-fA-F0-9]{2}', x): - x = x.replace(c, c.upper()) - return x - - r = requests.Request('GET', url=url) - p = r.prepare() - assert normalize_percent_encode(p.url) == expected - - @pytest.mark.parametrize( - 'url', - ( - b"http://*.google.com", - b"http://*", - u"http://*.google.com", - u"http://*", - u"http://☃.net/", - ), - ) - def test_preparing_bad_url(self, url): - r = requests.Request('GET', url=url) - with pytest.raises(requests.exceptions.InvalidURL): - r.prepare() - - @pytest.mark.parametrize( - 'url, exception', - ( - ('http://localhost:-1', InvalidURL), - ) - ) - def test_redirecting_to_bad_url(self, httpbin, url, exception): - with pytest.raises(exception): - r = requests.get(httpbin('redirect-to'), params={'url': url}) - - @pytest.mark.parametrize( - 'input, expected', - ((b"mailto:user@example.org", u"mailto:user@example.org"), (u"mailto:user@example.org", u"mailto:user@example.org"), (b"data:SSDimaUgUHl0aG9uIQ==", u"data:SSDimaUgUHl0aG9uIQ==")), - # TODO: Bugs in rfc3986, apparently. - # ( - # b"http+unix://%2Fvar%2Frun%2Fsocket/path%7E", - # u"http+unix://%2Fvar%2Frun%2Fsocket/path~", - # ), - # ( - # u"http+unix://%2Fvar%2Frun%2Fsocket/path%7E", - # u"http+unix://%2Fvar%2Frun%2Fsocket/path~", - # ), - ) - def test_url_mutation(self, input, expected): - """ - This test validates that we correctly exclude some URLs from - preparation, and that we handle others. Specifically, it tests that - any URL whose scheme doesn't begin with "http" is left alone, and - those whose scheme *does* begin with "http" are mutated. - """ - r = requests.Request('GET', url=input) - p = r.prepare() - assert p.url == expected - - @pytest.mark.parametrize( - 'input, params, expected', - ((b"mailto:user@example.org", {"key": "value"}, u"mailto:user@example.org"), (u"mailto:user@example.org", {"key": "value"}, u"mailto:user@example.org")), - # TODO: - # ( - # b"http+unix://%2Fvar%2Frun%2Fsocket/path", - # {"key": "value"}, - # u"http+unix://%2Fvar%2Frun%2Fsocket/path?key=value", - # ), - # ( - # u"http+unix://%2Fvar%2Frun%2Fsocket/path", - # {"key": "value"}, - # u"http+unix://%2Fvar%2Frun%2Fsocket/path?key=value", - # ), - ) - def test_parameters_for_nonstandard_schemes(self, input, params, expected): - """ - Setting parameters for nonstandard schemes is allowed if those schemes - begin with "http", and is forbidden otherwise. - """ - r = requests.Request('GET', url=input, params=params) - p = r.prepare() - assert p.url == expected - - -class TestGetConnection(object): - """ - Tests for the :meth:`requests.adapters.HTTPAdapter.get_connection` that assert - the connections are correctly configured. - """ - - @pytest.mark.parametrize( - 'proxies, verify, cert, expected', - ( - ( - {}, - True, - None, - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': DEFAULT_CA_BUNDLE_PATH, - 'ca_cert_dir': None, - 'cert_file': None, - 'key_file': None, - }, - ), - ( - {}, - False, - None, - { - 'cert_reqs': 'CERT_NONE', - 'ca_certs': None, - 'ca_cert_dir': None, - 'cert_file': None, - 'key_file': None, - }, - ), - ( - {}, - __file__, - None, - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': __file__, - 'ca_cert_dir': None, - 'cert_file': None, - 'key_file': None, - }, - ), - ( - {}, - os.path.dirname(__file__), - None, - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': None, - 'ca_cert_dir': os.path.dirname(__file__), - 'cert_file': None, - 'key_file': None, - }, - ), - ( - {}, - True, - None, - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': DEFAULT_CA_BUNDLE_PATH, - 'ca_cert_dir': None, - 'cert_file': None, - 'key_file': None, - }, - ), - ( - {}, - True, - __file__, - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': DEFAULT_CA_BUNDLE_PATH, - 'ca_cert_dir': None, - 'cert_file': __file__, - 'key_file': None, - }, - ), - ( - {}, - True, - (__file__, __file__), - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': DEFAULT_CA_BUNDLE_PATH, - 'ca_cert_dir': None, - 'cert_file': __file__, - 'key_file': __file__, - }, - ), - ( - {}, - True, - (__file__, __file__), - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': DEFAULT_CA_BUNDLE_PATH, - 'ca_cert_dir': None, - 'cert_file': __file__, - 'key_file': __file__, - }, - ), - ( - { - 'http': 'http://proxy.example.com', - 'https': 'http://proxy.example.com', - }, - True, - None, - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': DEFAULT_CA_BUNDLE_PATH, - 'ca_cert_dir': None, - 'cert_file': None, - 'key_file': None, - }, - ), - ( - { - 'http': 'http://proxy.example.com', - 'https': 'http://proxy.example.com', - }, - os.path.dirname(__file__), - None, - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': None, - 'ca_cert_dir': os.path.dirname(__file__), - 'cert_file': None, - 'key_file': None, - }, - ), - ( - { - 'http': 'http://proxy.example.com', - 'https': 'http://proxy.example.com', - }, - __file__, - None, - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': __file__, - 'ca_cert_dir': None, - 'cert_file': None, - 'key_file': None, - }, - ), - ( - { - 'http': 'http://proxy.example.com', - 'https': 'http://proxy.example.com', - }, - True, - __file__, - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': DEFAULT_CA_BUNDLE_PATH, - 'ca_cert_dir': None, - 'cert_file': __file__, - 'key_file': None, - }, - ), - ( - { - 'http': 'http://proxy.example.com', - 'https': 'http://proxy.example.com', - }, - True, - (__file__, __file__), - { - 'cert_reqs': 'CERT_REQUIRED', - 'ca_certs': DEFAULT_CA_BUNDLE_PATH, - 'ca_cert_dir': None, - 'cert_file': __file__, - 'key_file': __file__, - }, - ), - ), - ) - def test_get_https_connection(self, proxies, verify, cert, expected): - """Assert connections are configured correctly.""" - adapter = requests.adapters.HTTPAdapter() - connection = adapter.get_connection( - 'https://example.com', proxies=proxies, verify=verify, cert=cert - ) - actual_config = {} - for key, value in connection.__dict__.items(): - if key in expected: - actual_config[key] = value - assert actual_config == expected - - @pytest.mark.parametrize( - 'verify, cert', - ( - ('a/path/that/does/not/exist', None), - (True, 'a/path/that/does/not/exist'), - (True, (__file__, 'a/path/that/does/not/exist')), - (True, ('a/path/that/does/not/exist', __file__)), - ), - ) - def test_cert_files_missing(self, verify, cert): - """ - Assert an IOError is raised when one of the certificate files or - directories can't be found. - """ - adapter = requests.adapters.HTTPAdapter() - with pytest.raises(IOError) as excinfo: - adapter.get_connection( - 'https://example.com', verify=verify, cert=cert - ) - excinfo.match('invalid path: a/path/that/does/not/exist') diff --git a/tests/test_testserver.py b/tests/test_testserver.py deleted file mode 100644 index 74cc0bad..00000000 --- a/tests/test_testserver.py +++ /dev/null @@ -1,155 +0,0 @@ -# -*- coding: utf-8 -*- -import threading -import socket -import time - -import pytest -import requests3 as requests -from tests.testserver.server import Server - - -class TestTestServer: - - def test_basic(self): - """messages are sent and received properly""" - question = b"success?" - answer = b"yeah, success" - - def handler(sock): - text = sock.recv(1000) - assert text == question - sock.sendall(answer) - - with Server(handler) as (host, port): - sock = socket.socket() - sock.connect((host, port)) - sock.sendall(question) - text = sock.recv(1000) - assert text == answer - sock.close() - - def test_server_closes(self): - """the server closes when leaving the context manager""" - with Server.basic_response_server() as (host, port): - sock = socket.socket() - sock.connect((host, port)) - sock.close() - with pytest.raises(socket.error): - new_sock = socket.socket() - new_sock.connect((host, port)) - - def test_text_response(self): - """the text_response_server sends the given text""" - server = Server.text_response_server( - "HTTP/1.1 200 OK\r\n" + "Content-Length: 6\r\n" + "\r\nroflol" - ) - with server as (host, port): - r = requests.get('http://{0}:{1}'.format(host, port)) - assert r.status_code == 200 - assert r.text == u'roflol' - assert r.headers['Content-Length'] == '6' - - def test_basic_response(self): - """the basic response server returns an empty http response""" - with Server.basic_response_server() as (host, port): - r = requests.get('http://{}:{}'.format(host, port)) - assert r.status_code == 200 - assert r.text == u'' - assert r.headers['Content-Length'] == '0' - - def test_basic_waiting_server(self): - """the server waits for the block_server event to be set before closing""" - block_server = threading.Event() - with Server.basic_response_server( - wait_to_close_event=block_server - ) as ( - host, port - ): - sock = socket.socket() - sock.connect((host, port)) - sock.sendall(b'send something') - time.sleep(2.5) - sock.sendall(b'still alive') - block_server.set() # release server block - - def test_multiple_requests(self): - """multiple requests can be served""" - requests_to_handle = 5 - server = Server.basic_response_server( - requests_to_handle=requests_to_handle - ) - with server as (host, port): - server_url = 'http://{}:{}'.format(host, port) - for _ in range(requests_to_handle): - r = requests.get(server_url) - assert r.status_code == 200 - # the (n+1)th request fails - with pytest.raises(requests.exceptions.ConnectionError): - r = requests.get(server_url) - - @pytest.mark.skip( - reason="this fails non-deterministically under pytest-xdist" - ) - def test_request_recovery(self): - """can check the requests content""" - # TODO: figure out why this sometimes fails when using pytest-xdist. - server = Server.basic_response_server(requests_to_handle=2) - first_request = b'put your hands up in the air' - second_request = b'put your hand down in the floor' - with server as address: - sock1 = socket.socket() - sock2 = socket.socket() - sock1.connect(address) - sock1.sendall(first_request) - sock1.close() - sock2.connect(address) - sock2.sendall(second_request) - sock2.close() - assert server.handler_results[0] == first_request - assert server.handler_results[1] == second_request - - def test_requests_after_timeout_are_not_received(self): - """the basic response handler times out when receiving requests""" - server = Server.basic_response_server(request_timeout=1) - with server as address: - sock = socket.socket() - sock.connect(address) - time.sleep(1.5) - sock.sendall(b'hehehe, not received') - sock.close() - assert server.handler_results[0] == b'' - - def test_request_recovery_with_bigger_timeout(self): - """a biggest timeout can be specified""" - server = Server.basic_response_server(request_timeout=3) - data = b'bananadine' - with server as address: - sock = socket.socket() - sock.connect(address) - time.sleep(1.5) - sock.sendall(data) - sock.close() - assert server.handler_results[0] == data - - def test_server_finishes_on_error(self): - """the server thread exits even if an exception exits the context manager""" - server = Server.basic_response_server() - with pytest.raises(Exception): - with server: - raise Exception() - - assert len(server.handler_results) == 0 - - - # if the server thread fails to finish, the test suite will hang - # and get killed by the jenkins timeout. - def test_server_finishes_when_no_connections(self): - """the server thread exits even if there are no connections""" - server = Server.basic_response_server() - with server: - pass - assert len(server.handler_results) == 0 - - -# if the server thread fails to finish, the test suite will hang -# and get killed by the jenkins timeout. diff --git a/tests/test_utils.py b/tests/test_utils.py index 11440cfc..4385a5ba 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -35,6 +35,7 @@ from requests3.http_utils import ( urldefragauth, add_dict_to_cookiejar, set_environ, + extract_zipped_paths ) from requests3._internal_utils import unicode_is_ascii