From 1da121356181844967fae02aedc17384e435581d Mon Sep 17 00:00:00 2001 From: Nate Prewitt Date: Mon, 12 Dec 2016 13:47:18 -0700 Subject: [PATCH] add socket tests for 401 redirect and 401 failure --- tests/test_lowlevel.py | 113 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 1 deletion(-) diff --git a/tests/test_lowlevel.py b/tests/test_lowlevel.py index 126a3a3f..6bc948b4 100644 --- a/tests/test_lowlevel.py +++ b/tests/test_lowlevel.py @@ -5,7 +5,7 @@ import pytest import threading import requests -from tests.testserver.server import Server +from tests.testserver.server import Server, consume_socket_content from .utils import override_environ @@ -25,6 +25,117 @@ def test_chunked_upload(): assert r.request.headers['Transfer-Encoding'] == 'chunked' +def test_digestauth_401_count_reset_on_redirect(): + """Ensure we correctly reset num_401_calls after a successful digest auth, + followed by a 302 redirect to another digest auth prompt. + + See https://github.com/kennethreitz/requests/issues/1979. + """ + text_401 = (b'HTTP/1.1 401 UNAUTHORIZED\r\n' + b'Content-Length: 0\r\n' + b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"' + b', opaque="372825293d1c26955496c80ed6426e9e", ' + b'realm="me@kennethreitz.com", qop=auth\r\n\r\n') + + text_302 = (b'HTTP/1.1 302 FOUND\r\n' + b'Content-Length: 0\r\n' + b'Location: /\r\n\r\n') + + text_200 = (b'HTTP/1.1 200 OK\r\n' + b'Content-Length: 0\r\n\r\n') + + expected_digest = (b'Authorization: Digest username="user", ' + b'realm="me@kennethreitz.com", ' + b'nonce="6bf5d6e4da1ce66918800195d6b9130d", uri="/"') + + auth = requests.auth.HTTPDigestAuth('user', 'pass') + + def digest_response_handler(sock): + # Respond to initial GET with a challenge. + request_content = consume_socket_content(sock, timeout=0.5) + assert request_content.startswith(b"GET / HTTP/1.1") + sock.send(text_401) + + # Verify we receive an Authorization header in response, then redirect. + request_content = consume_socket_content(sock, timeout=0.5) + assert expected_digest in request_content + sock.send(text_302) + + # Verify Authorization isn't sent to the redirected host, + # then send another challenge. + request_content = consume_socket_content(sock, timeout=0.5) + assert b'Authorization:' not in request_content + sock.send(text_401) + + # Verify Authorization is sent correctly again, and return 200 OK. + request_content = consume_socket_content(sock, timeout=0.5) + assert expected_digest in request_content + sock.send(text_200) + + return request_content + + close_server = threading.Event() + server = Server(digest_response_handler, wait_to_close_event=close_server) + + with server as (host, port): + url = 'http://{0}:{1}/'.format(host, port) + r = requests.get(url, auth=auth) + # Verify server succeeded in authenticating. + assert r.status_code == 200 + # Verify Authorization was sent in final request. + assert 'Authorization' in r.request.headers + assert r.request.headers['Authorization'].startswith('Digest ') + # Verify redirect happened as we expected. + assert r.history[0].status_code == 302 + close_server.set() + + +def test_digestauth_401_only_sent_once(): + """Ensure we correctly respond to a 401 challenge once, and then + stop responding if challenged again. + """ + text_401 = (b'HTTP/1.1 401 UNAUTHORIZED\r\n' + b'Content-Length: 0\r\n' + b'WWW-Authenticate: Digest nonce="6bf5d6e4da1ce66918800195d6b9130d"' + b', opaque="372825293d1c26955496c80ed6426e9e", ' + b'realm="me@kennethreitz.com", qop=auth\r\n\r\n') + + expected_digest = (b'Authorization: Digest username="user", ' + b'realm="me@kennethreitz.com", ' + b'nonce="6bf5d6e4da1ce66918800195d6b9130d", uri="/"') + + auth = requests.auth.HTTPDigestAuth('user', 'pass') + + def digest_failed_response_handler(sock): + # Respond to initial GET with a challenge. + request_content = consume_socket_content(sock, timeout=0.5) + assert request_content.startswith(b"GET / HTTP/1.1") + sock.send(text_401) + + # Verify we receive an Authorization header in response, then + # challenge again. + request_content = consume_socket_content(sock, timeout=0.5) + assert expected_digest in request_content + sock.send(text_401) + + # Verify the client didn't respond to second challenge. + request_content = consume_socket_content(sock, timeout=0.5) + assert request_content == b'' + + return request_content + + close_server = threading.Event() + server = Server(digest_failed_response_handler, wait_to_close_event=close_server) + + with server as (host, port): + url = 'http://{0}:{1}/'.format(host, port) + r = requests.get(url, auth=auth) + # Verify server didn't authenticate us. + assert r.status_code == 401 + assert r.history[0].status_code == 401 + close_server.set() + + _schemes_by_var_prefix = [ ('http', ['http']), ('https', ['https']),