diff --git a/README.md b/README.md index f53bbe3..ded00f0 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Freely hosted in [HTTP](http://httpbin.org) & - [`/cookies/set/:name/:value`](http://httpbin.org/cookies/set/key/value) Sets a simple cookie. - [`/basic-auth/:user/:passwd`](http://httpbin.org/basic-auth/user/passwd) Challenges HTTPBasic Auth. - [`/hidden-basic-auth/:user/:passwd`](http://httpbin.org/hidden-basic-auth/user/passwd) 404's if not proper HTTPBasic Auth. +- [`/digest-auth/:qop/:user/:passwd`](http://httpbin.org/digest-auth/auth/user/passwd) Challanges HTTP Digest Auth ## DESCRIPTION diff --git a/httpbin/core.py b/httpbin/core.py index a4381c9..80ac50c 100644 --- a/httpbin/core.py +++ b/httpbin/core.py @@ -6,11 +6,14 @@ httpbin.core This module provides the core HttpBin experience. """ - +import os +import time from flask import Flask, request, render_template, redirect +from werkzeug.datastructures import WWWAuthenticate + from . import filters -from .helpers import get_headers, status_code, get_dict, check_basic_auth +from .helpers import get_headers, status_code, get_dict, check_basic_auth, check_digest_auth, H ENV_COOKIES = ( @@ -199,6 +202,31 @@ def hidden_basic_auth(user='user', passwd='passwd'): return dict(authenticated=True, user=user) +@app.route('/digest-auth///') +@filters.json +def digest_auth(qop=None, user='user', passwd='passwd'): + """Prompts the user for authorization using HTTP Digest auth""" + if qop not in ('auth', 'auth-int'): + qop = None + if not request.headers.get('Authorization'): + response = app.make_response('') + response.status_code = 401 + + nonce = H("%s:%d:%s" % (request.remote_addr, + time.time(), + os.urandom(10))) + opaque = H(os.urandom(10)) + + auth = WWWAuthenticate("digest") + auth.set_digest('Fake Realm', nonce, opaque=opaque, + qop=('auth', 'auth-int') if qop is None else (qop, )) + response.headers['WWW-Authenticate'] = auth.to_header() + return response + elif not check_digest_auth(user, passwd): + return status_code(403) + return dict(authenticated=True, user=user) + + if __name__ == '__main__': app.run() diff --git a/httpbin/helpers.py b/httpbin/helpers.py index 72e9fd5..f3482b6 100644 --- a/httpbin/helpers.py +++ b/httpbin/helpers.py @@ -7,9 +7,12 @@ httpbin.helpers This module provides helper functions for httpbin. """ +from hashlib import md5 +from werkzeug.http import parse_authorization_header from flask import request, make_response + from .structures import CaseInsensitiveDict @@ -146,3 +149,90 @@ def check_basic_auth(user, passwd): auth = request.authorization return auth and auth.username == user and auth.password == passwd + + + +# Digest auth helpers +# qop is a quality of protection + +def H(data): + return md5(data).hexdigest() + + +def HA1(realm, username, password): + """Create HA1 hash by realm, username, password + + HA1 = md5(A1) = MD5(username:realm:password) + """ + return H("%s:%s:%s" % (username, + realm, + password)) + + +def HA2(credentails, request): + """Create HA2 md5 hash + + If the qop directive's value is "auth" or is unspecified, then HA2: + HA2 = md5(A2) = MD5(method:digestURI) + If the qop directive's value is "auth-int" , then HA2 is + HA2 = md5(A2) = MD5(method:digestURI:MD5(entityBody)) + """ + if credentails.get("qop") == "auth" or credentails.get('qop') is None: + return H("%s:%s" % (request['method'], request['uri'])) + elif credentails.get("qop") == "auth-int": + for k in 'method', 'uri', 'body': + if k not in request: + raise ValueError("%s required" % k) + return H("%s:%s:%s" % (request['method'], + request['uri'], + H(request['body']))) + raise ValueError + + +def response(credentails, password, request): + """Compile digest auth response + + If the qop directive's value is "auth" or "auth-int" , then compute the response as follows: + RESPONSE = MD5(HA1:nonce:nonceCount:clienNonce:qop:HA2) + Else if the qop directive is unspecified, then compute the response as follows: + RESPONSE = MD5(HA1:nonce:HA2) + + Arguments: + - `credentails`: credentails dict + - `password`: request user password + - `request`: request dict + """ + response = None + HA1_value = HA1(credentails.get('realm'), credentails.get('username'), password) + HA2_value = HA2(credentails, request) + if credentails.get('qop') is None: + response = H(":".join([HA1_value, credentails.get('nonce'), HA2_value])) + elif credentails.get('qop') == 'auth' or credentails.get('qop') == 'auth-int': + for k in 'nonce', 'nc', 'cnonce', 'qop': + if k not in credentails: + raise ValueError("%s required for response H" % k) + response = H(":".join([HA1_value, + credentails.get('nonce'), + credentails.get('nc'), + credentails.get('cnonce'), + credentails.get('qop'), + HA2_value])) + else: + raise ValueError("qop value are wrong") + + return response + + +def check_digest_auth(user, passwd): + """Check user authentication using HTTP Digest auth""" + + if request.headers.get('Authorization'): + credentails = parse_authorization_header(request.headers.get('Authorization')) + if not credentails: + return + response_hash = response(credentails, passwd, dict(uri=request.path, + body=request.data, + method=request.method)) + if credentails['response'] == response_hash: + return True + return False