Merge pull request #137 from kevin1024/master

Python3 Support
This commit is contained in:
2014-05-27 13:13:00 -04:00
10 changed files with 167 additions and 49 deletions
+3
View File
@@ -2,3 +2,6 @@ env/
.workon
.epio-app
*.pyc
.tox
*.egg-info
*.swp
+1 -1
View File
@@ -1,3 +1,3 @@
# -*- coding: utf-8 -*-
from core import *
from .core import *
+14 -5
View File
@@ -19,6 +19,7 @@ from flask import Flask, Response, request, render_template, redirect, jsonify,
from werkzeug.datastructures import WWWAuthenticate
from werkzeug.http import http_date
from werkzeug.wrappers import BaseResponse
from six.moves import range as xrange
from . import filters
from .helpers import get_headers, status_code, get_dict, check_basic_auth, check_digest_auth, H, ROBOT_TXT, ANGRY_ASCII
@@ -352,9 +353,17 @@ def digest_auth(qop=None, user='user', passwd='passwd'):
response = app.make_response('')
response.status_code = 401
nonce = H("%s:%d:%s" % (request.remote_addr,
time.time(),
os.urandom(10)))
# RFC2616 Section4.2: HTTP headers are ASCII. That means
# request.remote_addr was originally ASCII, so I should be able to
# encode it back to ascii. Also, RFC2617 says about nonces: "The
# contents of the nonce are implementation dependent"
nonce = H(b''.join([
getattr(request,'remote_addr',u'').encode('ascii'),
b':',
str(time.time()).encode('ascii'),
b':',
os.urandom(10)
]))
opaque = H(os.urandom(10))
auth = WWWAuthenticate("digest")
@@ -393,7 +402,7 @@ def drip():
def generate_bytes():
for i in xrange(numbytes):
yield bytes(chr(42))
yield u"*".encode('utf-8')
time.sleep(pause)
return Response(generate_bytes(), headers={
@@ -403,7 +412,7 @@ def drip():
@app.route('/base64/<value>')
def decode_base64(value):
"""Decodes base64url-encoded string"""
encoded = value.encode('utf-8')
encoded = value.encode('utf-8') # base64 expects binary string as input
return base64.urlsafe_b64decode(encoded).decode('utf-8')
+2 -2
View File
@@ -10,7 +10,7 @@ This module provides response filter decorators.
import gzip as gzip2
import zlib
from cStringIO import StringIO
from six import BytesIO
from decimal import Decimal
from time import time as now
@@ -44,7 +44,7 @@ def gzip(f, *args, **kwargs):
else:
content = data
gzip_buffer = StringIO()
gzip_buffer = BytesIO()
gzip_file = gzip2.GzipFile(
mode='wb',
compresslevel=4,
+32 -24
View File
@@ -79,9 +79,13 @@ def json_safe(string, content_type='application/octet-stream'):
try:
_encoded = json.dumps(string)
return string
except ValueError:
return ''.join(['data:%s;base64,' % content_type,
base64.b64encode(string)])
except (ValueError, TypeError):
return b''.join([
b'data:',
content_type.encode('utf-8'),
b';base64,',
base64.b64encode(string)
]).decode('utf-8')
def get_files():
@@ -139,17 +143,11 @@ def get_dict(*keys, **extras):
data = request.data
form = request.form
if (len(form) == 1) and (not data):
if not form.values().pop():
data = form.keys().pop()
form = None
form = semiflatten(form)
form = semiflatten(request.form)
try:
_json = json.loads(data)
except ValueError:
_json = json.loads(data.decode('utf-8'))
except (ValueError, TypeError):
_json = None
d = dict(
@@ -237,9 +235,11 @@ def HA1(realm, username, password):
HA1 = md5(A1) = MD5(username:realm:password)
"""
return H("%s:%s:%s" % (username,
realm,
password))
if not realm:
realm = u''
return H(b":".join([username.encode('utf-8'),
realm.encode('utf-8'),
password.encode('utf-8')]))
def HA2(credentails, request):
@@ -251,7 +251,7 @@ def HA2(credentails, request):
HA2 = md5(A2) = MD5(method:digestURI:MD5(entityBody))
"""
if credentails.get("qop") == "auth" or credentails.get('qop') is None:
return H("%s:%s" % (request['method'], request['uri']))
return H(b":".join([request['method'].encode('utf-8'), request['uri'].encode('utf-8')]))
elif credentails.get("qop") == "auth-int":
for k in 'method', 'uri', 'body':
if k not in request:
@@ -276,20 +276,28 @@ def response(credentails, password, request):
- `request`: request dict
"""
response = None
HA1_value = HA1(credentails.get('realm'), credentails.get('username'), password)
HA1_value = HA1(
credentails.get('realm'),
credentails.get('username'),
password
)
HA2_value = HA2(credentails, request)
if credentails.get('qop') is None:
response = H(":".join([HA1_value, credentails.get('nonce'), HA2_value]))
response = H(b":".join([
HA1_value.encode('utf-8'),
credentails.get('nonce').encode('utf-8'),
HA2_value.encode('utf-8')
]))
elif credentails.get('qop') == 'auth' or credentails.get('qop') == 'auth-int':
for k in 'nonce', 'nc', 'cnonce', 'qop':
if k not in credentails:
raise ValueError("%s required for response H" % k)
response = H(":".join([HA1_value,
credentails.get('nonce'),
credentails.get('nc'),
credentails.get('cnonce'),
credentails.get('qop'),
HA2_value]))
response = H(b":".join([HA1_value.encode('utf-8'),
credentails.get('nonce').encode('utf-8'),
credentails.get('nc').encode('utf-8'),
credentails.get('cnonce').encode('utf-8'),
credentails.get('qop').encode('utf-8'),
HA2_value.encode('utf-8')]))
else:
raise ValueError("qop value are wrong")
+2 -2
View File
@@ -16,7 +16,7 @@ class CaseInsensitiveDict(dict):
"""
def _lower_keys(self):
return map(str.lower, self.keys())
return [str.lower(k) for k in self.keys()]
def __contains__(self, key):
return key.lower() in self._lower_keys()
@@ -24,4 +24,4 @@ class CaseInsensitiveDict(dict):
def __getitem__(self, key):
# We allow fall-through here, so values default to None
if key in self:
return self.items()[self._lower_keys().index(key.lower())][1]
return list(self.items())[self._lower_keys().index(key.lower())][1]
+1
View File
@@ -8,3 +8,4 @@ greenlet==0.4.2
gunicorn==18.0
itsdangerous==0.24
wsgiref==0.1.2
six==1.6.1
+32
View File
@@ -0,0 +1,32 @@
from setuptools import setup, find_packages
import codecs
import os
import re
setup(
name="httpbin",
version="0.1.0",
description="HTTP Request and Response Service",
# The project URL.
url='https://github.com/kennethreitz/httpbin',
# Author details
author='Kenneth Reitz',
author_email='me@kennethreitz.com',
# Choose your license
license='MIT',
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Natural Language :: English',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.4',
],
packages=find_packages(),
install_requires=['Flask','MarkupSafe','decorator','itsdangerous','six'],
)
+76 -15
View File
@@ -2,6 +2,8 @@
# -*- coding: utf-8 -*-
import base64
import unittest
from werkzeug.http import parse_dict_header
from hashlib import md5
import httpbin
@@ -16,47 +18,106 @@ class HttpbinTestCase(unittest.TestCase):
"""Httpbin tests"""
def setUp(self):
httpbin.app.debug = True
self.app = httpbin.app.test_client()
def test_base64(self):
greeting = u'Здравствуй, мир!'
b64_encoded = _string_to_base64(greeting)
response = self.app.get('/base64/{0}'.format(b64_encoded))
response = self.app.get(b'/base64/' + b64_encoded)
content = response.data.decode('utf-8')
self.assertEquals(greeting, content)
self.assertEqual(greeting, content)
def test_post_binary(self):
response = self.app.post('/post',
data='\x01\x02\x03\x81\x82\x83',
data=b'\x01\x02\x03\x81\x82\x83',
content_type='application/octet-stream')
self.assertEquals(response.status_code, 200)
self.assertEqual(response.status_code, 200)
def test_post_file_text(self):
with open('httpbin/core.py') as f:
response = self.app.post('/post', data={"file": f})
self.assertEquals(response.status_code, 200)
response = self.app.post('/post', data={"file": f.read()})
self.assertEqual(response.status_code, 200)
def test_post_file_binary(self):
with open('httpbin/core.pyc') as f:
response = self.app.post('/post', data={"file": f})
self.assertEquals(response.status_code, 200)
with open('httpbin/core.pyc','rb') as f:
response = self.app.post('/post', data={"file": f.read()})
self.assertEqual(response.status_code, 200)
def test_set_cors_headers_after_request(self):
response = self.app.get('/get')
self.assertEquals(response.headers.get('Access-Control-Allow-Origin'), '*')
self.assertEqual(response.headers.get('Access-Control-Allow-Origin'), '*')
def test_set_cors_headers_after_request_with_request_origin(self):
response = self.app.get('/get', headers={'Origin': 'origin'})
self.assertEquals(response.headers.get('Access-Control-Allow-Origin'), 'origin')
self.assertEqual(response.headers.get('Access-Control-Allow-Origin'), 'origin')
def test_set_cors_headers_with_options_verb(self):
response = self.app.open('/get', method='OPTIONS')
self.assertEquals(response.headers.get('Access-Control-Allow-Origin'), '*')
self.assertEquals(response.headers.get('Access-Control-Allow-Credentials'), 'true')
self.assertEquals(response.headers.get('Access-Control-Allow-Methods'), 'GET, POST, PUT, DELETE, PATCH, OPTIONS')
self.assertEquals(response.headers.get('Access-Control-Max-Age'), '3600')
self.assertEqual(response.headers.get('Access-Control-Allow-Origin'), '*')
self.assertEqual(response.headers.get('Access-Control-Allow-Credentials'), 'true')
self.assertEqual(response.headers.get('Access-Control-Allow-Methods'), 'GET, POST, PUT, DELETE, PATCH, OPTIONS')
self.assertEqual(response.headers.get('Access-Control-Max-Age'), '3600')
self.assertNotIn('Access-Control-Allow-Headers', response.headers) # FIXME should we add any extra headers?
def test_user_agent(self):
response = self.app.get('/user-agent', headers={'User-Agent':'test'})
self.assertIn('test', response.data.decode('utf-8'))
self.assertEqual(response.status_code, 200)
def test_gzip(self):
response = self.app.get('/gzip')
self.assertEqual(response.status_code, 200)
def test_digest_auth(self):
# make first request
unauthorized_response = self.app.get(
'/digest-auth/auth/user/passwd',
environ_base = {
'REMOTE_ADDR':'127.0.0.1', # digest auth uses the remote addr to build the nonce
})
# make sure it returns a 401
self.assertEqual(unauthorized_response.status_code, 401)
header = unauthorized_response.headers.get('WWW-Authenticate')
auth_type, auth_info = header.split(None, 1)
# Begin crappy digest-auth implementation
d = parse_dict_header(auth_info)
a1 = b'user:' + d['realm'].encode('utf-8') + b':passwd'
ha1 = md5(a1).hexdigest().encode('utf-8')
a2 = b'GET:/digest-auth/auth/user/passwd'
ha2 = md5(a2).hexdigest().encode('utf-8')
a3 = ha1 + b':' + d['nonce'].encode('utf-8') + b':' + ha2
auth_response = md5(a3).hexdigest()
auth_header = 'Digest username="user",realm="' + \
d['realm'] + \
'",nonce="' + \
d['nonce'] + \
'",uri="/digest-auth/auth/user/passwd",response="' + \
auth_response + \
'",opaque="' + \
d['opaque'] + '"'
# make second request
authorized_response = self.app.get(
'/digest-auth/auth/user/passwd',
environ_base = {
'REMOTE_ADDR':'127.0.0.1', # httpbin's digest auth implementation uses the remote addr to build the nonce
},
headers = {
'Authorization': auth_header,
}
)
# done!
self.assertEqual(authorized_response.status_code, 200)
def test_drip(self):
response = self.app.get('/drip?numbytes=400&duration=2&delay=1')
self.assertEqual(len(response.get_data()), 400)
self.assertEqual(response.status_code, 200)
if __name__ == '__main__':
unittest.main()
+4
View File
@@ -0,0 +1,4 @@
[tox]
envlist = py27,py34
[testenv]
commands=python test_httpbin.py