diff --git a/.gitignore b/.gitignore
index ea1169a..eac3867 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
env/
build/
dist/
+.eggs/
.workon
.epio-app
*.pyc
diff --git a/Pipfile b/Pipfile
index eb854aa..8ad29bb 100644
--- a/Pipfile
+++ b/Pipfile
@@ -15,3 +15,4 @@ flasgger = "*"
pyyaml = {git = "https://github.com/yaml/pyyaml.git"}
[dev-packages]
+rope = "*"
diff --git a/Pipfile.lock b/Pipfile.lock
index 963e356..baa2566 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
- "sha256": "1faaf631302564db835f60205b28b15c189fe0230fd836465991bd5c7f3e0801"
+ "sha256": "b709c9b498d9be5088c0f485aafe18a04a8ed5144d397111a8f1d8bd06d7a16e"
},
"pipfile-spec": 6,
"requires": {},
@@ -235,5 +235,12 @@
"version": "==0.14.1"
}
},
- "develop": {}
+ "develop": {
+ "rope": {
+ "hashes": [
+ "sha256:a09edfd2034fd50099a67822f9bd851fbd0f4e98d3b87519f6267b60e50d80d1"
+ ],
+ "version": "==0.10.7"
+ }
+ }
}
diff --git a/httpbin/core.py b/httpbin/core.py
index e9709a8..913bccc 100644
--- a/httpbin/core.py
+++ b/httpbin/core.py
@@ -15,7 +15,17 @@ import time
import uuid
import argparse
-from flask import Flask, Response, request, render_template, redirect, jsonify as flask_jsonify, make_response, url_for, abort
+from flask import (
+ Flask,
+ Response,
+ request,
+ render_template,
+ redirect,
+ jsonify as flask_jsonify,
+ make_response,
+ url_for,
+ abort,
+)
from six.moves import range as xrange
from werkzeug.datastructures import WWWAuthenticate, MultiDict
from werkzeug.http import http_date
@@ -24,98 +34,120 @@ from werkzeug.http import parse_authorization_header
from flasgger import Swagger, NO_SANITIZER
from . import filters
-from .helpers import get_headers, status_code, get_dict, get_request_range, check_basic_auth, check_digest_auth, \
- secure_cookie, H, ROBOT_TXT, ANGRY_ASCII, parse_multi_value_header, next_stale_after_value, \
- digest_challenge_response
+from .helpers import (
+ get_headers,
+ status_code,
+ get_dict,
+ get_request_range,
+ check_basic_auth,
+ check_digest_auth,
+ secure_cookie,
+ H,
+ ROBOT_TXT,
+ ANGRY_ASCII,
+ parse_multi_value_header,
+ next_stale_after_value,
+ digest_challenge_response,
+)
from .utils import weighted_choice
from .structures import CaseInsensitiveDict
-with open(os.path.join(os.path.realpath(os.path.dirname(__file__)), 'VERSION')) as version_file:
+with open(
+ os.path.join(os.path.realpath(os.path.dirname(__file__)), "VERSION")
+) as version_file:
version = version_file.read().strip()
ENV_COOKIES = (
- '_gauges_unique',
- '_gauges_unique_year',
- '_gauges_unique_month',
- '_gauges_unique_day',
- '_gauges_unique_hour',
- '__utmz',
- '__utma',
- '__utmb'
+ "_gauges_unique",
+ "_gauges_unique_year",
+ "_gauges_unique_month",
+ "_gauges_unique_day",
+ "_gauges_unique_hour",
+ "__utmz",
+ "__utma",
+ "__utmb",
)
+
def jsonify(*args, **kwargs):
response = flask_jsonify(*args, **kwargs)
- if not response.data.endswith(b'\n'):
- response.data += b'\n'
+ if not response.data.endswith(b"\n"):
+ response.data += b"\n"
return response
+
# Prevent WSGI from correcting the casing of the Location header
BaseResponse.autocorrect_location_header = False
# Find the correct template folder when running from a different location
-tmpl_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
+tmpl_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
app = Flask(__name__, template_folder=tmpl_dir)
-app.debug = bool(os.environ.get('DEBUG'))
-app.config['JSONIFY_PRETTYPRINT_REGULAR'] = True
+app.debug = bool(os.environ.get("DEBUG"))
+app.config["JSONIFY_PRETTYPRINT_REGULAR"] = True
-app.add_template_global('HTTPBIN_TRACKING' in os.environ, name='tracking_enabled')
+app.add_template_global("HTTPBIN_TRACKING" in os.environ, name="tracking_enabled")
-app.config['SWAGGER'] = {
- 'title': 'httpbin.org',
- 'uiversion': 3
-}
+app.config["SWAGGER"] = {"title": "httpbin.org", "uiversion": 3}
template = {
- "swagger": "2.0",
- "info": {
- "title": "httpbin.org",
- "description": (
- "A simple HTTP Request & Response Service."
- "
Run locally: $ docker run -p 80:80 kennethreitz/httpbin"
- ),
- "contact": {
- "responsibleOrganization": "Kenneth Reitz",
- "responsibleDeveloper": "Kenneth Reitz",
- "email": "me@kennethreitz.org",
- "url": "https://kennethreitz.org",
+ "swagger": "2.0",
+ "info": {
+ "title": "httpbin.org",
+ "description": (
+ "A simple HTTP Request & Response Service."
+ "
Run locally: $ docker run -p 80:80 kennethreitz/httpbin"
+ ),
+ "contact": {
+ "responsibleOrganization": "Kenneth Reitz",
+ "responsibleDeveloper": "Kenneth Reitz",
+ "email": "me@kennethreitz.org",
+ "url": "https://kennethreitz.org",
+ },
+ # "termsOfService": "http://me.com/terms",
+ "version": version,
},
- # "termsOfService": "http://me.com/terms",
- "version": version
- },
- "host": "httpbin.org", # overrides localhost:5000
- "basePath": "/", # base bash for blueprint registration
- "schemes": [
- "https"
- ],
- 'protocol': 'https',
- 'tags': [
- {
- 'name': 'HTTP Methods',
- 'description': 'Testing different HTTP verbs',
- # 'externalDocs': {'description': 'Learn more', 'url': 'https://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html'}
- },
- {'name': 'Auth', 'description': 'Auth methods'},
- {'name': 'Status codes', 'description': 'Generates responses with given status code'},
- {'name': 'Request inspection', 'description': 'Inspect the request data'},
- {'name': 'Response inspection', 'description': 'Inspect the response data like caching and headers'},
- {'name': 'Response formats', 'description': 'Returns responses in different data formats'},
- {'name': 'Dynamic data', 'description': 'Generates random and dynamic data'},
- {'name': 'Cookies', 'description': 'Creates, reads and deletes Cookies'},
- {'name': 'Images', 'description': 'Returns different image formats'},
- {'name': 'Redirects', 'description': 'Returns different redirect responses'},
- {'name': 'Anything', 'description': 'Returns anything that is passed to request'},
- ]
+ "host": "httpbin.org", # overrides localhost:5000
+ "basePath": "/", # base bash for blueprint registration
+ "schemes": ["https"],
+ "protocol": "https",
+ "tags": [
+ {
+ "name": "HTTP Methods",
+ "description": "Testing different HTTP verbs",
+ # 'externalDocs': {'description': 'Learn more', 'url': 'https://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html'}
+ },
+ {"name": "Auth", "description": "Auth methods"},
+ {
+ "name": "Status codes",
+ "description": "Generates responses with given status code",
+ },
+ {"name": "Request inspection", "description": "Inspect the request data"},
+ {
+ "name": "Response inspection",
+ "description": "Inspect the response data like caching and headers",
+ },
+ {
+ "name": "Response formats",
+ "description": "Returns responses in different data formats",
+ },
+ {"name": "Dynamic data", "description": "Generates random and dynamic data"},
+ {"name": "Cookies", "description": "Creates, reads and deletes Cookies"},
+ {"name": "Images", "description": "Returns different image formats"},
+ {"name": "Redirects", "description": "Returns different redirect responses"},
+ {
+ "name": "Anything",
+ "description": "Returns anything that is passed to request",
+ },
+ ],
}
swagger_config = {
- "headers": [
- ],
+ "headers": [],
"specs": [
{
- "endpoint": 'spec',
- "route": '/spec.json',
+ "endpoint": "spec",
+ "route": "/spec.json",
"rule_filter": lambda rule: True, # all in
"model_filter": lambda tag: True, # all in
}
@@ -123,7 +155,7 @@ swagger_config = {
"static_url_path": "/flasgger_static",
# "static_folder": "static", # must be set by user
"swagger_ui": True,
- "specs_route": "/"
+ "specs_route": "/",
}
swagger = Swagger(app, sanitizer=NO_SANITIZER, template=template, config=swagger_config)
@@ -136,11 +168,15 @@ if os.environ.get("BUGSNAG_API_KEY") is not None:
try:
import bugsnag
import bugsnag.flask
+
release_stage = os.environ.get("BUGSNAG_RELEASE_STAGE") or "production"
- bugsnag.configure(api_key=os.environ.get("BUGSNAG_API_KEY"),
- project_root=os.path.dirname(os.path.abspath(__file__)),
- use_ssl=True, release_stage=release_stage,
- ignore_classes=['werkzeug.exceptions.NotFound'])
+ bugsnag.configure(
+ api_key=os.environ.get("BUGSNAG_API_KEY"),
+ project_root=os.path.dirname(os.path.abspath(__file__)),
+ use_ssl=True,
+ release_stage=release_stage,
+ ignore_classes=["werkzeug.exceptions.NotFound"],
+ )
bugsnag.flask.handle_exceptions(app)
except:
app.logger.warning("Unable to initialize Bugsnag exception handling.")
@@ -159,31 +195,40 @@ empties the input request stream.
- flask will hang and does not seem to properly terminate the request, so
we explicitly deny chunked requests.
"""
+
+
@app.before_request
def before_request():
- if request.environ.get('HTTP_TRANSFER_ENCODING', '').lower() == 'chunked':
- server = request.environ.get('SERVER_SOFTWARE', '')
- if server.lower().startswith('gunicorn/'):
- if 'wsgi.input_terminated' in request.environ:
- app.logger.debug("environ wsgi.input_terminated already set, keeping: %s"
- % request.environ['wsgi.input_terminated'])
+ if request.environ.get("HTTP_TRANSFER_ENCODING", "").lower() == "chunked":
+ server = request.environ.get("SERVER_SOFTWARE", "")
+ if server.lower().startswith("gunicorn/"):
+ if "wsgi.input_terminated" in request.environ:
+ app.logger.debug(
+ "environ wsgi.input_terminated already set, keeping: %s"
+ % request.environ["wsgi.input_terminated"]
+ )
else:
- request.environ['wsgi.input_terminated'] = 1
+ request.environ["wsgi.input_terminated"] = 1
else:
abort(501, "Chunked requests are not supported for server %s" % server)
+
@app.after_request
def set_cors_headers(response):
- response.headers['Access-Control-Allow-Origin'] = request.headers.get('Origin', '*')
- response.headers['Access-Control-Allow-Credentials'] = 'true'
+ response.headers["Access-Control-Allow-Origin"] = request.headers.get("Origin", "*")
+ response.headers["Access-Control-Allow-Credentials"] = "true"
- if request.method == 'OPTIONS':
+ if request.method == "OPTIONS":
# Both of these headers are only used for the "preflight request"
# http://www.w3.org/TR/cors/#access-control-allow-methods-response-header
- response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, PATCH, OPTIONS'
- response.headers['Access-Control-Max-Age'] = '3600' # 1 hour cache
- if request.headers.get('Access-Control-Request-Headers') is not None:
- response.headers['Access-Control-Allow-Headers'] = request.headers['Access-Control-Request-Headers']
+ response.headers[
+ "Access-Control-Allow-Methods"
+ ] = "GET, POST, PUT, DELETE, PATCH, OPTIONS"
+ response.headers["Access-Control-Max-Age"] = "3600" # 1 hour cache
+ if request.headers.get("Access-Control-Request-Headers") is not None:
+ response.headers["Access-Control-Allow-Headers"] = request.headers[
+ "Access-Control-Request-Headers"
+ ]
return response
@@ -191,13 +236,14 @@ def set_cors_headers(response):
# Routes
# ------
-@app.route('/legacy')
+
+@app.route("/legacy")
def view_landing_page():
"""Generates Landing Page in legacy layout."""
- return render_template('index.html')
+ return render_template("index.html")
-@app.route('/html')
+@app.route("/html")
def view_html_page():
"""Returns a simple HTML document.
---
@@ -210,10 +256,10 @@ def view_html_page():
description: An HTML page.
"""
- return render_template('moby.html')
+ return render_template("moby.html")
-@app.route('/robots.txt')
+@app.route("/robots.txt")
def view_robots_page():
"""Returns some robots.txt rules.
---
@@ -232,7 +278,7 @@ def view_robots_page():
return response
-@app.route('/deny')
+@app.route("/deny")
def view_deny_page():
"""Returns page denied by robots.txt rules.
---
@@ -251,7 +297,7 @@ def view_deny_page():
# return "YOU SHOULDN'T BE HERE"
-@app.route('/ip')
+@app.route("/ip")
def view_origin():
"""Returns the requester's IP Address.
---
@@ -264,10 +310,10 @@ def view_origin():
description: The Requester's IP Address.
"""
- return jsonify(origin=request.headers.get('X-Forwarded-For', request.remote_addr))
+ return jsonify(origin=request.headers.get("X-Forwarded-For", request.remote_addr))
-@app.route('/uuid')
+@app.route("/uuid")
def view_uuid():
"""Return a UUID4.
---
@@ -283,7 +329,7 @@ def view_uuid():
return jsonify(uuid=str(uuid.uuid4()))
-@app.route('/headers')
+@app.route("/headers")
def view_headers():
"""Return the incoming request's HTTP headers.
---
@@ -299,7 +345,7 @@ def view_headers():
return jsonify(get_headers())
-@app.route('/user-agent')
+@app.route("/user-agent")
def view_user_agent():
"""Return the incoming requests's User-Agent header.
---
@@ -314,10 +360,10 @@ def view_user_agent():
headers = get_headers()
- return jsonify({'user-agent': headers['user-agent']})
+ return jsonify({"user-agent": headers["user-agent"]})
-@app.route('/get', methods=('GET',))
+@app.route("/get", methods=("GET",))
def view_get():
"""The request's query parameters.
---
@@ -330,11 +376,14 @@ def view_get():
description: The request's query parameters.
"""
- return jsonify(get_dict('url', 'args', 'headers', 'origin'))
+ return jsonify(get_dict("url", "args", "headers", "origin"))
-@app.route('/anything', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'TRACE'])
-@app.route('/anything/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'TRACE'])
+@app.route("/anything", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "TRACE"])
+@app.route(
+ "/anything/",
+ methods=["GET", "POST", "PUT", "DELETE", "PATCH", "TRACE"],
+)
def view_anything(anything=None):
"""Returns anything passed in request data.
---
@@ -347,10 +396,22 @@ def view_anything(anything=None):
description: Anything passed in request
"""
- return jsonify(get_dict('url', 'args', 'headers', 'origin', 'method', 'form', 'data', 'files', 'json'))
+ return jsonify(
+ get_dict(
+ "url",
+ "args",
+ "headers",
+ "origin",
+ "method",
+ "form",
+ "data",
+ "files",
+ "json",
+ )
+ )
-@app.route('/post', methods=('POST',))
+@app.route("/post", methods=("POST",))
def view_post():
"""The request's POST parameters.
---
@@ -363,11 +424,12 @@ def view_post():
description: The request's POST parameters.
"""
- return jsonify(get_dict(
- 'url', 'args', 'form', 'data', 'origin', 'headers', 'files', 'json'))
+ return jsonify(
+ get_dict("url", "args", "form", "data", "origin", "headers", "files", "json")
+ )
-@app.route('/put', methods=('PUT',))
+@app.route("/put", methods=("PUT",))
def view_put():
"""The request's PUT parameters.
---
@@ -380,11 +442,12 @@ def view_put():
description: The request's PUT parameters.
"""
- return jsonify(get_dict(
- 'url', 'args', 'form', 'data', 'origin', 'headers', 'files', 'json'))
+ return jsonify(
+ get_dict("url", "args", "form", "data", "origin", "headers", "files", "json")
+ )
-@app.route('/patch', methods=('PATCH',))
+@app.route("/patch", methods=("PATCH",))
def view_patch():
"""The request's PATCH parameters.
---
@@ -397,11 +460,12 @@ def view_patch():
description: The request's PATCH parameters.
"""
- return jsonify(get_dict(
- 'url', 'args', 'form', 'data', 'origin', 'headers', 'files', 'json'))
+ return jsonify(
+ get_dict("url", "args", "form", "data", "origin", "headers", "files", "json")
+ )
-@app.route('/delete', methods=('DELETE',))
+@app.route("/delete", methods=("DELETE",))
def view_delete():
""""The request's DELETE parameters.
---
@@ -414,11 +478,12 @@ def view_delete():
description: The request's DELETE parameters.
"""
- return jsonify(get_dict(
- 'url', 'args', 'form', 'data', 'origin', 'headers', 'files', 'json'))
+ return jsonify(
+ get_dict("url", "args", "form", "data", "origin", "headers", "files", "json")
+ )
-@app.route('/gzip')
+@app.route("/gzip")
@filters.gzip
def view_gzip_encoded_content():
"""Returns GZip-encoded data.
@@ -432,11 +497,10 @@ def view_gzip_encoded_content():
description: GZip-encoded data.
"""
- return jsonify(get_dict(
- 'origin', 'headers', method=request.method, gzipped=True))
+ return jsonify(get_dict("origin", "headers", method=request.method, gzipped=True))
-@app.route('/deflate')
+@app.route("/deflate")
@filters.deflate
def view_deflate_encoded_content():
""""Returns Deflate-encoded data.
@@ -450,11 +514,10 @@ def view_deflate_encoded_content():
description: Defalte-encoded data.
"""
- return jsonify(get_dict(
- 'origin', 'headers', method=request.method, deflated=True))
+ return jsonify(get_dict("origin", "headers", method=request.method, deflated=True))
-@app.route('/brotli')
+@app.route("/brotli")
@filters.brotli
def view_brotli_encoded_content():
""""Returns Brotli-encoded data.
@@ -468,11 +531,10 @@ def view_brotli_encoded_content():
description: Brotli-encoded data.
"""
- return jsonify(get_dict(
- 'origin', 'headers', method=request.method, brotli=True))
+ return jsonify(get_dict("origin", "headers", method=request.method, brotli=True))
-@app.route('/redirect/')
+@app.route("/redirect/")
def redirect_n_times(n):
"""302 Redirects n times.
---
@@ -490,22 +552,24 @@ def redirect_n_times(n):
"""
assert n > 0
- absolute = request.args.get('absolute', 'false').lower() == 'true'
+ absolute = request.args.get("absolute", "false").lower() == "true"
if n == 1:
- return redirect(url_for('view_get', _external=absolute))
+ return redirect(url_for("view_get", _external=absolute))
if absolute:
- return _redirect('absolute', n, True)
+ return _redirect("absolute", n, True)
else:
- return _redirect('relative', n, False)
+ return _redirect("relative", n, False)
def _redirect(kind, n, external):
- return redirect(url_for('{0}_redirect_n_times'.format(kind), n=n - 1, _external=external))
+ return redirect(
+ url_for("{0}_redirect_n_times".format(kind), n=n - 1, _external=external)
+ )
-@app.route('/redirect-to', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'TRACE'])
+@app.route("/redirect-to", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "TRACE"])
def redirect_to():
"""302/3XX Redirects to the given URL.
---
@@ -563,24 +627,24 @@ def redirect_to():
description: A redirection.
"""
- argsDict = request.form.to_dict(flat=True) if request.method in ('POST', 'PATCH', 'PUT') else request.args.items()
- args = CaseInsensitiveDict(argsDict)
+ args_dict = request.args.items()
+ args = CaseInsensitiveDict(args_dict)
# We need to build the response manually and convert to UTF-8 to prevent
# werkzeug from "fixing" the URL. This endpoint should set the Location
# header to the exact string supplied.
- response = app.make_response('')
+ response = app.make_response("")
response.status_code = 302
- if 'status_code' in args:
- status_code = int(args['status_code'])
+ if "status_code" in args:
+ status_code = int(args["status_code"])
if status_code >= 300 and status_code < 400:
response.status_code = status_code
- response.headers['Location'] = args['url'].encode('utf-8')
+ response.headers["Location"] = args["url"].encode("utf-8")
return response
-@app.route('/relative-redirect/')
+@app.route("/relative-redirect/")
def relative_redirect_n_times(n):
"""Relatively 302 Redirects n times.
---
@@ -599,18 +663,18 @@ def relative_redirect_n_times(n):
assert n > 0
- response = app.make_response('')
+ response = app.make_response("")
response.status_code = 302
if n == 1:
- response.headers['Location'] = url_for('view_get')
+ response.headers["Location"] = url_for("view_get")
return response
- response.headers['Location'] = url_for('relative_redirect_n_times', n=n - 1)
+ response.headers["Location"] = url_for("relative_redirect_n_times", n=n - 1)
return response
-@app.route('/absolute-redirect/')
+@app.route("/absolute-redirect/")
def absolute_redirect_n_times(n):
"""Absolutely 302 Redirects n times.
---
@@ -630,12 +694,12 @@ def absolute_redirect_n_times(n):
assert n > 0
if n == 1:
- return redirect(url_for('view_get', _external=True))
+ return redirect(url_for("view_get", _external=True))
- return _redirect('absolute', n, True)
+ return _redirect("absolute", n, True)
-@app.route('/stream/')
+@app.route("/stream/")
def stream_n_messages(n):
"""Stream n JSON responses
---
@@ -651,20 +715,20 @@ def stream_n_messages(n):
200:
description: Streamed JSON responses.
"""
- response = get_dict('url', 'args', 'headers', 'origin')
+ response = get_dict("url", "args", "headers", "origin")
n = min(n, 100)
def generate_stream():
for i in range(n):
- response['id'] = i
- yield json.dumps(response) + '\n'
+ response["id"] = i
+ yield json.dumps(response) + "\n"
- return Response(generate_stream(), headers={
- "Content-Type": "application/json",
- })
+ return Response(generate_stream(), headers={"Content-Type": "application/json"})
-@app.route('/status/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'TRACE'])
+@app.route(
+ "/status/", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "TRACE"]
+)
def view_status_code(codes):
"""Return status code or random status code if more than one are given
---
@@ -688,32 +752,32 @@ def view_status_code(codes):
description: Server Errors
"""
- if ',' not in codes:
+ if "," not in codes:
try:
code = int(codes)
except ValueError:
- return Response('Invalid status code', status=400)
+ return Response("Invalid status code", status=400)
return status_code(code)
choices = []
- for choice in codes.split(','):
- if ':' not in choice:
+ for choice in codes.split(","):
+ if ":" not in choice:
code = choice
weight = 1
else:
- code, weight = choice.split(':')
+ code, weight = choice.split(":")
try:
choices.append((int(code), float(weight)))
except ValueError:
- return Response('Invalid status code', status=400)
+ return Response("Invalid status code", status=400)
code = weighted_choice(choices)
return status_code(code)
-@app.route('/response-headers', methods=['GET', 'POST'])
+@app.route("/response-headers", methods=["GET", "POST"])
def response_headers():
"""Returns a set of response headers from the query string.
---
@@ -757,7 +821,7 @@ def response_headers():
return response
-@app.route('/cookies')
+@app.route("/cookies")
def view_cookies(hide_env=True):
"""Returns cookie data.
---
@@ -772,7 +836,7 @@ def view_cookies(hide_env=True):
cookies = dict(request.cookies.items())
- if hide_env and ('show_env' not in request.args):
+ if hide_env and ("show_env" not in request.args):
for key in ENV_COOKIES:
try:
del cookies[key]
@@ -782,14 +846,14 @@ def view_cookies(hide_env=True):
return jsonify(cookies=cookies)
-@app.route('/forms/post')
+@app.route("/forms/post")
def view_forms_post():
"""Simple HTML form."""
- return render_template('forms-post.html')
+ return render_template("forms-post.html")
-@app.route('/cookies/set//')
+@app.route("/cookies/set//")
def set_cookie(name, value):
"""Sets a cookie and redirects to cookie list.
---
@@ -809,13 +873,13 @@ def set_cookie(name, value):
description: Set cookies and redirects to cookie list.
"""
- r = app.make_response(redirect(url_for('view_cookies')))
+ r = app.make_response(redirect(url_for("view_cookies")))
r.set_cookie(key=name, value=value, secure=secure_cookie())
return r
-@app.route('/cookies/set')
+@app.route("/cookies/set")
def set_cookies():
"""Sets cookie(s) as provided by the query string and redirects to cookie list.
---
@@ -839,14 +903,14 @@ def set_cookies():
"""
cookies = dict(request.args.items())
- r = app.make_response(redirect(url_for('view_cookies')))
+ r = app.make_response(redirect(url_for("view_cookies")))
for key, value in cookies.items():
r.set_cookie(key=key, value=value, secure=secure_cookie())
return r
-@app.route('/cookies/delete')
+@app.route("/cookies/delete")
def delete_cookies():
"""Deletes cookie(s) as provided by the query string and redirects to cookie list.
---
@@ -870,15 +934,15 @@ def delete_cookies():
"""
cookies = dict(request.args.items())
- r = app.make_response(redirect(url_for('view_cookies')))
+ r = app.make_response(redirect(url_for("view_cookies")))
for key, value in cookies.items():
r.delete_cookie(key=key)
return r
-@app.route('/basic-auth//')
-def basic_auth(user='user', passwd='passwd'):
+@app.route("/basic-auth//")
+def basic_auth(user="user", passwd="passwd"):
"""Prompts the user for authorization using HTTP Basic Auth.
---
tags:
@@ -905,8 +969,8 @@ def basic_auth(user='user', passwd='passwd'):
return jsonify(authenticated=True, user=user)
-@app.route('/hidden-basic-auth//')
-def hidden_basic_auth(user='user', passwd='passwd'):
+@app.route("/hidden-basic-auth//")
+def hidden_basic_auth(user="user", passwd="passwd"):
""""Prompts the user for authorization using HTTP Basic Auth.
---
tags:
@@ -932,7 +996,7 @@ def hidden_basic_auth(user='user', passwd='passwd'):
return jsonify(authenticated=True, user=user)
-@app.route('/bearer')
+@app.route("/bearer")
def bearer_auth():
""""Prompts the user for authorization using bearer authentication..
---
@@ -951,20 +1015,20 @@ def bearer_auth():
401:
description: Unsuccessful authentication.
"""
- authorization = request.headers.get('Authorization')
- if not (authorization and authorization.startswith('Bearer ')):
- response = app.make_response('')
- response.headers['WWW-Authenticate'] = 'Bearer'
+ authorization = request.headers.get("Authorization")
+ if not (authorization and authorization.startswith("Bearer ")):
+ response = app.make_response("")
+ response.headers["WWW-Authenticate"] = "Bearer"
response.status_code = 401
return response
- slice_start = len('Bearer ')
+ slice_start = len("Bearer ")
token = authorization[slice_start:]
return jsonify(authenticated=True, token=token)
-@app.route('/digest-auth///')
-def digest_auth_md5(qop=None, user='user', passwd='passwd'):
+@app.route("/digest-auth///")
+def digest_auth_md5(qop=None, user="user", passwd="passwd"):
""""Prompts the user for authorization using Digest Auth.
---
tags:
@@ -988,11 +1052,11 @@ def digest_auth_md5(qop=None, user='user', passwd='passwd'):
401:
description: Unsuccessful authentication.
"""
- return digest_auth(qop, user, passwd, "MD5", 'never')
+ return digest_auth(qop, user, passwd, "MD5", "never")
-@app.route('/digest-auth////')
-def digest_auth_nostale(qop=None, user='user', passwd='passwd', algorithm='MD5'):
+@app.route("/digest-auth////")
+def digest_auth_nostale(qop=None, user="user", passwd="passwd", algorithm="MD5"):
""""Prompts the user for authorization using Digest Auth + Algorithm.
---
tags:
@@ -1021,11 +1085,13 @@ def digest_auth_nostale(qop=None, user='user', passwd='passwd', algorithm='MD5')
401:
description: Unsuccessful authentication.
"""
- return digest_auth(qop, user, passwd, algorithm, 'never')
+ return digest_auth(qop, user, passwd, algorithm, "never")
-@app.route('/digest-auth/////')
-def digest_auth(qop=None, user='user', passwd='passwd', algorithm='MD5', stale_after='never'):
+@app.route("/digest-auth/////")
+def digest_auth(
+ qop=None, user="user", passwd="passwd", algorithm="MD5", stale_after="never"
+):
""""Prompts the user for authorization using Digest Auth + Algorithm.
allow settings the stale_after argument.
---
@@ -1059,65 +1125,74 @@ def digest_auth(qop=None, user='user', passwd='passwd', algorithm='MD5', stale_a
401:
description: Unsuccessful authentication.
"""
- require_cookie_handling = (request.args.get('require-cookie', '').lower() in
- ('1', 't', 'true'))
- if algorithm not in ('MD5', 'SHA-256', 'SHA-512'):
- algorithm = 'MD5'
+ require_cookie_handling = request.args.get("require-cookie", "").lower() in (
+ "1",
+ "t",
+ "true",
+ )
+ if algorithm not in ("MD5", "SHA-256", "SHA-512"):
+ algorithm = "MD5"
- if qop not in ('auth', 'auth-int'):
+ if qop not in ("auth", "auth-int"):
qop = None
- authorization = request.headers.get('Authorization')
+ authorization = request.headers.get("Authorization")
credentials = None
if authorization:
credentials = parse_authorization_header(authorization)
- if (not authorization or
- not credentials or credentials.type.lower() != 'digest' or
- (require_cookie_handling and 'Cookie' not in request.headers)):
+ if (
+ not authorization
+ or not credentials
+ or credentials.type.lower() != "digest"
+ or (require_cookie_handling and "Cookie" not in request.headers)
+ ):
response = digest_challenge_response(app, qop, algorithm)
- response.set_cookie('stale_after', value=stale_after)
- response.set_cookie('fake', value='fake_value')
+ response.set_cookie("stale_after", value=stale_after)
+ response.set_cookie("fake", value="fake_value")
return response
- if (require_cookie_handling and
- request.cookies.get('fake') != 'fake_value'):
- response = jsonify({'errors': ['missing cookie set on challenge']})
- response.set_cookie('fake', value='fake_value')
+ if require_cookie_handling and request.cookies.get("fake") != "fake_value":
+ response = jsonify({"errors": ["missing cookie set on challenge"]})
+ response.set_cookie("fake", value="fake_value")
response.status_code = 403
return response
- current_nonce = credentials.get('nonce')
+ current_nonce = credentials.get("nonce")
stale_after_value = None
- if 'stale_after' in request.cookies:
- stale_after_value = request.cookies.get('stale_after')
+ if "stale_after" in request.cookies:
+ stale_after_value = request.cookies.get("stale_after")
- if ('last_nonce' in request.cookies and
- current_nonce == request.cookies.get('last_nonce') or
- stale_after_value == '0'):
+ if (
+ "last_nonce" in request.cookies
+ and current_nonce == request.cookies.get("last_nonce")
+ or stale_after_value == "0"
+ ):
response = digest_challenge_response(app, qop, algorithm, True)
- response.set_cookie('stale_after', value=stale_after)
- response.set_cookie('last_nonce', value=current_nonce)
- response.set_cookie('fake', value='fake_value')
+ response.set_cookie("stale_after", value=stale_after)
+ response.set_cookie("last_nonce", value=current_nonce)
+ response.set_cookie("fake", value="fake_value")
return response
if not check_digest_auth(user, passwd):
response = digest_challenge_response(app, qop, algorithm, False)
- response.set_cookie('stale_after', value=stale_after)
- response.set_cookie('last_nonce', value=current_nonce)
- response.set_cookie('fake', value='fake_value')
+ response.set_cookie("stale_after", value=stale_after)
+ response.set_cookie("last_nonce", value=current_nonce)
+ response.set_cookie("fake", value="fake_value")
return response
response = jsonify(authenticated=True, user=user)
- response.set_cookie('fake', value='fake_value')
- if stale_after_value :
- response.set_cookie('stale_after', value=next_stale_after_value(stale_after_value))
+ response.set_cookie("fake", value="fake_value")
+ if stale_after_value:
+ response.set_cookie(
+ "stale_after", value=next_stale_after_value(stale_after_value)
+ )
return response
-@app.route('/delay/', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'TRACE'])
+@app.route("/delay/", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "TRACE"])
def delay_response(delay):
""""Returns a delayed response (max of 10 seconds).
---
@@ -1137,11 +1212,12 @@ def delay_response(delay):
time.sleep(delay)
- return jsonify(get_dict(
- 'url', 'args', 'form', 'data', 'origin', 'headers', 'files'))
+ return jsonify(
+ get_dict("url", "args", "form", "data", "origin", "headers", "files")
+ )
-@app.route('/drip')
+@app.route("/drip")
def drip():
"""Drips data over a duration after an optional initial delay.
---
@@ -1179,34 +1255,39 @@ def drip():
description: A dripped response.
"""
args = CaseInsensitiveDict(request.args.items())
- duration = float(args.get('duration', 2))
- numbytes = min(int(args.get('numbytes', 10)),(10 * 1024 * 1024)) # set 10MB limit
- code = int(args.get('code', 200))
+ duration = float(args.get("duration", 2))
+ numbytes = min(int(args.get("numbytes", 10)), (10 * 1024 * 1024)) # set 10MB limit
+ code = int(args.get("code", 200))
if numbytes <= 0:
- response = Response('number of bytes must be positive', status=400)
+ response = Response("number of bytes must be positive", status=400)
return response
- delay = float(args.get('delay', 0))
+ delay = float(args.get("delay", 0))
if delay > 0:
time.sleep(delay)
pause = duration / numbytes
+
def generate_bytes():
for i in xrange(numbytes):
yield b"*"
time.sleep(pause)
- response = Response(generate_bytes(), headers={
- "Content-Type": "application/octet-stream",
- "Content-Length": str(numbytes),
- })
+ response = Response(
+ generate_bytes(),
+ headers={
+ "Content-Type": "application/octet-stream",
+ "Content-Length": str(numbytes),
+ },
+ )
response.status_code = code
return response
-@app.route('/base64/')
+
+@app.route("/base64/")
def decode_base64(value):
""""Decodes base64url-encoded string.
---
@@ -1223,14 +1304,14 @@ def decode_base64(value):
200:
description: Decoded base64 content.
"""
- encoded = value.encode('utf-8') # base64 expects binary string as input
+ encoded = value.encode("utf-8") # base64 expects binary string as input
try:
- return base64.urlsafe_b64decode(encoded).decode('utf-8')
+ return base64.urlsafe_b64decode(encoded).decode("utf-8")
except:
return "Incorrect Base64 data try: SFRUUEJJTiBpcyBhd2Vzb21l"
-@app.route('/cache', methods=('GET',))
+@app.route("/cache", methods=("GET",))
def cache():
"""Returns a 304 if an If-Modified-Since header or If-None-Match is present. Returns the same as a GET otherwise.
---
@@ -1250,17 +1331,20 @@ def cache():
description: Modified
"""
- is_conditional = request.headers.get('If-Modified-Since') or request.headers.get('If-None-Match')
+ is_conditional = request.headers.get("If-Modified-Since") or request.headers.get(
+ "If-None-Match"
+ )
if is_conditional is None:
response = view_get()
- response.headers['Last-Modified'] = http_date()
- response.headers['ETag'] = uuid.uuid4().hex
+ response.headers["Last-Modified"] = http_date()
+ response.headers["ETag"] = uuid.uuid4().hex
return response
else:
return status_code(304)
-@app.route('/etag/', methods=('GET',))
+
+@app.route("/etag/", methods=("GET",))
def etag(etag):
"""Assumes the resource has the given etag and responds to If-None-Match and If-Match headers appropriately.
---
@@ -1280,24 +1364,25 @@ def etag(etag):
description: match
"""
- if_none_match = parse_multi_value_header(request.headers.get('If-None-Match'))
- if_match = parse_multi_value_header(request.headers.get('If-Match'))
+ if_none_match = parse_multi_value_header(request.headers.get("If-None-Match"))
+ if_match = parse_multi_value_header(request.headers.get("If-Match"))
if if_none_match:
- if etag in if_none_match or '*' in if_none_match:
+ if etag in if_none_match or "*" in if_none_match:
response = status_code(304)
- response.headers['ETag'] = etag
+ response.headers["ETag"] = etag
return response
elif if_match:
- if etag not in if_match and '*' not in if_match:
+ if etag not in if_match and "*" not in if_match:
return status_code(412)
# Special cases don't apply, return normal response
response = view_get()
- response.headers['ETag'] = etag
+ response.headers["ETag"] = etag
return response
-@app.route('/cache/')
+
+@app.route("/cache/")
def cache_control(value):
"""Sets a Cache-Control header for n seconds.
---
@@ -1314,11 +1399,11 @@ def cache_control(value):
description: Cache control set
"""
response = view_get()
- response.headers['Cache-Control'] = 'public, max-age={0}'.format(value)
+ response.headers["Cache-Control"] = "public, max-age={0}".format(value)
return response
-@app.route('/encoding/utf8')
+@app.route("/encoding/utf8")
def encoding():
"""Returns a UTF-8 encoded body.
---
@@ -1331,10 +1416,10 @@ def encoding():
description: Encoded UTF-8 content.
"""
- return render_template('UTF-8-demo.txt')
+ return render_template("UTF-8-demo.txt")
-@app.route('/bytes/')
+@app.route("/bytes/")
def random_bytes(n):
"""Returns n random bytes generated with given seed
---
@@ -1354,18 +1439,18 @@ def random_bytes(n):
n = min(n, 100 * 1024) # set 100KB limit
params = CaseInsensitiveDict(request.args.items())
- if 'seed' in params:
- random.seed(int(params['seed']))
+ if "seed" in params:
+ random.seed(int(params["seed"]))
response = make_response()
# Note: can't just use os.urandom here because it ignores the seed
response.data = bytearray(random.randint(0, 255) for i in range(n))
- response.content_type = 'application/octet-stream'
+ response.content_type = "application/octet-stream"
return response
-@app.route('/stream-bytes/')
+@app.route("/stream-bytes/")
def stream_random_bytes(n):
"""Streams n random bytes generated with given seed, at given chunk size per packet.
---
@@ -1381,14 +1466,14 @@ def stream_random_bytes(n):
200:
description: Bytes.
"""
- n = min(n, 100 * 1024) # set 100KB limit
+ n = min(n, 100 * 1024) # set 100KB limit
params = CaseInsensitiveDict(request.args.items())
- if 'seed' in params:
- random.seed(int(params['seed']))
+ if "seed" in params:
+ random.seed(int(params["seed"]))
- if 'chunk_size' in params:
- chunk_size = max(1, int(params['chunk_size']))
+ if "chunk_size" in params:
+ chunk_size = max(1, int(params["chunk_size"]))
else:
chunk_size = 10 * 1024
@@ -1398,17 +1483,18 @@ def stream_random_bytes(n):
for i in xrange(n):
chunks.append(random.randint(0, 255))
if len(chunks) == chunk_size:
- yield(bytes(chunks))
+ yield (bytes(chunks))
chunks = bytearray()
if chunks:
- yield(bytes(chunks))
+ yield (bytes(chunks))
- headers = {'Content-Type': 'application/octet-stream'}
+ headers = {"Content-Type": "application/octet-stream"}
return Response(generate_bytes(), headers=headers)
-@app.route('/range/')
+
+@app.route("/range/")
def range_request(numbytes):
"""Streams n random bytes generated with given seed, at given chunk size per packet.
---
@@ -1426,34 +1512,39 @@ def range_request(numbytes):
"""
if numbytes <= 0 or numbytes > (100 * 1024):
- response = Response(headers={
- 'ETag' : 'range%d' % numbytes,
- 'Accept-Ranges' : 'bytes'
- })
+ response = Response(
+ headers={"ETag": "range%d" % numbytes, "Accept-Ranges": "bytes"}
+ )
response.status_code = 404
- response.data = 'number of bytes must be in the range (0, 102400]'
+ response.data = "number of bytes must be in the range (0, 102400]"
return response
params = CaseInsensitiveDict(request.args.items())
- if 'chunk_size' in params:
- chunk_size = max(1, int(params['chunk_size']))
+ if "chunk_size" in params:
+ chunk_size = max(1, int(params["chunk_size"]))
else:
chunk_size = 10 * 1024
- duration = float(params.get('duration', 0))
+ duration = float(params.get("duration", 0))
pause_per_byte = duration / numbytes
request_headers = get_headers()
first_byte_pos, last_byte_pos = get_request_range(request_headers, numbytes)
- range_length = (last_byte_pos+1) - first_byte_pos
+ range_length = (last_byte_pos + 1) - first_byte_pos
- if first_byte_pos > last_byte_pos or first_byte_pos not in xrange(0, numbytes) or last_byte_pos not in xrange(0, numbytes):
- response = Response(headers={
- 'ETag' : 'range%d' % numbytes,
- 'Accept-Ranges' : 'bytes',
- 'Content-Range' : 'bytes */%d' % numbytes,
- 'Content-Length': '0',
- })
+ if (
+ first_byte_pos > last_byte_pos
+ or first_byte_pos not in xrange(0, numbytes)
+ or last_byte_pos not in xrange(0, numbytes)
+ ):
+ response = Response(
+ headers={
+ "ETag": "range%d" % numbytes,
+ "Accept-Ranges": "bytes",
+ "Content-Range": "bytes */%d" % numbytes,
+ "Content-Length": "0",
+ }
+ )
response.status_code = 416
return response
@@ -1464,23 +1555,23 @@ def range_request(numbytes):
# We don't want the resource to change across requests, so we need
# to use a predictable data generation function
- chunks.append(ord('a') + (i % 26))
+ chunks.append(ord("a") + (i % 26))
if len(chunks) == chunk_size:
- yield(bytes(chunks))
+ yield (bytes(chunks))
time.sleep(pause_per_byte * chunk_size)
chunks = bytearray()
if chunks:
time.sleep(pause_per_byte * len(chunks))
- yield(bytes(chunks))
+ yield (bytes(chunks))
- content_range = 'bytes %d-%d/%d' % (first_byte_pos, last_byte_pos, numbytes)
+ content_range = "bytes %d-%d/%d" % (first_byte_pos, last_byte_pos, numbytes)
response_headers = {
- 'Content-Type': 'application/octet-stream',
- 'ETag' : 'range%d' % numbytes,
- 'Accept-Ranges' : 'bytes',
- 'Content-Length': str(range_length),
- 'Content-Range' : content_range
+ "Content-Type": "application/octet-stream",
+ "ETag": "range%d" % numbytes,
+ "Accept-Ranges": "bytes",
+ "Content-Length": str(range_length),
+ "Content-Range": content_range,
}
response = Response(generate_bytes(), headers=response_headers)
@@ -1492,7 +1583,8 @@ def range_request(numbytes):
return response
-@app.route('/links//')
+
+@app.route("/links//")
def link_page(n, offset):
"""Generate a page containing n links to other pages which do the same.
---
@@ -1511,28 +1603,28 @@ def link_page(n, offset):
200:
description: HTML links.
"""
- n = min(max(1, n), 200) # limit to between 1 and 200 links
+ n = min(max(1, n), 200) # limit to between 1 and 200 links
link = "{1} "
- html = ['Links']
+ html = ["Links"]
for i in xrange(n):
if i == offset:
html.append("{0} ".format(i))
else:
- html.append(link.format(url_for('link_page', n=n, offset=i), i))
- html.append('')
+ html.append(link.format(url_for("link_page", n=n, offset=i), i))
+ html.append("")
- return ''.join(html)
+ return "".join(html)
-@app.route('/links/')
+@app.route("/links/")
def links(n):
"""Redirect to first links page."""
- return redirect(url_for('link_page', n=n, offset=0))
+ return redirect(url_for("link_page", n=n, offset=0))
-@app.route('/image')
+@app.route("/image")
def image():
"""Returns a simple image of the type suggest by the Accept header.
---
@@ -1550,24 +1642,24 @@ def image():
"""
headers = get_headers()
- if 'accept' not in headers:
- return image_png() # Default media type to png
+ if "accept" not in headers:
+ return image_png() # Default media type to png
- accept = headers['accept'].lower()
+ accept = headers["accept"].lower()
- if 'image/webp' in accept:
+ if "image/webp" in accept:
return image_webp()
- elif 'image/svg+xml' in accept:
+ elif "image/svg+xml" in accept:
return image_svg()
- elif 'image/jpeg' in accept:
+ elif "image/jpeg" in accept:
return image_jpeg()
- elif 'image/png' in accept or 'image/*' in accept:
+ elif "image/png" in accept or "image/*" in accept:
return image_png()
else:
- return status_code(406) # Unsupported media type
+ return status_code(406) # Unsupported media type
-@app.route('/image/png')
+@app.route("/image/png")
def image_png():
"""Returns a simple PNG image.
---
@@ -1579,11 +1671,11 @@ def image_png():
200:
description: A PNG image.
"""
- data = resource('images/pig_icon.png')
- return Response(data, headers={'Content-Type': 'image/png'})
+ data = resource("images/pig_icon.png")
+ return Response(data, headers={"Content-Type": "image/png"})
-@app.route('/image/jpeg')
+@app.route("/image/jpeg")
def image_jpeg():
"""Returns a simple JPEG image.
---
@@ -1595,11 +1687,11 @@ def image_jpeg():
200:
description: A JPEG image.
"""
- data = resource('images/jackal.jpg')
- return Response(data, headers={'Content-Type': 'image/jpeg'})
+ data = resource("images/jackal.jpg")
+ return Response(data, headers={"Content-Type": "image/jpeg"})
-@app.route('/image/webp')
+@app.route("/image/webp")
def image_webp():
"""Returns a simple WEBP image.
---
@@ -1611,11 +1703,11 @@ def image_webp():
200:
description: A WEBP image.
"""
- data = resource('images/wolf_1.webp')
- return Response(data, headers={'Content-Type': 'image/webp'})
+ data = resource("images/wolf_1.webp")
+ return Response(data, headers={"Content-Type": "image/webp"})
-@app.route('/image/svg')
+@app.route("/image/svg")
def image_svg():
"""Returns a simple SVG image.
---
@@ -1627,15 +1719,13 @@ def image_svg():
200:
description: An SVG image.
"""
- data = resource('images/svg_logo.svg')
- return Response(data, headers={'Content-Type': 'image/svg+xml'})
+ data = resource("images/svg_logo.svg")
+ return Response(data, headers={"Content-Type": "image/svg+xml"})
def resource(filename):
- path = os.path.join(
- tmpl_dir,
- filename)
- return open(path, 'rb').read()
+ path = os.path.join(tmpl_dir, filename)
+ return open(path, "rb").read()
@app.route("/xml")
@@ -1669,24 +1759,25 @@ def a_json_endpoint():
"""
return flask_jsonify(
slideshow={
- 'title': 'Sample Slide Show',
- 'date': 'date of publication',
- 'author': 'Yours Truly',
- 'slides': [
- {'type': 'all',
- 'title': 'Wake up to WonderWidgets!'},
- {'type': 'all',
- 'title': 'Overview',
- 'items': [
- 'Why WonderWidgets are great',
- 'Who buys WonderWidgets'
- ]}
- ]
+ "title": "Sample Slide Show",
+ "date": "date of publication",
+ "author": "Yours Truly",
+ "slides": [
+ {"type": "all", "title": "Wake up to WonderWidgets!"},
+ {
+ "type": "all",
+ "title": "Overview",
+ "items": [
+ "Why WonderWidgets are great",
+ "Who buys WonderWidgets",
+ ],
+ },
+ ],
}
)
-if __name__ == '__main__':
+if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=5000)
parser.add_argument("--host", default="127.0.0.1")