Move stale functionality to main digest url

- now digest url will contain number of request after which request with proper credentials
  will be rejected since nonce value expiries
This commit is contained in:
Marek Ruszczak
2017-06-25 19:59:18 +02:00
parent f8a9c56384
commit 791e31cba4
3 changed files with 67 additions and 94 deletions
+29 -92
View File
@@ -24,7 +24,9 @@ from werkzeug.wrappers import BaseResponse
from raven.contrib.flask import Sentry
from . import filters
from .helpers import get_headers, status_code, get_dict, get_request_range, check_basic_auth, check_digest_auth, secure_cookie, H, ROBOT_TXT, ANGRY_ASCII, parse_multi_value_header
from .helpers import get_headers, status_code, get_dict, get_request_range, check_basic_auth, check_digest_auth, \
secure_cookie, H, ROBOT_TXT, ANGRY_ASCII, parse_multi_value_header, next_stale_after_value, \
digest_challenge_response
from .utils import weighted_choice
from .structures import CaseInsensitiveDict
@@ -446,110 +448,45 @@ def hidden_basic_auth(user='user', passwd='passwd'):
@app.route('/digest-auth/<qop>/<user>/<passwd>')
def digest_auth_md5(qop=None, user='user', passwd='passwd'):
return digest_auth(qop, user, passwd, "MD5")
return digest_auth(qop, user, passwd, "MD5", 'never')
@app.route('/digest-auth/<qop>/<user>/<passwd>/<algorithm>')
def digest_auth(qop=None, user='user', passwd='passwd', algorithm='MD5'):
def digest_auth_nostale(qop=None, user='user', passwd='passwd', algorithm='MD5'):
return digest_auth(qop, user, passwd, algorithm, 'never')
@app.route('/digest-auth/<qop>/<user>/<passwd>/<algorithm>/<stale_after>')
def digest_auth(qop=None, user='user', passwd='passwd', algorithm='MD5', stale_after='never'):
"""Prompts the user for authorization using HTTP Digest auth"""
if algorithm not in ('MD5', 'SHA-256'):
algorithm = 'MD5'
if qop not in ('auth', 'auth-int'):
qop = None
if 'Authorization' not in request.headers or \
not check_digest_auth(user, passwd) or \
'Cookie' not in request.headers:
response = app.make_response('')
response.status_code = 401
# RFC2616 Section4.2: HTTP headers are ASCII. That means
# request.remote_addr was originally ASCII, so I should be able to
# encode it back to ascii. Also, RFC2617 says about nonces: "The
# contents of the nonce are implementation dependent"
nonce = H(b''.join([
getattr(request,'remote_addr',u'').encode('ascii'),
b':',
str(time.time()).encode('ascii'),
b':',
os.urandom(10)
]), "MD5")
opaque = H(os.urandom(10), "MD5")
auth = WWWAuthenticate("digest")
auth.set_digest('me@kennethreitz.com', nonce, opaque=opaque,
qop=('auth', 'auth-int') if qop is None else (qop, ), algorithm=algorithm)
response.headers['WWW-Authenticate'] = auth.to_header()
response.headers['Set-Cookie'] = 'fake=fake_value'
if 'Authorization' not in request.headers or \
not check_digest_auth(user, passwd) or \
'Cookie' not in request.headers:
response = digest_challenge_response(app, qop, algorithm)
response.set_cookie('stale_after', value=stale_after)
return response
return jsonify(authenticated=True, user=user)
@app.route('/digest-auth-stale/<qop>/<user>/<passwd>/<algorithm>')
def digest_auth_stale(qop=None, user='user', passwd='passwd', algorithm='MD5'):
"""Prompts the user for authorization using HTTP Digest auth"""
if 'stale_after' in request.cookies:
stale_after_value = request.cookies.get('stale_after')
if algorithm not in ('MD5', 'SHA-256'):
algorithm = 'MD5'
if stale_after_value == '0':
response = digest_challenge_response(app, qop, algorithm, True)
response.set_cookie('stale_after', value=stale_after)
return response
if qop not in ('auth', 'auth-int'):
qop = None
response = jsonify(authenticated=True, user=user)
response.set_cookie('stale_after', value=next_stale_after_value(stale_after_value))
return response
if 'Authorization' not in request.headers or \
not check_digest_auth(user, passwd) or \
'Cookie' not in request.headers:
response = app.make_response('')
response.status_code = 401
# RFC2616 Section4.2: HTTP headers are ASCII. That means
# request.remote_addr was originally ASCII, so I should be able to
# encode it back to ascii. Also, RFC2617 says about nonces: "The
# contents of the nonce are implementation dependent"
nonce = H(b''.join([
getattr(request,'remote_addr',u'').encode('ascii'),
b':',
str(time.time()).encode('ascii'),
b':',
os.urandom(10)
]), "MD5")
opaque = H(os.urandom(10), "MD5")
auth = WWWAuthenticate("digest")
auth.set_digest('me@kennethreitz.com', nonce, opaque=opaque,
qop=('auth', 'auth-int') if qop is None else (qop, ), algorithm=algorithm)
response.headers['WWW-Authenticate'] = auth.to_header()
response.headers['Set-Cookie'] = 'stale=no; Path=/'
return response
if not 'stale=yes' in request.headers['Cookie'] :
response = app.make_response('')
response.status_code = 401
# RFC2616 Section4.2: HTTP headers are ASCII. That means
# request.remote_addr was originally ASCII, so I should be able to
# encode it back to ascii. Also, RFC2617 says about nonces: "The
# contents of the nonce are implementation dependent"
nonce = H(b''.join([
getattr(request,'remote_addr',u'').encode('ascii'),
b':',
str(time.time()).encode('ascii'),
b':',
os.urandom(10)
]), "MD5")
opaque = H(os.urandom(10), "MD5")
auth = WWWAuthenticate("digest")
auth.set_digest('me@kennethreitz.com', nonce, opaque=opaque,
qop=('auth', 'auth-int') if qop is None else (qop, ), algorithm=algorithm)
auth.stale = True;
response.headers['WWW-Authenticate'] = auth.to_header()
response.headers['Set-Cookie'] = 'stale=yes; Path=/'
return response
response = jsonify(authenticated=True, user=user)
response.headers['Set-Cookie'] = 'stale=no; Path=/'
return response
response = jsonify(authenticated=True, user=user)
response.set_cookie('stale_after', value=stale_after)
return response
@app.route('/delay/<delay>')
+36
View File
@@ -10,8 +10,11 @@ This module provides helper functions for httpbin.
import json
import base64
import re
import time
import os
from hashlib import md5, sha256
from werkzeug.http import parse_authorization_header
from werkzeug.datastructures import WWWAuthenticate
from flask import request, make_response
from six.moves.urllib.parse import urlparse, urlunparse
@@ -432,3 +435,36 @@ def parse_multi_value_header(header_str):
if match is not None:
parsed_parts.append(match.group(2))
return parsed_parts
def next_stale_after_value(stale_after):
try:
stal_after_count = int(stale_after) - 1
return str(stal_after_count)
except ValueError:
return 'never'
def digest_challenge_response(app, qop, algorithm, stale = False):
response = app.make_response('')
response.status_code = 401
# RFC2616 Section4.2: HTTP headers are ASCII. That means
# request.remote_addr was originally ASCII, so I should be able to
# encode it back to ascii. Also, RFC2617 says about nonces: "The
# contents of the nonce are implementation dependent"
nonce = H(b''.join([
getattr(request, 'remote_addr', u'').encode('ascii'),
b':',
str(time.time()).encode('ascii'),
b':',
os.urandom(10)
]), "MD5")
opaque = H(os.urandom(10), "MD5")
auth = WWWAuthenticate("digest")
auth.set_digest('me@kennethreitz.com', nonce, opaque=opaque,
qop=('auth', 'auth-int') if qop is None else (qop,), algorithm=algorithm)
auth.stale = stale
response.headers['WWW-Authenticate'] = auth.to_header()
return response
+2 -2
View File
@@ -38,8 +38,8 @@
<li><a href="{{ url_for('delete_cookies', k1='', k2='') }}"><code>/cookies/delete?name</code></a> Deletes one or more simple cookies.</li>
<li><a href="{{ url_for('basic_auth', user='user', passwd='passwd') }}"><code>/basic-auth/:user/:passwd</code></a> Challenges HTTPBasic Auth.</li>
<li><a href="{{ url_for('hidden_basic_auth', user='user', passwd='passwd') }}"><code>/hidden-basic-auth/:user/:passwd</code></a> 404'd BasicAuth.</li>
<li><a href="{{ url_for('digest_auth', qop='auth', user='user', passwd='passwd', algorithm='MD5') }}"><code>/digest-auth/:qop/:user/:passwd/:algorithm</code></a> Challenges HTTP Digest Auth.</li>
<li><a href="{{ url_for('digest_auth', qop='auth', user='user', passwd='passwd', algorithm='MD5') }}"><code>/digest-auth/:qop/:user/:passwd</code></a> Challenges HTTP Digest Auth.</li>
<li><a href="{{ url_for('digest_auth', qop='auth', user='user', passwd='passwd', algorithm='MD5', stale_after='never') }}"><code>/digest-auth/:qop/:user/:passwd/:algorithm</code></a> Challenges HTTP Digest Auth.</li>
<li><a href="{{ url_for('digest_auth', qop='auth', user='user', passwd='passwd', algorithm='MD5', stale_after='never') }}"><code>/digest-auth/:qop/:user/:passwd</code></a> Challenges HTTP Digest Auth.</li>
<li><a href="{{ url_for('stream_n_messages', n=20) }}"><code>/stream/:n</code></a> Streams <em>min(n, 100)</em> lines.</li>
<li><a href="{{ url_for('delay_response', delay=3) }}"><code>/delay/:n</code></a> Delays responding for <em>min(n, 10)</em> seconds.</li>
<li><a href="{{ url_for('drip', numbytes=5, duration=5, code=200) }}"><code>/drip?numbytes=n&amp;duration=s&amp;delay=s&amp;code=code</code></a> Drips data over a duration after an optional initial delay, then (optionally) returns with the given status code.</li>