mirror of
https://github.com/kennethreitz/requests.git
synced 2026-06-05 06:46:15 +00:00
67a7b2e833
#4965 Fix
351 lines
12 KiB
Python
351 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import pytest
|
|
import threading
|
|
import requests
|
|
from requests.exceptions import ChunkedEncodingError
|
|
|
|
from tests.testserver.server import Server, consume_socket_content
|
|
|
|
from .utils import override_environ
|
|
|
|
|
|
def test_chunked_upload():
|
|
"""can safely send generators"""
|
|
close_server = threading.Event()
|
|
server = Server.basic_response_server(wait_to_close_event=close_server)
|
|
data = iter([b'a', b'b', b'c'])
|
|
|
|
with server as (host, port):
|
|
url = 'http://{}:{}/'.format(host, port)
|
|
r = requests.post(url, data=data, stream=True)
|
|
close_server.set() # release server block
|
|
|
|
assert r.status_code == 200
|
|
assert r.request.headers['Transfer-Encoding'] == 'chunked'
|
|
|
|
|
|
def test_digestauth_401_count_reset_on_redirect():
|
|
"""Ensure we correctly reset num_401_calls after a successful digest auth,
|
|
followed by a 302 redirect to another digest auth prompt.
|
|
|
|
See https://github.com/psf/requests/issues/1979.
|
|
"""
|
|
text_401 = (b'HTTP/1.1 401 UNAUTHORIZED\r\n'
|
|
b'Content-Length: 0\r\n'
|
|
b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"'
|
|
b', opaque="372825293d1c26955496c80ed6426e9e", '
|
|
b'realm="me@kennethreitz.com", qop=auth\r\n\r\n')
|
|
|
|
text_302 = (b'HTTP/1.1 302 FOUND\r\n'
|
|
b'Content-Length: 0\r\n'
|
|
b'Location: /\r\n\r\n')
|
|
|
|
text_200 = (b'HTTP/1.1 200 OK\r\n'
|
|
b'Content-Length: 0\r\n\r\n')
|
|
|
|
expected_digest = (b'Authorization: Digest username="user", '
|
|
b'realm="me@kennethreitz.com", '
|
|
b'nonce="6bf5d6e4da1ce66918800195d6b9130d", uri="/"')
|
|
|
|
auth = requests.auth.HTTPDigestAuth('user', 'pass')
|
|
|
|
def digest_response_handler(sock):
|
|
# Respond to initial GET with a challenge.
|
|
request_content = consume_socket_content(sock, timeout=0.5)
|
|
assert request_content.startswith(b"GET / HTTP/1.1")
|
|
sock.send(text_401)
|
|
|
|
# Verify we receive an Authorization header in response, then redirect.
|
|
request_content = consume_socket_content(sock, timeout=0.5)
|
|
assert expected_digest in request_content
|
|
sock.send(text_302)
|
|
|
|
# Verify Authorization isn't sent to the redirected host,
|
|
# then send another challenge.
|
|
request_content = consume_socket_content(sock, timeout=0.5)
|
|
assert b'Authorization:' not in request_content
|
|
sock.send(text_401)
|
|
|
|
# Verify Authorization is sent correctly again, and return 200 OK.
|
|
request_content = consume_socket_content(sock, timeout=0.5)
|
|
assert expected_digest in request_content
|
|
sock.send(text_200)
|
|
|
|
return request_content
|
|
|
|
close_server = threading.Event()
|
|
server = Server(digest_response_handler, wait_to_close_event=close_server)
|
|
|
|
with server as (host, port):
|
|
url = 'http://{}:{}/'.format(host, port)
|
|
r = requests.get(url, auth=auth)
|
|
# Verify server succeeded in authenticating.
|
|
assert r.status_code == 200
|
|
# Verify Authorization was sent in final request.
|
|
assert 'Authorization' in r.request.headers
|
|
assert r.request.headers['Authorization'].startswith('Digest ')
|
|
# Verify redirect happened as we expected.
|
|
assert r.history[0].status_code == 302
|
|
close_server.set()
|
|
|
|
|
|
def test_digestauth_401_only_sent_once():
|
|
"""Ensure we correctly respond to a 401 challenge once, and then
|
|
stop responding if challenged again.
|
|
"""
|
|
text_401 = (b'HTTP/1.1 401 UNAUTHORIZED\r\n'
|
|
b'Content-Length: 0\r\n'
|
|
b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"'
|
|
b', opaque="372825293d1c26955496c80ed6426e9e", '
|
|
b'realm="me@kennethreitz.com", qop=auth\r\n\r\n')
|
|
|
|
expected_digest = (b'Authorization: Digest username="user", '
|
|
b'realm="me@kennethreitz.com", '
|
|
b'nonce="6bf5d6e4da1ce66918800195d6b9130d", uri="/"')
|
|
|
|
auth = requests.auth.HTTPDigestAuth('user', 'pass')
|
|
|
|
def digest_failed_response_handler(sock):
|
|
# Respond to initial GET with a challenge.
|
|
request_content = consume_socket_content(sock, timeout=0.5)
|
|
assert request_content.startswith(b"GET / HTTP/1.1")
|
|
sock.send(text_401)
|
|
|
|
# Verify we receive an Authorization header in response, then
|
|
# challenge again.
|
|
request_content = consume_socket_content(sock, timeout=0.5)
|
|
assert expected_digest in request_content
|
|
sock.send(text_401)
|
|
|
|
# Verify the client didn't respond to second challenge.
|
|
request_content = consume_socket_content(sock, timeout=0.5)
|
|
assert request_content == b''
|
|
|
|
return request_content
|
|
|
|
close_server = threading.Event()
|
|
server = Server(digest_failed_response_handler, wait_to_close_event=close_server)
|
|
|
|
with server as (host, port):
|
|
url = 'http://{}:{}/'.format(host, port)
|
|
r = requests.get(url, auth=auth)
|
|
# Verify server didn't authenticate us.
|
|
assert r.status_code == 401
|
|
assert r.history[0].status_code == 401
|
|
close_server.set()
|
|
|
|
|
|
def test_digestauth_only_on_4xx():
|
|
"""Ensure we only send digestauth on 4xx challenges.
|
|
|
|
See https://github.com/psf/requests/issues/3772.
|
|
"""
|
|
text_200_chal = (b'HTTP/1.1 200 OK\r\n'
|
|
b'Content-Length: 0\r\n'
|
|
b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"'
|
|
b', opaque="372825293d1c26955496c80ed6426e9e", '
|
|
b'realm="me@kennethreitz.com", qop=auth\r\n\r\n')
|
|
|
|
auth = requests.auth.HTTPDigestAuth('user', 'pass')
|
|
|
|
def digest_response_handler(sock):
|
|
# Respond to GET with a 200 containing www-authenticate header.
|
|
request_content = consume_socket_content(sock, timeout=0.5)
|
|
assert request_content.startswith(b"GET / HTTP/1.1")
|
|
sock.send(text_200_chal)
|
|
|
|
# Verify the client didn't respond with auth.
|
|
request_content = consume_socket_content(sock, timeout=0.5)
|
|
assert request_content == b''
|
|
|
|
return request_content
|
|
|
|
close_server = threading.Event()
|
|
server = Server(digest_response_handler, wait_to_close_event=close_server)
|
|
|
|
with server as (host, port):
|
|
url = 'http://{}:{}/'.format(host, port)
|
|
r = requests.get(url, auth=auth)
|
|
# Verify server didn't receive auth from us.
|
|
assert r.status_code == 200
|
|
assert len(r.history) == 0
|
|
close_server.set()
|
|
|
|
|
|
_schemes_by_var_prefix = [
|
|
('http', ['http']),
|
|
('https', ['https']),
|
|
('all', ['http', 'https']),
|
|
]
|
|
|
|
_proxy_combos = []
|
|
for prefix, schemes in _schemes_by_var_prefix:
|
|
for scheme in schemes:
|
|
_proxy_combos.append(("{}_proxy".format(prefix), scheme))
|
|
|
|
_proxy_combos += [(var.upper(), scheme) for var, scheme in _proxy_combos]
|
|
|
|
|
|
@pytest.mark.parametrize("var,scheme", _proxy_combos)
|
|
def test_use_proxy_from_environment(httpbin, var, scheme):
|
|
url = "{}://httpbin.org".format(scheme)
|
|
fake_proxy = Server() # do nothing with the requests; just close the socket
|
|
with fake_proxy as (host, port):
|
|
proxy_url = "socks5://{}:{}".format(host, port)
|
|
kwargs = {var: proxy_url}
|
|
with override_environ(**kwargs):
|
|
# fake proxy's lack of response will cause a ConnectionError
|
|
with pytest.raises(requests.exceptions.ConnectionError):
|
|
requests.get(url)
|
|
|
|
# the fake proxy received a request
|
|
assert len(fake_proxy.handler_results) == 1
|
|
|
|
# it had actual content (not checking for SOCKS protocol for now)
|
|
assert len(fake_proxy.handler_results[0]) > 0
|
|
|
|
|
|
def test_redirect_rfc1808_to_non_ascii_location():
|
|
path = u'š'
|
|
expected_path = b'%C5%A1'
|
|
redirect_request = [] # stores the second request to the server
|
|
|
|
def redirect_resp_handler(sock):
|
|
consume_socket_content(sock, timeout=0.5)
|
|
location = u'//{}:{}/{}'.format(host, port, path)
|
|
sock.send(
|
|
b'HTTP/1.1 301 Moved Permanently\r\n'
|
|
b'Content-Length: 0\r\n'
|
|
b'Location: ' + location.encode('utf8') + b'\r\n'
|
|
b'\r\n'
|
|
)
|
|
redirect_request.append(consume_socket_content(sock, timeout=0.5))
|
|
sock.send(b'HTTP/1.1 200 OK\r\n\r\n')
|
|
|
|
close_server = threading.Event()
|
|
server = Server(redirect_resp_handler, wait_to_close_event=close_server)
|
|
|
|
with server as (host, port):
|
|
url = u'http://{}:{}'.format(host, port)
|
|
r = requests.get(url=url, allow_redirects=True)
|
|
assert r.status_code == 200
|
|
assert len(r.history) == 1
|
|
assert r.history[0].status_code == 301
|
|
assert redirect_request[0].startswith(b'GET /' + expected_path + b' HTTP/1.1')
|
|
assert r.url == u'{}/{}'.format(url, expected_path.decode('ascii'))
|
|
|
|
close_server.set()
|
|
|
|
def test_fragment_not_sent_with_request():
|
|
"""Verify that the fragment portion of a URI isn't sent to the server."""
|
|
def response_handler(sock):
|
|
req = consume_socket_content(sock, timeout=0.5)
|
|
sock.send(
|
|
b'HTTP/1.1 200 OK\r\n'
|
|
b'Content-Length: '+bytes(len(req))+b'\r\n'
|
|
b'\r\n'+req
|
|
)
|
|
|
|
close_server = threading.Event()
|
|
server = Server(response_handler, wait_to_close_event=close_server)
|
|
|
|
with server as (host, port):
|
|
url = 'http://{}:{}/path/to/thing/#view=edit&token=hunter2'.format(host, port)
|
|
r = requests.get(url)
|
|
raw_request = r.content
|
|
|
|
assert r.status_code == 200
|
|
headers, body = raw_request.split(b'\r\n\r\n', 1)
|
|
status_line, headers = headers.split(b'\r\n', 1)
|
|
|
|
assert status_line == b'GET /path/to/thing/ HTTP/1.1'
|
|
for frag in (b'view', b'edit', b'token', b'hunter2'):
|
|
assert frag not in headers
|
|
assert frag not in body
|
|
|
|
close_server.set()
|
|
|
|
def test_fragment_update_on_redirect():
|
|
"""Verify we only append previous fragment if one doesn't exist on new
|
|
location. If a new fragment is encountered in a Location header, it should
|
|
be added to all subsequent requests.
|
|
"""
|
|
|
|
def response_handler(sock):
|
|
consume_socket_content(sock, timeout=0.5)
|
|
sock.send(
|
|
b'HTTP/1.1 302 FOUND\r\n'
|
|
b'Content-Length: 0\r\n'
|
|
b'Location: /get#relevant-section\r\n\r\n'
|
|
)
|
|
consume_socket_content(sock, timeout=0.5)
|
|
sock.send(
|
|
b'HTTP/1.1 302 FOUND\r\n'
|
|
b'Content-Length: 0\r\n'
|
|
b'Location: /final-url/\r\n\r\n'
|
|
)
|
|
consume_socket_content(sock, timeout=0.5)
|
|
sock.send(
|
|
b'HTTP/1.1 200 OK\r\n\r\n'
|
|
)
|
|
|
|
close_server = threading.Event()
|
|
server = Server(response_handler, wait_to_close_event=close_server)
|
|
|
|
with server as (host, port):
|
|
url = 'http://{}:{}/path/to/thing/#view=edit&token=hunter2'.format(host, port)
|
|
r = requests.get(url)
|
|
raw_request = r.content
|
|
|
|
assert r.status_code == 200
|
|
assert len(r.history) == 2
|
|
assert r.history[0].request.url == url
|
|
|
|
# Verify we haven't overwritten the location with our previous fragment.
|
|
assert r.history[1].request.url == 'http://{}:{}/get#relevant-section'.format(host, port)
|
|
# Verify previous fragment is used and not the original.
|
|
assert r.url == 'http://{}:{}/final-url/#relevant-section'.format(host, port)
|
|
|
|
close_server.set()
|
|
|
|
|
|
def test_response_content_retains_error():
|
|
"""Verify that accessing response.content retains an error.
|
|
|
|
See https://github.com/kennethreitz/requests/issues/4965
|
|
"""
|
|
|
|
data = "Some random stuff to read from remove server.\n"
|
|
|
|
def response_handler(sock):
|
|
req = consume_socket_content(sock, timeout=0.5)
|
|
|
|
# Send invalid chunked data (length mismatch)
|
|
sock.send(
|
|
b'HTTP/1.1 200 OK\r\n'
|
|
b'Transfer-Encoding: chunked\r\n'
|
|
b'\r\n2\r\n42\r\n8\r\n123\r\n' # 5 bytes missing
|
|
)
|
|
|
|
close_server = threading.Event()
|
|
server = Server(response_handler, wait_to_close_event=close_server)
|
|
|
|
with server as (host, port):
|
|
url = 'http://{}:{}/path'.format(host, port)
|
|
r = requests.post(url, stream=True)
|
|
with pytest.raises(ChunkedEncodingError):
|
|
r.content
|
|
|
|
# Access the bad response data again, I would expect the same
|
|
# error again.
|
|
|
|
try:
|
|
content = r.content
|
|
except ChunkedEncodingError:
|
|
pass # fine, same exception
|
|
else:
|
|
assert False, "error response has content: {0!r}".format(content)
|
|
close_server.set()
|
|
|