mirror of
https://github.com/kennethreitz/responder.git
synced 2026-06-21 15:00:57 +00:00
3c2e42e2be
- Trailing-slash redirects (307, preserving method and query string) on by default; disable with API(redirect_slashes=False) - API(max_request_size=...) returns 413 for oversized bodies — fast Content-Length check plus cumulative enforcement for chunked uploads - API(auto_etag=True) adds content-hash ETags to GET responses with full 304 handling; explicit resp.etag wins - resp.background(func, *args) defers work until after the response is sent (sync via threadpool, async on the loop, ordered) - resp.cache_control(...) helper for Cache-Control headers Fixed: a 413 raised while reading the body during request_model validation is no longer swallowed into a 422. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
264 lines
6.6 KiB
Python
264 lines
6.6 KiB
Python
"""Tests for trailing-slash redirects, request size limits, auto-ETag,
after-response background tasks, and the Cache-Control helper."""
|
|
|
|
import threading
|
|
import time
|
|
|
|
import responder
|
|
|
|
# --- trailing-slash redirects ---
|
|
|
|
|
|
def test_trailing_slash_redirects_to_route(api):
    """``/users/`` answers with a 307 pointing at the registered ``/users``."""

    @api.route("/users")
    def users(req, resp):
        resp.media = ["alice"]

    # First inspect the redirect itself, without following it.
    redirect = api.requests.get("/users/", follow_redirects=False)
    assert r_status(redirect) == 307 if False else redirect.status_code == 307
    location = redirect.headers["Location"]
    assert location == "/users"

    # Then let the client follow it and check the route's payload arrives.
    followed = api.requests.get("/users/")
    payload = followed.json()
    assert payload == ["alice"]
|
|
|
|
|
|
def test_missing_slash_redirects_too(api):
    """``/admin`` answers with a 307 pointing at the registered ``/admin/``."""

    @api.route("/admin/")
    def admin(req, resp):
        resp.text = "admin"

    # The redirect is inspected directly rather than followed.
    redirect = api.requests.get("/admin", follow_redirects=False)
    assert redirect.status_code == 307
    location = redirect.headers["Location"]
    assert location == "/admin/"
|
|
|
|
|
|
def test_redirect_preserves_query_string(api):
    """The query string is carried over into the redirect's ``Location``."""

    @api.route("/search")
    def search(req, resp):
        resp.media = {"q": req.params.get("q")}

    # Unfollowed: the Location header keeps ``?q=hello``.
    redirect = api.requests.get("/search/?q=hello", follow_redirects=False)
    assert redirect.status_code == 307
    location = redirect.headers["Location"]
    assert location == "/search?q=hello"

    # Followed: the handler sees the same query parameter.
    followed = api.requests.get("/search/?q=hello")
    payload = followed.json()
    assert payload == {"q": "hello"}
|
|
|
|
|
|
def test_redirect_slashes_disabled():
    """With ``redirect_slashes=False`` the slash variant is a plain 404."""
    app = responder.API(redirect_slashes=False, allowed_hosts=[";"])

    @app.route("/users")
    def users(req, resp):
        resp.media = []

    response = app.requests.get("/users/", follow_redirects=False)
    assert response.status_code == 404
|
|
|
|
|
|
def test_no_redirect_for_truly_unknown_path(api):
    """A path with no matching route in either slash form yields 404."""

    @api.route("/known")
    def known(req, resp):
        resp.text = "ok"

    response = api.requests.get("/unknown/", follow_redirects=False)
    assert response.status_code == 404
|
|
|
|
|
|
# --- request size limits ---
|
|
|
|
|
|
def test_max_request_size_rejects_large_bodies():
    """Bodies under ``max_request_size`` pass; larger ones get a 413."""
    app = responder.API(max_request_size=100, allowed_hosts=[";"])

    @app.route("/upload", methods=["POST"])
    async def upload(req, resp):
        resp.media = {"size": len(await req.content)}

    # 50 bytes is below the 100-byte limit.
    accepted = app.requests.post("/upload", content=b"x" * 50)
    assert accepted.status_code == 200
    assert accepted.json() == {"size": 50}

    # 200 bytes is above it.
    rejected = app.requests.post("/upload", content=b"x" * 200)
    assert rejected.status_code == 413
|
|
|
|
|
|
def test_max_request_size_enforced_on_stream():
    """An oversized body read through ``req.stream()`` also ends in a 413."""
    app = responder.API(max_request_size=100, allowed_hosts=[";"])

    @app.route("/upload", methods=["POST"])
    async def upload(req, resp):
        # Consume the stream chunk by chunk, counting the bytes received.
        chunk_sizes = [len(chunk) async for chunk in req.stream()]
        resp.media = {"size": sum(chunk_sizes)}

    rejected = app.requests.post("/upload", content=b"x" * 500)
    assert rejected.status_code == 413
|
|
|
|
|
|
def test_max_request_size_413_negotiates_json():
    """A client sending ``Accept: application/json`` gets the 413 as JSON."""
    app = responder.API(max_request_size=10, allowed_hosts=[";"])

    @app.route("/upload", methods=["POST"])
    async def upload(req, resp):
        await req.content
        resp.text = "ok"

    json_only = {"Accept": "application/json"}
    rejected = app.requests.post("/upload", content=b"x" * 100, headers=json_only)
    assert rejected.status_code == 413
    assert rejected.json() == {"error": "Request body too large"}
