mirror of
https://github.com/kennethreitz/httpbin.git
synced 2026-06-05 23:00:18 +00:00
Merge pull request #366 from MarekR22/DigestStaleScenerio
Digest authentication and expiried nonce - stale=true scenerio
This commit is contained in:
+48
-27
@@ -21,10 +21,13 @@ from six.moves import range as xrange
|
||||
from werkzeug.datastructures import WWWAuthenticate, MultiDict
|
||||
from werkzeug.http import http_date
|
||||
from werkzeug.wrappers import BaseResponse
|
||||
from werkzeug.http import parse_authorization_header
|
||||
from raven.contrib.flask import Sentry
|
||||
|
||||
from . import filters
|
||||
from .helpers import get_headers, status_code, get_dict, get_request_range, check_basic_auth, check_digest_auth, secure_cookie, H, ROBOT_TXT, ANGRY_ASCII, parse_multi_value_header
|
||||
from .helpers import get_headers, status_code, get_dict, get_request_range, check_basic_auth, check_digest_auth, \
|
||||
secure_cookie, H, ROBOT_TXT, ANGRY_ASCII, parse_multi_value_header, next_stale_after_value, \
|
||||
digest_challenge_response
|
||||
from .utils import weighted_choice
|
||||
from .structures import CaseInsensitiveDict
|
||||
|
||||
@@ -446,41 +449,59 @@ def hidden_basic_auth(user='user', passwd='passwd'):
|
||||
|
||||
@app.route('/digest-auth/<qop>/<user>/<passwd>')
|
||||
def digest_auth_md5(qop=None, user='user', passwd='passwd'):
|
||||
return digest_auth(qop, user, passwd, "MD5")
|
||||
return digest_auth(qop, user, passwd, "MD5", 'never')
|
||||
|
||||
|
||||
@app.route('/digest-auth/<qop>/<user>/<passwd>/<algorithm>')
|
||||
def digest_auth(qop=None, user='user', passwd='passwd', algorithm='MD5'):
|
||||
def digest_auth_nostale(qop=None, user='user', passwd='passwd', algorithm='MD5'):
|
||||
return digest_auth(qop, user, passwd, algorithm, 'never')
|
||||
|
||||
|
||||
@app.route('/digest-auth/<qop>/<user>/<passwd>/<algorithm>/<stale_after>')
|
||||
def digest_auth(qop=None, user='user', passwd='passwd', algorithm='MD5', stale_after='never'):
|
||||
"""Prompts the user for authorization using HTTP Digest auth"""
|
||||
if algorithm not in ('MD5', 'SHA-256'):
|
||||
algorithm = 'MD5'
|
||||
|
||||
if qop not in ('auth', 'auth-int'):
|
||||
qop = None
|
||||
if 'Authorization' not in request.headers or \
|
||||
not check_digest_auth(user, passwd) or \
|
||||
'Cookie' not in request.headers:
|
||||
response = app.make_response('')
|
||||
response.status_code = 401
|
||||
|
||||
# RFC2616 Section4.2: HTTP headers are ASCII. That means
|
||||
# request.remote_addr was originally ASCII, so I should be able to
|
||||
# encode it back to ascii. Also, RFC2617 says about nonces: "The
|
||||
# contents of the nonce are implementation dependent"
|
||||
nonce = H(b''.join([
|
||||
getattr(request,'remote_addr',u'').encode('ascii'),
|
||||
b':',
|
||||
str(time.time()).encode('ascii'),
|
||||
b':',
|
||||
os.urandom(10)
|
||||
]), "MD5")
|
||||
opaque = H(os.urandom(10), "MD5")
|
||||
|
||||
auth = WWWAuthenticate("digest")
|
||||
auth.set_digest('me@kennethreitz.com', nonce, opaque=opaque,
|
||||
qop=('auth', 'auth-int') if qop is None else (qop, ), algorithm=algorithm)
|
||||
response.headers['WWW-Authenticate'] = auth.to_header()
|
||||
response.headers['Set-Cookie'] = 'fake=fake_value'
|
||||
if 'Authorization' not in request.headers or \
|
||||
'Cookie' not in request.headers:
|
||||
response = digest_challenge_response(app, qop, algorithm)
|
||||
response.set_cookie('stale_after', value=stale_after)
|
||||
return response
|
||||
return jsonify(authenticated=True, user=user)
|
||||
|
||||
credentails = parse_authorization_header(request.headers.get('Authorization'))
|
||||
if not credentails :
|
||||
response = digest_challenge_response(app, qop, algorithm)
|
||||
response.set_cookie('stale_after', value=stale_after)
|
||||
return response
|
||||
|
||||
current_nonce = credentails.get('nonce')
|
||||
|
||||
stale_after_value = None
|
||||
if 'stale_after' in request.cookies :
|
||||
stale_after_value = request.cookies.get('stale_after')
|
||||
|
||||
if 'last_nonce' in request.cookies and current_nonce == request.cookies.get('last_nonce') or \
|
||||
stale_after_value == '0' :
|
||||
response = digest_challenge_response(app, qop, algorithm, True)
|
||||
response.set_cookie('stale_after', value=stale_after)
|
||||
response.set_cookie('last_nonce', value=current_nonce)
|
||||
return response
|
||||
|
||||
if not check_digest_auth(user, passwd) :
|
||||
response = digest_challenge_response(app, qop, algorithm, False)
|
||||
response.set_cookie('stale_after', value=stale_after)
|
||||
response.set_cookie('last_nonce', value=current_nonce)
|
||||
return response
|
||||
|
||||
response = jsonify(authenticated=True, user=user)
|
||||
if stale_after_value :
|
||||
response.set_cookie('stale_after', value=next_stale_after_value(stale_after_value))
|
||||
|
||||
return response
|
||||
|
||||
|
||||
@app.route('/delay/<delay>')
|
||||
|
||||
@@ -10,8 +10,11 @@ This module provides helper functions for httpbin.
|
||||
import json
|
||||
import base64
|
||||
import re
|
||||
import time
|
||||
import os
|
||||
from hashlib import md5, sha256
|
||||
from werkzeug.http import parse_authorization_header
|
||||
from werkzeug.datastructures import WWWAuthenticate
|
||||
|
||||
from flask import request, make_response
|
||||
from six.moves.urllib.parse import urlparse, urlunparse
|
||||
@@ -432,3 +435,36 @@ def parse_multi_value_header(header_str):
|
||||
if match is not None:
|
||||
parsed_parts.append(match.group(2))
|
||||
return parsed_parts
|
||||
|
||||
|
||||
def next_stale_after_value(stale_after):
|
||||
try:
|
||||
stal_after_count = int(stale_after) - 1
|
||||
return str(stal_after_count)
|
||||
except ValueError:
|
||||
return 'never'
|
||||
|
||||
|
||||
def digest_challenge_response(app, qop, algorithm, stale = False):
|
||||
response = app.make_response('')
|
||||
response.status_code = 401
|
||||
|
||||
# RFC2616 Section4.2: HTTP headers are ASCII. That means
|
||||
# request.remote_addr was originally ASCII, so I should be able to
|
||||
# encode it back to ascii. Also, RFC2617 says about nonces: "The
|
||||
# contents of the nonce are implementation dependent"
|
||||
nonce = H(b''.join([
|
||||
getattr(request, 'remote_addr', u'').encode('ascii'),
|
||||
b':',
|
||||
str(time.time()).encode('ascii'),
|
||||
b':',
|
||||
os.urandom(10)
|
||||
]), "MD5")
|
||||
opaque = H(os.urandom(10), "MD5")
|
||||
|
||||
auth = WWWAuthenticate("digest")
|
||||
auth.set_digest('me@kennethreitz.com', nonce, opaque=opaque,
|
||||
qop=('auth', 'auth-int') if qop is None else (qop,), algorithm=algorithm)
|
||||
auth.stale = stale
|
||||
response.headers['WWW-Authenticate'] = auth.to_header()
|
||||
return response
|
||||
|
||||
@@ -38,8 +38,8 @@
|
||||
<li><a href="{{ url_for('delete_cookies', k1='', k2='') }}"><code>/cookies/delete?name</code></a> Deletes one or more simple cookies.</li>
|
||||
<li><a href="{{ url_for('basic_auth', user='user', passwd='passwd') }}"><code>/basic-auth/:user/:passwd</code></a> Challenges HTTPBasic Auth.</li>
|
||||
<li><a href="{{ url_for('hidden_basic_auth', user='user', passwd='passwd') }}"><code>/hidden-basic-auth/:user/:passwd</code></a> 404'd BasicAuth.</li>
|
||||
<li><a href="{{ url_for('digest_auth', qop='auth', user='user', passwd='passwd', algorithm='MD5') }}"><code>/digest-auth/:qop/:user/:passwd/:algorithm</code></a> Challenges HTTP Digest Auth.</li>
|
||||
<li><a href="{{ url_for('digest_auth', qop='auth', user='user', passwd='passwd', algorithm='MD5') }}"><code>/digest-auth/:qop/:user/:passwd</code></a> Challenges HTTP Digest Auth.</li>
|
||||
<li><a href="{{ url_for('digest_auth', qop='auth', user='user', passwd='passwd', algorithm='MD5', stale_after='never') }}"><code>/digest-auth/:qop/:user/:passwd/:algorithm</code></a> Challenges HTTP Digest Auth.</li>
|
||||
<li><a href="{{ url_for('digest_auth', qop='auth', user='user', passwd='passwd', algorithm='MD5', stale_after='never') }}"><code>/digest-auth/:qop/:user/:passwd</code></a> Challenges HTTP Digest Auth.</li>
|
||||
<li><a href="{{ url_for('stream_n_messages', n=20) }}"><code>/stream/:n</code></a> Streams <em>min(n, 100)</em> lines.</li>
|
||||
<li><a href="{{ url_for('delay_response', delay=3) }}"><code>/delay/:n</code></a> Delays responding for <em>min(n, 10)</em> seconds.</li>
|
||||
<li><a href="{{ url_for('drip', numbytes=5, duration=5, code=200) }}"><code>/drip?numbytes=n&duration=s&delay=s&code=code</code></a> Drips data over a duration after an optional initial delay, then (optionally) returns with the given status code.</li>
|
||||
|
||||
+73
-12
@@ -284,13 +284,26 @@ class HttpbinTestCase(unittest.TestCase):
|
||||
for qop in None, 'auth', 'auth-int',:
|
||||
for algorithm in None, 'MD5', 'SHA-256':
|
||||
for body in None, b'', b'request payload':
|
||||
self._test_digest_auth(username, password, qop, algorithm, body)
|
||||
for stale_after in (None, 1, 4) if algorithm else (None,) :
|
||||
self._test_digest_auth(username, password, qop, algorithm, body, stale_after)
|
||||
|
||||
def _test_digest_auth(self, username, password, qop=None, algorithm=None, body=None):
|
||||
uri = '/digest-auth/{0}/{1}/{2}'.format(qop or 'wrong-qop', username, password)
|
||||
if algorithm:
|
||||
uri += '/' + algorithm
|
||||
def _test_digest_auth(self, username, password, qop, algorithm=None, body=None, stale_after=None):
|
||||
uri = self._digest_auth_create_uri(username, password, qop, algorithm, stale_after)
|
||||
|
||||
unauthorized_response = self._test_digest_auth_first_challenge(uri)
|
||||
|
||||
header = unauthorized_response.headers.get('WWW-Authenticate')
|
||||
|
||||
authorized_response, nonce = self._test_digest_response_for_auth_request(header, username, password, qop, uri, body)
|
||||
self.assertEqual(authorized_response.status_code, 200)
|
||||
|
||||
if None == stale_after :
|
||||
return
|
||||
|
||||
# test stale after scenerio
|
||||
self._digest_auth_stale_after_check(header, username, password, uri, body, qop, stale_after)
|
||||
|
||||
def _test_digest_auth_first_challenge(self, uri):
|
||||
unauthorized_response = self.app.get(
|
||||
uri,
|
||||
environ_base={
|
||||
@@ -300,22 +313,47 @@ class HttpbinTestCase(unittest.TestCase):
|
||||
)
|
||||
# make sure it returns a 401
|
||||
self.assertEqual(unauthorized_response.status_code, 401)
|
||||
header = unauthorized_response.headers.get('WWW-Authenticate')
|
||||
return unauthorized_response
|
||||
|
||||
def _digest_auth_create_uri(self, username, password, qop, algorithm, stale_after):
|
||||
uri = '/digest-auth/{0}/{1}/{2}'.format(qop or 'wrong-qop', username, password)
|
||||
if algorithm:
|
||||
uri += '/' + algorithm
|
||||
if stale_after:
|
||||
uri += '/{0}'.format(stale_after)
|
||||
return uri
|
||||
|
||||
def _digest_auth_stale_after_check(self, header, username, password, uri, body, qop, stale_after):
|
||||
for nc in range(2, stale_after + 1):
|
||||
authorized_response, nonce = self._test_digest_response_for_auth_request(header, username, password, qop, uri, \
|
||||
body, nc)
|
||||
self.assertEqual(authorized_response.status_code, 200)
|
||||
stale_response, nonce = self._test_digest_response_for_auth_request(header, username, password, qop, uri, \
|
||||
body, stale_after + 1)
|
||||
self.assertEqual(stale_response.status_code, 401)
|
||||
header = stale_response.headers.get('WWW-Authenticate')
|
||||
self.assertIn('stale=TRUE', header)
|
||||
|
||||
def _test_digest_response_for_auth_request(self, header, username, password, qop, uri, body, nc=1, nonce=None):
|
||||
auth_type, auth_info = header.split(None, 1)
|
||||
self.assertEqual(auth_type, 'Digest')
|
||||
|
||||
d = parse_dict_header(auth_info)
|
||||
|
||||
nonce = d['nonce']
|
||||
nonce = nonce or d['nonce']
|
||||
realm = d['realm']
|
||||
opaque = d['opaque']
|
||||
cnonce, nc = (_hash(os.urandom(10), "MD5"), '00000001') if qop in ('auth', 'auth-int') else (None, None)
|
||||
if qop :
|
||||
self.assertIn(qop, [x.strip() for x in d['qop'].split(',')], 'Challenge should contains expected qop')
|
||||
algorithm = d['algorithm']
|
||||
|
||||
cnonce, nc = (_hash(os.urandom(10), "MD5"), '{:08}'.format(nc)) if qop in ('auth', 'auth-int') else (None, None)
|
||||
|
||||
auth_header = _make_digest_auth_header(
|
||||
username, password, 'GET', uri, nonce, realm, opaque, algorithm, qop, cnonce, nc, body)
|
||||
|
||||
# make second request
|
||||
authorized_response = self.app.get(
|
||||
return self.app.get(
|
||||
uri,
|
||||
environ_base={
|
||||
# httpbin's digest auth implementation uses the remote addr to
|
||||
@@ -326,10 +364,33 @@ class HttpbinTestCase(unittest.TestCase):
|
||||
'Authorization': auth_header,
|
||||
},
|
||||
data=body
|
||||
)
|
||||
), nonce
|
||||
|
||||
# done!
|
||||
self.assertEqual(authorized_response.status_code, 200)
|
||||
def test_digest_auth_wrong_pass(self):
|
||||
"""Test different combinations of digest auth parameters"""
|
||||
username = 'user'
|
||||
password = 'passwd'
|
||||
for qop in None, 'auth', 'auth-int',:
|
||||
for algorithm in None, 'MD5', 'SHA-256':
|
||||
for body in None, b'', b'request payload':
|
||||
self._test_digest_auth_wrong_pass(username, password, qop, algorithm, body, 3)
|
||||
|
||||
def _test_digest_auth_wrong_pass(self, username, password, qop, algorithm=None, body=None, stale_after=None):
|
||||
uri = self._digest_auth_create_uri(username, password, qop, algorithm, stale_after)
|
||||
unauthorized_response = self._test_digest_auth_first_challenge(uri)
|
||||
|
||||
header = unauthorized_response.headers.get('WWW-Authenticate')
|
||||
|
||||
wrong_pass_response, nonce = self._test_digest_response_for_auth_request(header, username, "wrongPassword", qop, uri, body)
|
||||
self.assertEqual(wrong_pass_response.status_code, 401)
|
||||
header = wrong_pass_response.headers.get('WWW-Authenticate')
|
||||
self.assertNotIn('stale=TRUE', header)
|
||||
|
||||
reused_nonce_response, nonce = self._test_digest_response_for_auth_request(header, username, password, qop, uri, \
|
||||
body, nonce=nonce)
|
||||
self.assertEqual(reused_nonce_response.status_code, 401)
|
||||
header = reused_nonce_response.headers.get('WWW-Authenticate')
|
||||
self.assertIn('stale=TRUE', header)
|
||||
|
||||
def test_drip(self):
|
||||
response = self.app.get('/drip?numbytes=400&duration=2&delay=1')
|
||||
|
||||
Reference in New Issue
Block a user