|
|
|
|
|
|
def test_max_request_size_beats_request_model_validation():
    """An oversized body on a ``request_model`` route yields 413, not 422."""
    from pydantic import BaseModel

    class Item(BaseModel):
        name: str

    app = responder.API(max_request_size=10, allowed_hosts=[";"])

    @app.route("/items", methods=["POST"], request_model=Item)
    async def create(req, resp):
        resp.text = "ok"

    # The JSON payload is well over the 10-byte limit.
    oversized = {"name": "x" * 100}
    rejected = app.requests.post("/items", json=oversized)
    assert rejected.status_code == 413  # not a 422
|
|
|
|
|
|
def test_unlimited_by_default(api):
    """With the fixture's API, a 100,000-byte body is read in full."""

    @api.route("/upload", methods=["POST"])
    async def upload(req, resp):
        body = await req.content
        resp.media = {"size": len(body)}

    response = api.requests.post("/upload", content=b"x" * 100_000)
    payload = response.json()
    assert payload == {"size": 100_000}
|
|
|
|
|
|
# --- auto-ETag ---
|
|
|
|
|
|
def test_auto_etag_round_trip():
    """``auto_etag=True`` tags a GET response and honours ``If-None-Match``."""
    app = responder.API(auto_etag=True, allowed_hosts=[";"])

    @app.route("/data")
    def data(req, resp):
        resp.media = {"stable": "content"}

    # A plain GET carries a non-empty ETag header.
    first = app.requests.get("/data")
    assert first.status_code == 200
    etag = first.headers["ETag"]
    assert etag

    # Replaying the tag gives a 304 with an empty body.
    conditional = app.requests.get("/data", headers={"If-None-Match": etag})
    assert conditional.status_code == 304
    assert conditional.text == ""

    # Different content produces a different tag.
    @app.route("/other")
    def other(req, resp):
        resp.media = {"other": "content"}

    other_etag = app.requests.get("/other").headers["ETag"]
    assert other_etag != etag
|
|
|
|
|
|
def test_auto_etag_skips_post():
    """A POST response carries no ``ETag`` even with ``auto_etag=True``."""
    app = responder.API(auto_etag=True, allowed_hosts=[";"])

    @app.route("/submit", methods=["POST"])
    def submit(req, resp):
        resp.text = "created"

    response = app.requests.post("/submit")
    response_headers = response.headers
    assert "ETag" not in response_headers
|
|
|
|
|
|
def test_explicit_etag_wins_over_auto():
    """A handler-set ``resp.etag`` is sent (quoted) despite ``auto_etag``."""
    app = responder.API(auto_etag=True, allowed_hosts=[";"])

    @app.route("/doc")
    def doc(req, resp):
        resp.etag = "manual"
        resp.text = "body"

    response = app.requests.get("/doc")
    sent_etag = response.headers["ETag"]
    assert sent_etag == '"manual"'
|
|
|
|
|
|
# --- after-response background tasks ---
|
|
|
|
|
|
def test_background_task_runs_after_response(api):
    """A sync task queued via ``resp.background`` runs with its argument.

    The task records the value it was called with and then sets an event.
    The test waits on the event (up to 2 seconds) and checks the recorded
    value itself.
    """
    ran = threading.Event()
    received = []

    def task(value):
        # Record first, signal second: once the event is set the test thread
        # is guaranteed to see the recorded value.
        # NOTE(review): the value is deliberately not asserted here. An
        # AssertionError raised inside the background task (and after the
        # event was already set) may never reach the test, so the check is
        # done in the test body instead.
        received.append(value)
        ran.set()

    @api.route("/")
    def view(req, resp):
        resp.media = {"ok": True}
        resp.background(task, 42)

    r = api.requests.get("/")
    assert r.json() == {"ok": True}
    assert ran.wait(timeout=2)
    assert received == [42]
|
|
|
|
|
|
def test_async_background_task(api):
    """An ``async def`` task queued via ``resp.background`` gets executed.

    The test polls for up to 2 seconds for the task's side effect.
    """
    results = []

    async def task():
        results.append("done")

    @api.route("/")
    def view(req, resp):
        resp.text = "ok"
        resp.background(task)

    api.requests.get("/")

    # Use the monotonic clock for the deadline so a wall-clock adjustment
    # during the test cannot shorten or extend the wait.
    deadline = time.monotonic() + 2
    while not results and time.monotonic() < deadline:
        time.sleep(0.01)
    assert results == ["done"]
|
|
|
|
|
|
def test_multiple_background_tasks_in_order(api):
    """Two tasks queued on one response run in the order they were queued.

    The test polls for up to 2 seconds until both tasks have appended.
    """
    order = []

    @api.route("/")
    def view(req, resp):
        resp.text = "ok"
        resp.background(order.append, 1)
        resp.background(order.append, 2)

    api.requests.get("/")

    # Use the monotonic clock for the deadline so a wall-clock adjustment
    # during the test cannot shorten or extend the wait.
    deadline = time.monotonic() + 2
    while len(order) < 2 and time.monotonic() < deadline:
        time.sleep(0.01)
    assert order == [1, 2]
|
|
|
|
|
|
# --- Cache-Control helper ---
|
|
|
|
|
|
def test_cache_control_directives(api):
    """``cache_control(public=True, max_age=3600)`` renders both directives."""

    @api.route("/cached")
    def cached(req, resp):
        resp.cache_control(public=True, max_age=3600)
        resp.text = "ok"

    response = api.requests.get("/cached")
    header_value = response.headers["Cache-Control"]
    assert header_value == "public, max-age=3600"
|
|
|
|
|
|
def test_cache_control_no_store(api):
    """``cache_control(no_store=True, private=True)`` renders both directives."""

    @api.route("/private")
    def private(req, resp):
        resp.cache_control(no_store=True, private=True)
        resp.text = "ok"

    response = api.requests.get("/private")
    header_value = response.headers["Cache-Control"]
    assert header_value == "no-store, private"
|