Compare commits

...

25 Commits

Author SHA1 Message Date
kennethreitz 1c5dc1dfbb Replace HTTP/2 server push with dependency injection in backlog
HTTP/2 server push was removed from the spec and dropped by browsers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:02:15 -04:00
kennethreitz cb4bc295b8 Docs: second improvement pass
- tutorial-auth: fix deprecated datetime.utcnow() → datetime.now(timezone.utc),
  add role-based access control section, add auth strategy comparison
- tutorial-websockets: use WebSocketDisconnect instead of bare Exception,
  add connection lifecycle section, add rejected connection test example
- tutorial-sqlalchemy: modernize to mapped_column() / Mapped[] (SQLAlchemy 2.0)
- deployment: use uv in Docker example, fix stale uv.lock reference
- quickstart: link to all tutorials in "next steps"
- sandbox: rewrite with project layout, mypy, and pattern-matching test examples

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:58:48 -04:00
kennethreitz 536428a787 Remove Unreleased section from changelog
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:54:13 -04:00
kennethreitz 3d5f3c7e93 Improve documentation across the board (#602)
## Summary

- **Deployment guide**: health check endpoint, Docker Compose example,
Caddy reverse proxy config, Procfile pattern, production checklist
- **API reference**: quick usage examples for every class (API, Request,
Response, RouteGroup, BackgroundQueue, RateLimiter, status helpers)
- **Feature tour**: new Pydantic validation and content negotiation
sections, expanded MessagePack docs
- **Testing guide**: rate limiting and mounted WSGI app test examples,
Werkzeug 3.1.7 tip
- **Middleware tutorial**: pure ASGI middleware example (no
BaseHTTPMiddleware dependency)
- **CLI guide**: environment variables section (PORT, SECRET_KEY)
- **Homepage**: updated feature list with SSE, rate limiting, Pydantic,
content negotiation, route groups
- **Backlog**: removed already-implemented items, added current ideas

+362 lines of docs, no code changes.

## Test plan
- [x] `make html` builds cleanly with no warnings
- [x] All 199 tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Andreas Motl <andreas.motl@panodata.org>
2026-03-24 15:51:55 -04:00
kennethreitz e0cce231ea Update CLAUDE.md to reflect lock file status
Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
2026-03-24 15:42:14 -04:00
Andreas Motl 44c33475b2 Remove dependency lock file uv.lock. This is a library. (#601)
## About
We think using an `uv.lock` file is only applicable for applications,
and otherwise gives wrong impressions and expectations.

## References
- https://discuss.python.org/t/the-purpose-of-a-lock-file/38756
- https://github.com/orgs/python-poetry/discussions/7847
2026-03-24 15:35:41 -04:00
kennethreitz f4a292108b Move version below tagline in docs sidebar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:34:49 -04:00
kennethreitz 25ea333ad4 Prefix version with v in docs sidebar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:33:12 -04:00
kennethreitz 6279835040 Show version number in docs sidebar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:32:48 -04:00
kennethreitz 0e493ad8d1 Add CLAUDE.md and /release command
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:31:42 -04:00
kennethreitz ce3ab46d59 Bump version to 3.5.0 and update changelog
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:27:36 -04:00
kennethreitz 6f9c87d71c Fix broad exception handling and future.result() call
- Call future.result() instead of bare property access in test (#596)
- Catch (ValueError, TypeError) instead of broad Exception in
  response model serialization (#597)
- Catch WebSocketDisconnect instead of broad Exception in
  websocket chat example (#598)

Closes #596, closes #597, closes #598

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:25:12 -04:00
kennethreitz 29d0621d98 Replace deprecated asyncio.iscoroutinefunction with inspect.iscoroutinefunction
Closes #599

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:23:24 -04:00
kennethreitz 30fa2dfda7 Add uv.lock
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:22:06 -04:00
kennethreitz 43c803a426 Restore print statements in lifespan example
These serve as illustrative markers for users reading the example.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:21:21 -04:00
Andreas Motl ff6d530338 Chore: Code formatting (#594)
## About
A few cosmetic adjustments aka. code formatting.
Also validate the outcome on CI/GHA.
Feel free to improve now or later at your disposal.

## Details
The updates are based on using the most recent versions of pyproject-fmt
and ruff.
Specifically, spots marked with `noqa` might need further love, also at
your disposal.

---------

Co-authored-by: Kenneth Reitz <me@kennethreitz.org>
2026-03-24 15:21:04 -04:00
kennethreitz a375984310 Fix WSGI mount returning 400 at mount root (#600)
## Summary
- When a WSGI app (e.g. Flask) is mounted at `/prefix` and a request
hits exactly `/prefix`, the path prefix was stripped to `""` instead of
`"/"`, causing frameworks like Flask to return 400.
- One-character fix: default the stripped path to `"/"` when empty.

## Test plan
- [x] `tests/test_responder.py::test_mount_wsgi_app` now passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:16:29 -04:00
Andreas Motl 46c6f440c5 Chore: Remove configuration for Read the Docs (#595)
https://responder.kennethreitz.org/ is here to stay.
This patch concludes the decision in GH-564.
2026-03-23 20:19:15 -04:00
Andreas Motl c87e8c876d CI: Validate on Python 3.14 vanilla, free-threaded, and PyPy (#593) 2026-03-23 18:28:27 -04:00
kennethreitz f86c7eed70 Fix RST title underline warning breaking docs CI
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 22:10:51 -04:00
kennethreitz 9d492a383c Add marimo notebook mounting docs and example
- Document mounting marimo ASGI apps in the feature tour
- Add examples/marimo_mount.py showing the integration
- Verified working: marimo.create_asgi_app() mounts cleanly via api.mount()

Fixes #580.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 22:09:01 -04:00
kennethreitz 77ae49aaef Improve GraphQL API interface, expand tests, drop 3.9 from CI
- Extract variables and operationName from query params and form data,
  not just JSON bodies. Fixes #571.
- Add docstrings to GraphQLView class and methods. Fixes #572.
- Add 10 new GraphQL tests: variables, operationName, mutations,
  context access, malformed queries, raw text, invalid variables
  param. Fixes #568.
- Remove Python 3.9 from CI matrix (dropped in 3.4.0).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 22:06:00 -04:00
kennethreitz 74c872ed57 Add type annotations to routes.py
Add comprehensive type hints to compile_path, BaseRoute, Route,
WebSocketRoute, and Router classes. Uses Starlette's Scope, Receive,
Send types and properly types the ASGI/WSGI union in Router.apps.
Fixes #566.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 21:47:21 -04:00
kennethreitz 724b769c9e Fix OpenAPI template packaging, add static file tests
- Include ext/openapi/docs/*.html in package_data so OpenAPI docs
  themes (swagger_ui, redoc, rapidoc, elements) ship with the wheel.
  Fixes #583.
- Add tests for static file serving and index.html fallback. Fixes #563.
- Bump version to 3.4.1.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 21:44:02 -04:00
kennethreitz 4f02016ed6 Add comprehensive docstrings, expand API reference, upgrade to Starlette 1.0
- Add docstrings to all undocumented public methods across API, Request,
  Response, Router, Route, BackgroundQueue, and related classes
- Expand api.rst with autodoc sections for RouteGroup, BackgroundQueue,
  QueryDict, and RateLimiter
- Update starlette dependency to >=1.0
- Drop Python 3.9 support (required by Starlette 1.0), minimum is now 3.10
- Bump version to 3.4.0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 21:37:36 -04:00
37 changed files with 1416 additions and 261 deletions
+42
View File
@@ -0,0 +1,42 @@
Release a new version of responder to PyPI and GitHub.
Usage: /release <version> (e.g. /release 3.6.0)
If no version is provided, ask the user what version to release.
## Steps
1. **Verify clean state**: Run `git status` and ensure the working tree is clean. If not, stop and ask the user.
2. **Run tests**: Run `uv run pytest -x --no-header -q`. If any fail, stop and report.
3. **Bump version**: Update `responder/__version__.py` to the new version.
4. **Update changelog**:
- Run `git log --oneline $(git describe --tags --abbrev=0)..HEAD` to get commits since last release.
- Add a new section in `CHANGELOG.md` under `## [Unreleased]` with the date, categorized into Added/Changed/Fixed/Removed.
- Update the compare links at the bottom of the file.
5. **Lock deps**: Run `uv lock`.
6. **Commit**: Stage `responder/__version__.py`, `CHANGELOG.md`, and `uv.lock`. Commit with message `Bump version to X.Y.Z and update changelog`.
7. **Push and tag**:
```
git push
git tag vX.Y.Z
git push origin vX.Y.Z
```
8. **GitHub release**: Create a release with `gh release create` including highlights and a link to the full changelog.
9. **Build and publish**:
```
uv build
uvx twine upload dist/responder-X.Y.Z*
```
Note: This requires a PyPI token. If twine fails due to auth, tell the user to set `TWINE_USERNAME=__token__` and `TWINE_PASSWORD` and re-run, or run `! uvx twine upload dist/responder-X.Y.Z*` interactively.
10. **Update GitHub release**: Edit the release to add a link to the PyPI page: `https://pypi.org/project/responder/X.Y.Z/`
11. **Report**: Print a summary with links to the GitHub release and PyPI page.
+3 -1
View File
@@ -20,11 +20,13 @@ jobs:
fail-fast: false
matrix:
python-version: [
"3.9",
"3.10",
"3.11",
"3.12",
"3.13",
"3.14",
"3.14t",
"pypy3.11",
]
env:
UV_SYSTEM_PYTHON: true
+1
View File
@@ -8,6 +8,7 @@
.DS_Store
coverage.xml
.coverage*
*.lock
__pycache__
tests/__pycache__
-33
View File
@@ -1,33 +0,0 @@
# .readthedocs.yml
# Read the Docs configuration file
# Details
# - https://docs.readthedocs.io/en/stable/config-file/v2.html
# Required
version: 2
build:
os: "ubuntu-24.04"
tools:
python: "3.12"
python:
install:
- method: pip
path: .
extra_requirements:
- docs
sphinx:
configuration: docs/source/conf.py
# Use standard HTML builder.
builder: html
# Fail on all warnings to avoid broken references.
fail_on_warning: true
# Optionally build your docs in additional formats such as PDF
#formats:
# - pdf
+53 -2
View File
@@ -5,7 +5,55 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [v3.5.0] - 2026-03-24
### Added
- CI validation for Python 3.14, 3.14 free-threaded, and PyPy 3.11
- Marimo notebook mounting docs and example
- Type annotations for `routes.py`
### Changed
- Replaced deprecated `asyncio.iscoroutinefunction` with `inspect.iscoroutinefunction` ahead of Python 3.16 removal
- Narrowed broad `except Exception` to specific exceptions in response model serialization and websocket chat example
- Improved GraphQL API interface with expanded test coverage
- Code formatting cleanup via pyproject-fmt and ruff
- Dropped Python 3.9 from CI
### Fixed
- WSGI mount returning 400 when requesting the exact mount root path
- Werkzeug 3.1.7 compatibility for trusted host validation in tests
- `future.result` bare property access in background task test (now properly calls `future.result()`)
- OpenAPI template packaging and static file serving
- RST title underline warning breaking docs CI
### Removed
- Read the Docs configuration (docs hosted on GitHub Pages)
## [v3.4.0] - 2026-03-22
### Changed
- Upgraded to Starlette 1.0
- Added comprehensive docstrings across the codebase
- Expanded API reference documentation
## [v3.3.0] - 2026-03-22
### Added
- Full documentation rewrite: tutorials for REST APIs, SQLAlchemy, Flask migration
- Auth, WebSocket, middleware, and configuration guides
- Testing docs with prose, examples, and tips
- GitHub Pages deployment for docs
### Changed
- Reworked homepage prose
- Rewrote CLI and API reference docs
## [v3.2.0] - 2026-03-22
@@ -411,7 +459,10 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Conception!
[unreleased]: https://github.com/kennethreitz/responder/compare/v3.0.0..HEAD
[v3.5.0]: https://github.com/kennethreitz/responder/compare/v3.4.0..v3.5.0
[v3.4.0]: https://github.com/kennethreitz/responder/compare/v3.3.0..v3.4.0
[v3.3.0]: https://github.com/kennethreitz/responder/compare/v3.2.0..v3.3.0
[v3.2.0]: https://github.com/kennethreitz/responder/compare/v3.0.0..v3.2.0
[v3.0.0]: https://github.com/kennethreitz/responder/compare/v2.0.5..v3.0.0
[v2.0.5]: https://github.com/kennethreitz/responder/compare/v2.0.4..v2.0.5
[v2.0.4]: https://github.com/kennethreitz/responder/compare/v2.0.3..v2.0.4
+44
View File
@@ -0,0 +1,44 @@
# Responder
A familiar HTTP Service Framework for Python, by Kenneth Reitz.
## Commands
- **Tests**: `uv run pytest` (runs full suite with coverage)
- **Single test**: `uv run pytest tests/test_responder.py::test_name -xvs`
- **Lint**: `uv run ruff check .`
- **Type check**: `uv run mypy`
- **Build docs**: `cd docs && uv run make html`
- **Build package**: `uv build`
- **Lock deps**: `uv lock`
## Architecture
- `responder/api.py` — Main `API` class, the entry point for all apps
- `responder/routes.py``Router`, `Route`, `WebSocketRoute` dispatch
- `responder/models.py``Request` and `Response` wrappers around Starlette
- `responder/ext/` — Extensions: CLI, GraphQL, OpenAPI, rate limiting
- `responder/background.py` — Background task queue
- `responder/formats.py` — Content negotiation (JSON, YAML, msgpack)
- `responder/__version__.py` — Single source of truth for version string
## Conventions
- Python 3.10+ only. Use `from __future__ import annotations` where present.
- Use `inspect.iscoroutinefunction` (not `asyncio.iscoroutinefunction`).
- Tests use `api.requests` (Starlette TestClient) with `allowed_hosts=[";"]` or `["localhost"]`.
- Werkzeug 3.1.7+ rejects invalid Host headers — use `localhost` when mounting WSGI apps in tests.
- Version is in `responder/__version__.py`, bump it there.
- Changelog follows [Keep a Changelog](https://keepachangelog.com/) format in `CHANGELOG.md`.
- Compare links at the bottom of CHANGELOG.md must be updated when adding a release.
- All deps managed via `uv`. Lock file (`uv.lock`) is not committed.
## Release Process
1. Bump version in `responder/__version__.py`
2. Add changelog entry in `CHANGELOG.md` (update compare links too)
3. `uv lock` to refresh the lock file
4. Commit: `Bump version to X.Y.Z and update changelog`
5. `git tag vX.Y.Z && git push && git push origin vX.Y.Z`
6. `gh release create vX.Y.Z --title "vX.Y.Z" --notes "..."`
7. `uv build && uvx twine upload dist/responder-X.Y.Z*`
+1 -1
View File
@@ -17,7 +17,7 @@ if __name__ == "__main__":
$ pip install responder
That's it. Supports Python 3.9+.
That's it. Supports Python 3.10+.
## The Basics
+2
View File
@@ -5,6 +5,8 @@
</p>
<p>
<strong>Responder</strong> — a familiar HTTP service framework for Python.
<br />
<small>v{{ version }}</small>
</p>
<h3>Useful Links</h3>
<ul>
+123 -1
View File
@@ -12,6 +12,20 @@ The central object of every Responder application. It holds your routes,
middleware, templates, and configuration. Create one at the top of your
module and use it to define your entire web service.
Quick example::
import responder
api = responder.API(
title="My Service", # OpenAPI title
version="1.0", # OpenAPI version
openapi="3.0.2", # enable OpenAPI
docs_route="/docs", # Swagger UI at /docs
cors=True, # enable CORS
secret_key="change-me", # session signing key
allowed_hosts=["example.com"],
)
.. module:: responder
.. autoclass:: API
@@ -28,6 +42,27 @@ parameters, the request body, cookies, and more.
Most properties are synchronous, but reading the body requires ``await``
because it involves I/O.
Common patterns::
# Headers (case-insensitive)
token = req.headers.get("Authorization")
# Query parameters: /search?q=python&page=2
query = req.params["q"]
# JSON body
data = await req.media()
# Form data
form = await req.media("form")
# File uploads
files = await req.media("files")
# Client info
ip, port = req.client
is_https = req.is_secure
.. autoclass:: Request
:inherited-members:
@@ -39,15 +74,102 @@ The response object is passed into every view as the second argument.
Mutate it to control what gets sent back to the client — the body,
status code, headers, and cookies.
Common patterns::
resp.text = "plain text" # text/plain
resp.html = "<h1>Hello</h1>" # text/html
resp.media = {"key": "value"} # application/json
resp.content = b"raw bytes" # application/octet-stream
resp.file("path/to/file.pdf") # auto content-type
resp.stream_file("large/export.csv") # streamed
resp.status_code = 201
resp.headers["X-Custom"] = "value"
resp.cookies["session"] = "abc123"
.. autoclass:: Response
:inherited-members:
Route Groups
------------
Group related routes under a shared URL prefix — useful for API versioning
and organizing large applications::
v1 = api.group("/v1")
@v1.route("/users")
def list_users(req, resp):
resp.media = []
.. autoclass:: responder.api.RouteGroup
:members:
Background Queue
----------------
Run tasks in background threads without blocking the response. Available
as ``api.background``::
@api.route("/submit")
async def submit(req, resp):
data = await req.media()
@api.background.task
def process(data):
# runs in a thread pool
...
process(data)
resp.media = {"status": "accepted"}
.. autoclass:: responder.background.BackgroundQueue
:members:
Query Dict
----------
A dictionary subclass for query string parameters with multi-value support.
Behaves like a normal dict for single values, but supports ``getlist()``
for parameters that appear multiple times (e.g. ``?tag=a&tag=b``).
.. autoclass:: responder.models.QueryDict
:members:
Rate Limiter
------------
In-memory token bucket rate limiter. Limits requests per client IP address
and returns ``429 Too Many Requests`` when exceeded::
from responder.ext.ratelimit import RateLimiter
limiter = RateLimiter(requests=100, period=60) # 100 req/min
limiter.install(api)
Response headers: ``X-RateLimit-Limit``, ``X-RateLimit-Remaining``,
and ``Retry-After`` (when limited).
.. autoclass:: responder.ext.ratelimit.RateLimiter
:members:
Status Code Helpers
-------------------
Convenience functions for checking which category a status code falls
into. Useful in middleware and after-request hooks.
into. Useful in middleware and after-request hooks::
from responder.status_codes import is_200, is_400, is_500
@api.after_request()
def log_errors(req, resp):
if is_400(resp.status_code) or is_500(resp.status_code):
print(f"Error: {req.method} {req.url.path} -> {resp.status_code}")
.. autofunction:: responder.status_codes.is_100
+5 -4
View File
@@ -1,7 +1,8 @@
# Backlog
## Future Ideas
- Consider adding `after_request` hooks (complement to `before_request`)
- Explore WebSocket before_request short-circuit support
- Add rate limiting middleware
- Consider async template rendering by default
- WebSocket before_request short-circuit support (reject before accept)
- Per-route rate limiting (different limits for different endpoints)
- Built-in structured logging with request context
- OpenAPI 3.1 support
- Dependency injection for route handlers
+19
View File
@@ -71,6 +71,25 @@ For URLs, use a fragment::
$ responder run https://example.com/app.py#service
Environment Variables
---------------------
Responder automatically reads the ``PORT`` environment variable at
runtime:
- ``PORT`` — bind to ``0.0.0.0`` on this port (cloud platform convention)
When ``PORT`` is set, the server binds to all interfaces automatically.
This is how cloud platforms like Fly.io, Railway, and Heroku inject the
listen port.
For other settings like ``SECRET_KEY``, read them in your application
code and pass them to ``responder.API()``::
import os
api = responder.API(secret_key=os.environ["SECRET_KEY"])
Building Frontend Assets
-------------------------
+88 -4
View File
@@ -30,8 +30,9 @@ Here's a minimal Dockerfile::
FROM python:3.13-slim
WORKDIR /app
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
COPY . .
RUN pip install responder
RUN uv pip install --system responder
ENV PORT=80
EXPOSE 80
CMD ["python", "api.py"]
@@ -42,9 +43,10 @@ Build and run::
$ docker run -p 8000:80 myapi
The ``python:3.13-slim`` image is about 150MB — small enough for fast
deploys but includes everything you need. For even smaller images, you
can use ``python:3.13-alpine``, though some packages may need extra
build dependencies.
deploys but includes everything you need. Using ``uv`` for installs
is significantly faster than pip. For even smaller images, you can use
``python:3.13-alpine``, though some packages may need extra build
dependencies.
Cloud Platforms
@@ -67,6 +69,27 @@ The pattern is always the same: deploy your code, set the start command
to ``python api.py``, and the platform handles the rest.
Health Check Endpoint
---------------------
Every production deployment needs a health check — a lightweight endpoint
that monitoring tools, load balancers, and orchestrators can poll to verify
your service is running::
@api.route("/health")
def health(req, resp):
resp.media = {"status": "healthy"}
Keep it simple. Don't query the database or do expensive work — the health
check should return instantly. Cloud platforms, Docker, and Kubernetes all
look for an HTTP 200 to confirm your service is alive.
For Docker, add a ``HEALTHCHECK`` instruction::
HEALTHCHECK --interval=30s --timeout=3s \
CMD curl -f http://localhost/health || exit 1
Uvicorn Directly
----------------
@@ -82,6 +105,45 @@ Uvicorn supports many options — SSL certificates, access logging, graceful
shutdown timeouts, and more. See the
`uvicorn documentation <https://www.uvicorn.org/deployment/>`_ for details.
For platforms like Heroku or Railway that use a ``Procfile``::
web: uvicorn api:api --host 0.0.0.0 --port $PORT --workers 4
Docker Compose
--------------
For local development with databases and other services, Docker Compose
ties everything together::
# docker-compose.yml
services:
api:
build: .
ports:
- "5042:80"
environment:
- PORT=80
- DATABASE_URL=postgresql+asyncpg://user:pass@db/myapp
- SECRET_KEY=dev-secret
depends_on:
- db
db:
image: docker.io/postgres:16-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: myapp
volumes:
- pgdata:/var/lib/postgresql/data
volumes:
pgdata:
Run with ``docker compose up``. The API waits for ``db`` to start, then
connects using the ``DATABASE_URL`` environment variable.
Reverse Proxy
-------------
@@ -95,9 +157,31 @@ front of your application for:
- **Static asset serving** — offload static files to the proxy
- **Rate limiting** — at the infrastructure level
A minimal Caddy config that handles HTTPS automatically::
# Caddyfile
example.com {
reverse_proxy localhost:5042
}
Responder's ``TrustedHostMiddleware`` and ``HTTPSRedirectMiddleware`` work
correctly behind proxies that set standard forwarding headers
(``X-Forwarded-For``, ``X-Forwarded-Proto``).
That said, uvicorn is production-ready on its own. Many applications run
uvicorn directly without a reverse proxy and do just fine.
Production Checklist
--------------------
Before going live:
- **Set a secret key**``SECRET_KEY`` env var, never the default
- **Disable debug mode**``DEBUG=false`` or omit it entirely
- **Set allowed hosts** — restrict to your actual domain names
- **Use multiple workers**``--workers 4`` or more, depending on CPU cores
- **Add a health check**``/health`` endpoint for monitoring
- **Enable HTTPS** — via your proxy, cloud platform, or uvicorn's ``--ssl-*`` flags
- **Set up logging** — uvicorn logs requests by default; pipe them to your log aggregator
- **Pin your dependencies** — use a lock file or pinned requirements for reproducible deploys
+7 -3
View File
@@ -59,21 +59,25 @@ What You Get
One ``pip install``, batteries included:
- Pydantic request validation and response serialization.
- Mount Flask, Django, or any WSGI/ASGI app at a subroute.
- Gzip compression, HSTS, CORS, and trusted host validation.
- Before-request hooks that can short-circuit for auth guards.
- Before-request and after-request hooks for auth and logging.
- A test client for fast, in-process testing with pytest.
- Route parameters with f-string syntax and type convertors.
- Lifespan context managers for startup and shutdown logic.
- Custom exception handlers for clean error responses.
- `GraphQL`_ with Graphene and a built-in GraphiQL IDE.
- Server-Sent Events for real-time streaming.
- File serving with automatic content-type detection.
- Sync and async views — ``async`` is always optional.
- Class-based views with ``on_get``, ``on_post``, ``on_request``.
- Built-in rate limiting with ``X-RateLimit`` headers.
- Content negotiation: JSON, YAML, and MessagePack.
- A pleasant API with a single import statement.
- OpenAPI schema generation with Swagger UI.
- A production `uvicorn`_ server, ready to deploy.
- HTTP method filtering for REST APIs.
- Route groups for API versioning.
- Signed cookie-based sessions.
- Background tasks in a thread pool.
- WebSocket support.
@@ -86,7 +90,7 @@ Installation
$ uv pip install responder
Python 3.9 and above. That's it.
Python 3.10 and above. That's it.
.. toctree::
+6
View File
@@ -376,3 +376,9 @@ jump into the tutorials:
- :doc:`tutorial-rest` — build a full CRUD API with validation
- :doc:`tutorial-sqlalchemy` — connect to a database
- :doc:`tutorial-auth` — add authentication
- :doc:`tutorial-websockets` — real-time communication
- :doc:`tutorial-middleware` — hooks and middleware
- :doc:`tutorial-flask` — migrating from Flask
- :doc:`guide-config` — environment variables and secrets
- :doc:`deployment` — Docker, cloud platforms, and production
- :doc:`testing` — writing tests with pytest
+43 -17
View File
@@ -2,35 +2,61 @@
# Development Sandbox
## Setup
Set up a development sandbox.
Acquire sources and create virtualenv.
Clone the repo and install all dependencies:
```shell
git clone https://github.com/kennethreitz/responder.git
cd responder
uv venv
```
Install project in editable mode, including
all development tools.
```shell
uv venv && source .venv/bin/activate
uv pip install --upgrade --editable '.[develop,docs,release,test]'
```
## Operations
Run tests.
## Running Tests
```shell
source .venv/bin/activate
pytest
pytest # full suite with coverage
pytest tests/test_responder.py -xvs # single file, stop on first failure
pytest -k "test_mount" # run tests matching a pattern
```
Format code.
## Code Formatting
```shell
ruff format .
ruff check --fix .
ruff format . # auto-format
ruff check --fix . # lint and auto-fix
```
Documentation authoring.
## Type Checking
```shell
sphinx-autobuild --open-browser --watch docs/source docs/source docs/build
mypy
```
## Documentation
Live-reloading doc server (opens in browser):
```shell
cd docs
sphinx-autobuild --open-browser --watch source source build
```
Or build once:
```shell
cd docs
make html
# open build/html/index.html
```
## Project Layout
```
responder/
├── responder/ # main package
│ ├── api.py # API class — the entry point
│ ├── routes.py # Router, Route, WebSocketRoute
│ ├── models.py # Request and Response wrappers
│ ├── ext/ # extensions (CLI, GraphQL, OpenAPI, rate limiting)
│ ├── background.py # background task queue
│ └── formats.py # content negotiation (JSON, YAML, msgpack)
├── tests/ # pytest test suite
├── examples/ # runnable example apps
├── docs/source/ # Sphinx documentation
└── pyproject.toml # project metadata and tool config
```
+54
View File
@@ -269,6 +269,57 @@ just like in production. You can verify their effects on the response::
assert r.headers["X-Served-By"] == "responder"
Testing Rate Limiting
---------------------
Rate limiters are just hooks — they run automatically during tests.
Verify the headers and the 429 response::
from responder.ext.ratelimit import RateLimiter
def test_rate_limiting():
api = responder.API(allowed_hosts=["localhost"])
limiter = RateLimiter(requests=2, period=60)
limiter.install(api)
@api.route("/")
def view(req, resp):
resp.text = "ok"
# First two requests succeed
for _ in range(2):
r = api.requests.get("http://localhost/")
assert r.status_code == 200
assert "X-RateLimit-Remaining" in r.headers
# Third request is rate limited
r = api.requests.get("http://localhost/")
assert r.status_code == 429
Testing Mounted Apps
--------------------
When testing WSGI apps mounted at a subroute, use ``localhost`` as the
host to avoid Werkzeug's trusted host validation::
from flask import Flask
def test_flask_mount():
api = responder.API(allowed_hosts=["localhost"])
flask_app = Flask(__name__)
@flask_app.route("/")
def hello():
return "Hello from Flask!"
api.mount("/flask", flask_app)
r = api.requests.get("http://localhost/flask")
assert r.status_code == 200
assert "Hello from Flask" in r.text
Tips
----
@@ -284,3 +335,6 @@ Tips
- **Test the contract, not the implementation.** Assert on status codes,
response bodies, and headers — not on internal state.
- **Use ``localhost`` for mounted WSGI apps.** Werkzeug 3.1.7+ validates
the ``Host`` header, so avoid synthetic hosts like ``;`` in tests.
+89 -2
View File
@@ -416,6 +416,22 @@ Requests to ``/flask/`` will be handled by Flask. Everything else goes
through Responder. Both WSGI and ASGI apps are supported — Responder
wraps WSGI apps in an ASGI adapter automatically.
You can also mount `marimo <https://marimo.io/>`_ notebooks as
interactive dashboards within your API::
import marimo
server = (
marimo.create_asgi_app()
.with_app(path="", root="./notebooks/dashboard.py")
.with_app(path="/analysis", root="./notebooks/analysis.py")
)
api.mount("/notebooks", server.build())
Notebooks are served at ``/notebooks/`` and ``/notebooks/analysis``,
with full interactivity — reactive cells, widgets, plots, and all.
Cookies
-------
@@ -580,6 +596,73 @@ can pace themselves.
The rate limiter is per-client, keyed by IP address.
Pydantic Validation
-------------------
`Pydantic <https://docs.pydantic.dev/>`_ models integrate directly with
Responder's routing. Set ``request_model`` to validate incoming data and
``response_model`` to control the shape of outgoing data::
from pydantic import BaseModel
class ItemIn(BaseModel):
name: str
price: float
class ItemOut(BaseModel):
id: int
name: str
price: float
@api.route("/items", methods=["POST"],
request_model=ItemIn, response_model=ItemOut)
async def create_item(req, resp):
data = await req.media()
resp.media = {"id": 1, **data}
When ``request_model`` is set:
- Valid requests are parsed and the data is available via ``await req.media()``
- Invalid requests get an automatic ``422 Unprocessable Entity`` response
with detailed error messages — you don't write any validation code
When ``response_model`` is set:
- The response is serialized through the model before being sent
- Extra fields are stripped automatically
- Type coercion happens at the boundary
This is the recommended way to build validated REST APIs with Responder.
See the :doc:`tutorial-rest` for a complete walkthrough.
Content Negotiation
-------------------
Responder automatically negotiates the response format based on the
client's ``Accept`` header. Set ``resp.media`` to a Python object and
the right thing happens:
- ``Accept: application/json`` (default) → JSON
- ``Accept: application/x-yaml`` → YAML
- ``Accept: application/x-msgpack`` → MessagePack
This means a single endpoint serves multiple formats without any
conditional logic in your code::
@api.route("/data")
def data(req, resp):
resp.media = {"key": "value"}
Clients get the format they ask for::
$ curl http://localhost:5042/data
{"key": "value"}
$ curl -H "Accept: application/x-yaml" http://localhost:5042/data
key: value
MessagePack
-----------
@@ -592,6 +675,10 @@ Responder supports MessagePack alongside JSON and YAML::
# Decode a MessagePack request body
data = await req.media("msgpack")
Content negotiation works too — clients can send
# Respond with MessagePack
resp.media = {"result": [1, 2, 3]}
Content negotiation works automatically — clients can send
``Accept: application/x-msgpack`` to receive MessagePack responses
instead of JSON.
instead of JSON. You can also explicitly decode MessagePack request
bodies by passing ``"msgpack"`` to ``req.media()``.
+55 -2
View File
@@ -46,14 +46,14 @@ Install PyJWT::
Create a helper to encode and decode tokens::
import jwt
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
SECRET = "your-secret-key"
def create_token(user_id: int) -> str:
payload = {
"sub": user_id,
"exp": datetime.utcnow() + timedelta(hours=24),
"exp": datetime.now(timezone.utc) + timedelta(hours=24),
}
return jwt.encode(payload, SECRET, algorithm="HS256")
@@ -189,3 +189,56 @@ Remember to set a proper secret key::
The session data is signed (not encrypted) — users can read it but
can't tamper with it. Don't store sensitive data like passwords in
sessions.
Role-Based Access Control
--------------------------
For APIs where different users have different permissions, embed the
role in the token and check it in route-specific guards::
def create_token(user_id: int, role: str = "user") -> str:
payload = {
"sub": user_id,
"role": role,
"exp": datetime.now(timezone.utc) + timedelta(hours=24),
}
return jwt.encode(payload, SECRET, algorithm="HS256")
Create a helper that checks for a specific role::
def require_role(*roles):
"""Before-request hook factory that restricts by role."""
def check(req, resp):
user_role = getattr(req.state, "role", None)
if user_role not in roles:
resp.status_code = 403
resp.media = {"error": "Insufficient permissions"}
return check
Use it on specific routes::
@api.route("/admin/users", before_request=require_role("admin"))
def list_all_users(req, resp):
resp.media = {"users": [...]}
And store the role during token verification::
# In your auth_guard:
req.state.user_id = payload["sub"]
req.state.role = payload.get("role", "user")
Choosing an Auth Strategy
--------------------------
- **API keys** — simplest. Good for server-to-server, CLI tools, and
internal services. No expiration unless you build it.
- **JWT tokens** — standard for SPAs and mobile apps. Stateless, so
they scale well. Downside: you can't revoke them without a blocklist.
- **Sessions** — best for traditional web apps with HTML forms. The
browser manages cookies automatically. Stateful — the server controls
the session lifecycle.
Start with API keys for internal tools, JWT for public APIs, and
sessions for web apps with login pages.
+42 -1
View File
@@ -1,5 +1,5 @@
Writing Middleware
=================
==================
Middleware sits between the server and your route handlers, processing
every request and response that flows through your application. It's the
@@ -117,6 +117,47 @@ the existing stack. Keep this in mind for ordering dependencies — if
middleware A depends on middleware B having run first, add B before A.
Writing Pure ASGI Middleware
----------------------------
For maximum performance and control, you can write middleware as a plain
ASGI application. This bypasses Starlette's ``BaseHTTPMiddleware``
abstraction — it's faster and gives you direct access to the ASGI
protocol::
class SecurityHeadersMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
async def send_with_headers(message):
if message["type"] == "http.response.start":
extra = [
(b"x-content-type-options", b"nosniff"),
(b"x-frame-options", b"DENY"),
(b"referrer-policy", b"strict-origin-when-cross-origin"),
]
message["headers"] = list(message["headers"]) + extra
await send(message)
await self.app(scope, receive, send_with_headers)
api.add_middleware(SecurityHeadersMiddleware)
This is the same pattern used internally by Starlette and uvicorn. The
middleware receives the ASGI ``scope``, ``receive``, and ``send`` callables,
and wraps ``send`` to inject headers into the response.
For most cases, ``BaseHTTPMiddleware`` is simpler and perfectly fine.
Use the pure ASGI approach when you need to handle WebSocket connections,
streaming responses, or want to avoid the overhead of request/response
object creation.
When to Use What
-----------------
+11 -10
View File
@@ -28,8 +28,8 @@ SQLAlchemy models map Python classes to database tables. Each attribute
becomes a column::
# models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
@@ -37,15 +37,16 @@ becomes a column::
class Book(Base):
__tablename__ = "books"
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String, nullable=False)
author = Column(String, nullable=False)
year = Column(Integer, nullable=False)
isbn = Column(String, nullable=True)
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String, nullable=False)
author: Mapped[str] = mapped_column(String, nullable=False)
year: Mapped[int] = mapped_column(nullable=False)
isbn: Mapped[str | None] = mapped_column(String, nullable=True)
``DeclarativeBase`` is SQLAlchemy's modern base class (SQLAlchemy 2.0+).
Each model class corresponds to a table, and each ``Column`` corresponds
to a column in that table.
This uses SQLAlchemy 2.0's ``Mapped`` type annotations and
``mapped_column()``, which give you type checker support and cleaner
syntax than the older ``Column()`` style. Each model class corresponds
to a table, and each ``mapped_column()`` corresponds to a column.
Database Setup
+50 -2
View File
@@ -58,6 +58,8 @@ A chat room needs to broadcast messages to all connected clients. We keep
a set of active connections and iterate through them when someone sends
a message::
from starlette.websockets import WebSocketDisconnect
connected = set()
@api.route("/chat", websocket=True)
@@ -70,13 +72,15 @@ a message::
# Broadcast to all connected clients
for client in connected:
await client.send_text(message)
except Exception:
except WebSocketDisconnect:
pass
finally:
connected.discard(ws)
The ``try/finally`` block ensures we remove disconnected clients from
the set, even if the connection drops unexpectedly.
the set, even if the connection drops unexpectedly. Catching
``WebSocketDisconnect`` specifically (rather than bare ``Exception``)
makes the intent clear and avoids swallowing real bugs.
Data Formats
@@ -154,6 +158,40 @@ WebSocket before-request hooks receive the ``ws`` object and must call
``await ws.accept()`` if they want the connection to proceed.
Connection Lifecycle
--------------------
WebSocket connections go through several states:
1. **Connecting** — the client sends an upgrade request
2. **Open** — after ``await ws.accept()``, both sides can send messages
3. **Closing** — either side initiates a close handshake
4. **Closed** — the connection is fully terminated
When a client disconnects (closes the tab, loses network), the next
``await ws.receive_text()`` raises ``WebSocketDisconnect``. Always
handle this — otherwise your server accumulates dead connections::
from starlette.websockets import WebSocketDisconnect
@api.route("/ws", websocket=True)
async def handler(ws):
await ws.accept()
try:
while True:
data = await ws.receive_text()
await ws.send_text(f"Got: {data}")
except WebSocketDisconnect:
print("Client disconnected")
You can also close connections from the server side::
await ws.close(code=1000) # 1000 = normal closure
Common close codes: ``1000`` (normal), ``1001`` (going away),
``1008`` (policy violation), ``1011`` (server error).
Testing WebSockets
------------------
@@ -169,3 +207,13 @@ Use Starlette's ``TestClient`` for WebSocket tests::
The ``websocket_connect`` context manager handles the connection
lifecycle — it connects on enter and disconnects on exit.
You can also test that connections are properly rejected::
from starlette.websockets import WebSocketDisconnect
def test_websocket_404():
client = TestClient(api)
with pytest.raises(WebSocketDisconnect):
with client.websocket_connect("/nonexistent"):
pass
+31
View File
@@ -0,0 +1,31 @@
"""Mount marimo notebooks inside a Responder API.
Requirements:
pip install responder marimo
Usage:
python examples/marimo_mount.py
Then visit:
http://127.0.0.1:5042/hello → Responder JSON endpoint
http://127.0.0.1:5042/notebooks/ → Interactive marimo notebook
"""
import marimo
import responder
api = responder.API()
@api.route("/hello")
def hello(req, resp):
resp.media = {"message": "Hello from Responder!"}
# Mount marimo notebooks at /notebooks
server = marimo.create_asgi_app().with_app(path="", root="notebooks/hello.py")
api.mount("/notebooks", server.build())
if __name__ == "__main__":
api.run()
+9 -3
View File
@@ -1,8 +1,9 @@
# Complete REST API example with Pydantic validation.
# https://responder.kennethreitz.org/tutorial-rest.html
import responder
from pydantic import BaseModel
import responder
class BookIn(BaseModel):
title: str
@@ -35,8 +36,13 @@ def list_books(req, resp):
resp.media = list(books_db.values())
@api.route("/books", methods=["POST"], check_existing=False,
request_model=BookIn, response_model=BookOut)
@api.route(
"/books",
methods=["POST"],
check_existing=False,
request_model=BookIn,
response_model=BookOut,
)
async def create_book(req, resp):
global next_id
data = await req.media()
+4 -2
View File
@@ -1,5 +1,7 @@
# WebSocket chat room example.
# https://responder.kennethreitz.org/tutorial-websockets.html
from starlette.websockets import WebSocketDisconnect
import responder
api = responder.API()
@@ -35,7 +37,7 @@ def index(req, resp):
</script>
</body>
</html>
"""
""" # noqa: E501
@api.route("/chat", websocket=True)
@@ -47,7 +49,7 @@ async def chat(ws):
message = await ws.receive_text()
for client in connected:
await client.send_text(message)
except Exception:
except WebSocketDisconnect:
pass
finally:
connected.discard(ws)
+57 -74
View File
@@ -8,11 +8,11 @@ requires = [
name = "responder"
description = "A familiar HTTP Service Framework for Python."
readme = "README.md"
license = {text = "Apache 2.0"}
license = { text = "Apache 2.0" }
authors = [
{ name = "Kenneth Reitz", email = "me@kennethreitz.org" },
]
requires-python = ">=3.9"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Web Environment",
@@ -20,19 +20,21 @@ classifiers = [
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Programming Language :: Python :: Free Threading",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Internet :: WWW/HTTP",
]
dynamic = ["version"]
dynamic = [ "version" ]
dependencies = [
"a2wsgi",
"apispec>=1.0.0",
"apispec>=1",
"chardet",
"docopt-ng",
"graphene>=3",
@@ -42,17 +44,15 @@ dependencies = [
"pueblo[sfa-full]>=0.0.11",
"pydantic>=2",
"python-multipart",
"starlette[full]>=0.40",
"starlette[full]>=1",
"uvicorn[standard]",
]
[project.optional-dependencies]
develop = [
optional-dependencies.develop = [
"pyproject-fmt",
"ruff",
"validate-pyproject",
]
docs = [
optional-dependencies.docs = [
"alabaster<1.1",
"myst-parser",
"sphinx>=5,<9",
@@ -60,8 +60,8 @@ docs = [
"sphinx-copybutton",
"sphinx-design-elements",
]
release = ["build", "twine"]
test = [
optional-dependencies.release = [ "build", "twine" ]
optional-dependencies.test = [
"flask",
"mypy",
"pytest",
@@ -69,32 +69,22 @@ test = [
"pytest-mock",
"pytest-rerunfailures",
]
urls.Documentation = "https://responder.kennethreitz.org"
urls.Homepage = "https://github.com/kennethreitz/responder"
urls.Issues = "https://github.com/kennethreitz/responder/issues"
urls.Repository = "https://github.com/kennethreitz/responder"
scripts.responder = "responder.ext.cli:cli"
[project.scripts]
responder = "responder.ext.cli:cli"
[project.urls]
Homepage = "https://github.com/kennethreitz/responder"
Documentation = "https://responder.kennethreitz.org"
Repository = "https://github.com/kennethreitz/responder"
Issues = "https://github.com/kennethreitz/responder/issues"
[tool.setuptools.dynamic]
version = {attr = "responder.__version__.__version__"}
[tool.setuptools.package-data]
responder = ["py.typed"]
[tool.setuptools.packages.find]
exclude = ["tests"]
[tool.setuptools]
dynamic.version = { attr = "responder.__version__.__version__" }
package-data.responder = [ "py.typed", "ext/openapi/docs/*.html" ]
packages.find.exclude = [ "tests" ]
[tool.ruff]
line-length = 90
extend-exclude = [
"docs/source/conf.py",
]
lint.select = [
# Builtins
"A",
@@ -122,59 +112,20 @@ lint.select = [
# flake8-2020
"YTT",
]
lint.extend-ignore = [
"S101", # Allow use of `assert`.
"S101", # Allow use of `assert`.
]
lint.per-file-ignores."responder/util/cmd.py" = [ "A005" ] # Module shadows a Python standard-library module
lint.per-file-ignores."responder/util/cmd.py" = [ "A005" ] # Module shadows a Python standard-library module
lint.per-file-ignores."tests/*" = [
"ERA001", # Found commented-out code.
"S101", # Allow use of `assert`, and `print`.
]
[tool.pytest.ini_options]
addopts = """
-rfEXs -p pytester --strict-markers --verbosity=3
--cov --cov-report=term-missing --cov-report=xml
"""
filterwarnings = [
"error::UserWarning",
]
log_level = "DEBUG"
log_cli_level = "DEBUG"
log_format = "%(asctime)-15s [%(name)-36s] %(levelname)-8s: %(message)s"
minversion = "2.0"
testpaths = [
"responder",
"tests",
]
markers = [
]
xfail_strict = true
[tool.coverage.run]
branch = false
omit = [
"*.html",
"tests/*",
]
[tool.coverage.report]
fail_under = 0
show_missing = true
exclude_lines = [
"# pragma: no cover",
"raise NotImplemented",
]
[tool.mypy]
packages = [
"responder",
]
exclude = [
]
exclude = []
check_untyped_defs = true
explicit_package_bases = true
ignore_missing_imports = true
@@ -182,3 +133,35 @@ implicit_optional = true
install_types = true
namespace_packages = true
non_interactive = true
[tool.pytest]
ini_options.addopts = """
-rfEXs -p pytester --strict-markers --verbosity=3
--cov --cov-report=term-missing --cov-report=xml
"""
ini_options.filterwarnings = [
"error::UserWarning",
]
ini_options.log_level = "DEBUG"
ini_options.log_cli_level = "DEBUG"
ini_options.log_format = "%(asctime)-15s [%(name)-36s] %(levelname)-8s: %(message)s"
ini_options.minversion = "2.0"
ini_options.testpaths = [
"responder",
"tests",
]
ini_options.markers = []
ini_options.xfail_strict = true
[tool.coverage]
run.branch = false
run.omit = [
"*.html",
"tests/*",
]
report.exclude_lines = [
"# pragma: no cover",
"raise NotImplemented",
]
report.fail_under = 0
report.show_missing = true
+1 -1
View File
@@ -1 +1 @@
__version__ = "3.3.0"
__version__ = "3.5.0"
+65 -4
View File
@@ -1,4 +1,5 @@
import asyncio
import inspect
import os
from pathlib import Path
@@ -61,6 +62,31 @@ class API:
lifespan=None,
request_id=False,
):
"""Create a new Responder API instance.
:param debug: If ``True``, enable debug mode with verbose error pages.
:param title: The title of the API, used in OpenAPI documentation.
:param version: The version string for the API (e.g. ``"1.0"``).
:param description: A longer description of the API for OpenAPI docs.
:param terms_of_service: URL to the API's terms of service.
:param contact: Contact information dict for the API (``name``, ``url``, ``email``).
:param license: License information dict (``name``, ``url``).
:param openapi: The OpenAPI version string (e.g. ``"3.0.2"``). Enables OpenAPI schema generation.
:param openapi_route: The URL path for the OpenAPI schema (default ``"/schema.yml"``).
:param static_dir: Directory for static files. Set to ``None`` to disable. Created automatically if missing.
:param static_route: URL prefix for serving static files (default ``"/static"``).
:param templates_dir: Directory for Jinja2 templates (default ``"templates"``).
:param auto_escape: If ``True``, auto-escape HTML/XML in templates.
:param secret_key: Secret key for signing cookie-based sessions. **Always set this in production.**
:param enable_hsts: If ``True``, redirect all HTTP requests to HTTPS.
:param docs_route: URL path for interactive API docs (e.g. ``"/docs"``). Enables OpenAPI if not already set.
:param cors: If ``True``, enable CORS middleware.
:param cors_params: Dict of CORS configuration (``allow_origins``, ``allow_methods``, etc.).
:param allowed_hosts: List of allowed hostnames (e.g. ``["example.com"]``). Defaults to ``["*"]``.
:param openapi_theme: Documentation UI theme: ``"swagger_ui"``, ``"redoc"``, ``"rapidoc"``, or ``"elements"``.
:param lifespan: An async context manager for startup/shutdown logic.
:param request_id: If ``True``, add ``X-Request-ID`` headers to all responses.
""" # noqa: E501
self.background = BackgroundQueue()
self.secret_key = secret_key
@@ -136,9 +162,7 @@ class API:
import uuid as _uuid
def _add_request_id(req, resp):
rid = req.headers.get(
"X-Request-ID", str(_uuid.uuid4())
)
rid = req.headers.get("X-Request-ID", str(_uuid.uuid4()))
resp.headers["X-Request-ID"] = rid
self.router.after_request(_add_request_id)
@@ -150,12 +174,30 @@ class API:
@property
def static_app(self):
"""The Starlette ``StaticFiles`` application for serving static assets."""
if not hasattr(self, "_static_app"):
assert self.static_dir is not None
self._static_app = StaticFiles(directory=self.static_dir)
return self._static_app
def before_request(self, websocket=False):
"""Register a function to run before every request.
If the hook sets ``resp.status_code``, the route handler is skipped
and the response is sent immediately (short-circuiting).
:param websocket: If ``True``, register as a WebSocket before-request hook instead of HTTP.
Usage::
@api.before_request()
def check_auth(req, resp):
if "Authorization" not in req.headers:
resp.status_code = 401
resp.media = {"error": "unauthorized"}
""" # noqa: E501
def decorator(f):
self.router.before_request(f, websocket=websocket)
return f
@@ -180,6 +222,21 @@ class API:
return decorator
def add_middleware(self, middleware_cls, **middleware_config):
"""Add ASGI middleware to the application.
Middleware wraps the entire application and can inspect or modify
every request and response. Middleware is applied in reverse order —
the last middleware added runs first.
:param middleware_cls: A Starlette-compatible middleware class.
:param middleware_config: Keyword arguments passed to the middleware constructor.
Usage::
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
api.add_middleware(HTTPSRedirectMiddleware)
"""
self.app = middleware_cls(self.app, **middleware_config)
def exception_handler(self, exception_cls):
@@ -200,7 +257,7 @@ class API:
req = Request(request.scope, request.receive, formats=get_formats())
resp = Response(req=req, formats=get_formats())
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
await func(req, resp, exc)
else:
func(req, resp, exc)
@@ -501,6 +558,10 @@ class API:
uvicorn.run(self, host=address, port=port, **options)
def run(self, **kwargs):
"""Run the application. Shorthand for :meth:`serve` that inherits the ``debug`` setting.
:param kwargs: Keyword arguments passed through to :meth:`serve`.
""" # noqa: E501
if "debug" not in kwargs:
kwargs.update({"debug": self.debug})
self.serve(**kwargs)
+41 -1
View File
@@ -1,5 +1,6 @@
import asyncio
import concurrent.futures
import inspect
import multiprocessing
import traceback
@@ -9,7 +10,33 @@ __all__ = ["BackgroundQueue"]
class BackgroundQueue:
"""A queue for running tasks in background threads.
Uses a ``ThreadPoolExecutor`` sized to the number of CPUs. Access it
via ``api.background``.
Usage::
# As a decorator — fire and forget
@api.background.task
def send_email(to, subject):
...
send_email("user@example.com", "Hello")
# Direct submission
future = api.background.run(send_email, "user@example.com", "Hello")
# As a callable (supports async functions)
await api.background(send_email, "user@example.com", "Hello")
"""
def __init__(self, n=None):
"""Create a new background queue.
:param n: Number of worker threads. Defaults to CPU count.
"""
if n is None:
n = multiprocessing.cpu_count()
@@ -18,11 +45,24 @@ class BackgroundQueue:
self.results = []
def run(self, f, *args, **kwargs):
"""Submit a function to run in a background thread.
:param f: The function to run.
:returns: A ``concurrent.futures.Future`` for the result.
"""
f = self.pool.submit(f, *args, **kwargs)
self.results.append(f)
return f
def task(self, f):
"""Decorator that wraps a function to run in the background thread pool.
The decorated function returns a ``Future`` instead of blocking.
Exceptions are printed to stderr via traceback.
:param f: The function to wrap.
"""
def on_future_done(fs):
try:
fs.result()
@@ -37,6 +77,6 @@ class BackgroundQueue:
return do_task
async def __call__(self, func, *args, **kwargs) -> None:
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
return await asyncio.create_task(func(*args, **kwargs))
return await run_in_threadpool(func, *args, **kwargs)
+47 -2
View File
@@ -4,12 +4,43 @@ from .templates import GRAPHIQL
class GraphQLView:
"""A class-based view that serves a GraphQL API.
Handles query resolution from multiple sources (JSON body, query
parameters, raw request text) and renders the GraphiQL IDE for
browser requests.
:param api: The Responder API instance.
:param schema: A Graphene schema instance.
"""
def __init__(self, *, api, schema):
self.api = api
self.schema = schema
@staticmethod
def _parse_variables(raw):
"""Parse variables from a string (query param) or return as-is (dict)."""
if raw is None:
return None
if isinstance(raw, str):
try:
return json.loads(raw)
except (json.JSONDecodeError, ValueError):
return None
return raw
@staticmethod
async def _resolve_graphql_query(req, resp):
"""Extract query, variables, and operationName from the request.
Supports multiple input sources, checked in order:
1. JSON body (``Content-Type: application/json``)
2. Form data (``Content-Type: application/x-www-form-urlencoded``)
3. Query parameters (``?query=...&variables=...&operationName=...``)
4. Raw request text
"""
if "json" in req.mimetype:
json_media = await req.media("json")
if "query" not in json_media:
@@ -22,9 +53,22 @@ class GraphQLView:
json_media.get("operationName"),
)
# Support query/q in params.
if "form" in req.mimetype:
form_data = await req.media("form")
if "query" in form_data:
return (
form_data["query"],
GraphQLView._parse_variables(form_data.get("variables")),
form_data.get("operationName"),
)
# Support query/variables/operationName in query params.
if "query" in req.params:
return req.params["query"], None, None
return (
req.params["query"],
GraphQLView._parse_variables(req.params.get("variables")),
req.params.get("operationName"),
)
if "q" in req.params:
return req.params["q"], None, None
@@ -32,6 +76,7 @@ class GraphQLView:
return await req.text, None, None
async def graphql_response(self, req, resp):
"""Process a GraphQL request and populate the response."""
show_graphiql = req.method == "get" and req.accepts("text/html")
if show_graphiql:
+3 -1
View File
@@ -129,7 +129,9 @@ class OpenAPISchema:
op["requestBody"] = {
"content": {
"application/json": {
"schema": {"$ref": f"#/components/schemas/{model_name}"}
"schema": {
"$ref": f"#/components/schemas/{model_name}"
}
}
}
}
+1 -3
View File
@@ -38,9 +38,7 @@ class RateLimiter:
def _cleanup(self, key):
now = time.time()
cutoff = now - self.period
self._buckets[key] = [
t for t in self._buckets[key] if t > cutoff
]
self._buckets[key] = [t for t in self._buckets[key] if t > cutoff]
def check(self, req, resp):
"""Check rate limit. Sets 429 status if exceeded."""
+79 -10
View File
@@ -49,6 +49,12 @@ class CaseInsensitiveDict(dict):
class QueryDict(dict):
"""A dictionary for query string parameters that handles multi-value keys.
Single-value access returns the last value for a key. Use :meth:`get_list`
to retrieve all values for a multi-value parameter.
"""
def __init__(self, query_string):
self.update(parse_qs(query_string))
@@ -117,6 +123,13 @@ class QueryDict(dict):
class Request:
"""An HTTP request, passed to each view as the first argument.
Provides access to headers, cookies, query parameters, the request body,
session data, and more. Most properties are synchronous; reading the body
(via :attr:`content`, :attr:`text`, or :meth:`media`) requires ``await``.
"""
__slots__ = [
"_starlette",
"formats",
@@ -153,6 +166,7 @@ class Request:
@property
def mimetype(self):
"""The MIME type of the request body, from the ``Content-Type`` header."""
return self.headers.get("Content-Type", "")
@property
@@ -270,6 +284,7 @@ class Request:
@property
def is_secure(self):
"""``True`` if the request was made over HTTPS."""
return self.url.scheme == "https"
def accepts(self, content_type):
@@ -315,6 +330,22 @@ def content_setter(mimetype):
class Response:
"""An HTTP response, passed to each view as the second argument.
Mutate this object to control what gets sent back to the client. Set
:attr:`text`, :attr:`html`, :attr:`media`, or :attr:`content` to define
the body. Use :attr:`headers` and :meth:`set_cookie` to control metadata.
:var text: Set the response body as plain text (sets ``Content-Type: text/plain``).
:var html: Set the response body as HTML (sets ``Content-Type: text/html``).
:var media: Set a Python object (dict, list) to be serialized as JSON (or negotiated format).
:var content: Set the raw response body as bytes.
:var status_code: The HTTP status code (e.g. ``200``, ``404``). Defaults to ``200`` if not set.
:var headers: A dict of response headers.
:var cookies: A ``SimpleCookie`` holding cookies to set on the response.
:var session: A dict of session data. Changes are persisted in a signed cookie.
""" # noqa: E501
__slots__ = [
"req",
"status_code",
@@ -334,23 +365,34 @@ class Response:
def __init__(self, req, *, formats):
self.req = req
#: The HTTP Status Code to use for the Response.
self.status_code: int | None = None
self.content = None #: A bytes representation of the response body.
self.content = None
self.mimetype = None
self.encoding = DEFAULT_ENCODING
self.media = None #: A Python object that will be content-negotiated and
#: sent back to the client. Typically, in JSON formatting.
self.media = None
self._stream = None
self.headers = {} #: A Python dictionary of ``{key: value}``,
#: representing the headers of the response.
self.headers = {}
self.formats = formats
self.cookies: SimpleCookie = SimpleCookie() #: The cookies set in the Response
self.session = (
req.session
) #: The cookie-based session data, in dict form, to add to the Response.
self.cookies: SimpleCookie = SimpleCookie()
self.session = req.session
def stream(self, func, *args, **kwargs):
"""Set up a streaming response from an async generator function.
The generator yields chunks of bytes that are sent to the client
as they are produced, without buffering the full response in memory.
Usage::
@api.route("/stream")
async def stream_data(req, resp):
@resp.stream
async def body():
for i in range(10):
yield f"chunk {i}\\n".encode()
:param func: An async generator function that yields response chunks.
"""
assert inspect.isasyncgenfunction(func)
self._stream = functools.partial(func, *args, **kwargs)
@@ -451,6 +493,12 @@ class Response:
self.mimetype = guessed or "application/octet-stream"
def redirect(self, location, *, set_text=True, status_code=HTTP_301):
"""Redirect the client to a different URL.
:param location: The URL to redirect to.
:param set_text: If ``True``, set a default redirect message as the body.
:param status_code: The HTTP status code (default ``301``).
"""
self.status_code = status_code
if set_text:
self.text = f"Redirecting to: {location}"
@@ -496,6 +544,25 @@ class Response:
secure=False,
httponly=True,
):
"""Set a cookie on the response with full control over directives.
:param key: The cookie name.
:param value: The cookie value.
:param expires: Expiration date string (e.g. ``"Thu, 01 Jan 2026 00:00:00 GMT"``).
:param path: URL path the cookie applies to (default ``"/"``).
:param domain: Domain the cookie is valid for.
:param max_age: Maximum age in seconds before the cookie expires.
:param secure: If ``True``, cookie is only sent over HTTPS.
:param httponly: If ``True`` (default), cookie is inaccessible to JavaScript.
Usage::
resp.set_cookie(
"token", value="abc123",
max_age=3600, secure=True, httponly=True,
)
"""
self.cookies[key] = value
morsel = self.cookies[key]
if expires is not None:
@@ -534,10 +601,12 @@ class Response:
@property
def ok(self):
"""``True`` if the status code is in the 2xx range (success)."""
return 200 <= self.status_code_safe < 300
@property
def status_code_safe(self) -> int:
"""Return the status code, raising ``RuntimeError`` if it hasn't been set."""
if self.status_code is None:
raise RuntimeError("HTTP status code has not been defined")
return self.status_code
+96 -57
View File
@@ -1,14 +1,18 @@
from __future__ import annotations
import asyncio
import inspect
import re
import traceback
from collections import defaultdict
from collections.abc import Callable
from typing import Any, Union
__all__ = ["Route", "WebSocketRoute", "Router"]
from starlette.concurrency import run_in_threadpool
from starlette.exceptions import HTTPException
from starlette.types import ASGIApp
from starlette.types import ASGIApp, Receive, Scope, Send
from starlette.websockets import WebSocket, WebSocketClose
from . import status_codes
@@ -28,9 +32,9 @@ _CONVERTORS = {
PARAM_RE = re.compile("{([a-zA-Z_][a-zA-Z0-9_]*)(:[a-zA-Z_][a-zA-Z0-9_]*)?}")
def compile_path(path):
def compile_path(path: str) -> tuple[re.Pattern, dict[str, type]]:
path_re = "^"
param_convertors = {}
param_convertors: dict[str, type] = {}
idx = 0
for match in PARAM_RE.finditer(path):
@@ -54,40 +58,55 @@ def compile_path(path):
class BaseRoute:
def matches(self, scope):
def matches(self, scope: Scope) -> tuple[bool, dict]:
raise NotImplementedError()
async def __call__(self, scope, receive, send):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
raise NotImplementedError()
class Route(BaseRoute):
def __init__(self, route, endpoint, *, before_request=False, methods=None):
"""An HTTP route that maps a URL pattern to an endpoint.
Supports path parameters with type convertors (``{id:int}``, ``{slug:str}``,
``{pk:uuid}``, ``{value:float}``, ``{rest:path}``).
"""
def __init__(
self,
route: str,
endpoint: Callable,
*,
before_request: bool = False,
methods: list[str] | None = None,
) -> None:
assert route.startswith("/"), "Route path must start with '/'"
self.route = route
self.endpoint = endpoint
self.before_request = before_request
self.methods = {m.upper() for m in methods} if methods else None
self.methods: set[str] | None = {m.upper() for m in methods} if methods else None
self.path_re: re.Pattern
self.param_convertors: dict[str, type]
self.path_re, self.param_convertors = compile_path(route)
# Strip type annotations for URL generation (e.g. {id:int} -> {id})
self._url_template = PARAM_RE.sub(r"{\1}", route)
def __repr__(self):
def __repr__(self) -> str:
return f"<Route {self.route!r}={self.endpoint!r}>"
def url(self, **params):
def url(self, **params: Any) -> str:
return self._url_template.format(**params)
@property
def endpoint_name(self):
def endpoint_name(self) -> str:
return self.endpoint.__name__
@property
def description(self):
def description(self) -> str | None:
return self.endpoint.__doc__
def matches(self, scope):
def matches(self, scope: Scope) -> tuple[bool, dict]:
if scope["type"] != "http":
return False, {}
@@ -106,7 +125,7 @@ class Route(BaseRoute):
return True, {"path_params": {**matched_params}}
async def __call__(self, scope, receive, send):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
request = Request(scope, receive, formats=get_formats())
response = Response(req=request, formats=get_formats())
@@ -114,7 +133,7 @@ class Route(BaseRoute):
before_requests = scope.get("before_requests", [])
for before_request in before_requests.get("http", []):
if asyncio.iscoroutinefunction(before_request):
if inspect.iscoroutinefunction(before_request):
await before_request(request, response)
else:
await run_in_threadpool(before_request, request, response)
@@ -160,7 +179,7 @@ class Route(BaseRoute):
for view in views:
# Check __call__ for class-based views (e.g. GraphQL)
if asyncio.iscoroutinefunction(view) or asyncio.iscoroutinefunction(
if inspect.iscoroutinefunction(view) or inspect.iscoroutinefunction(
view.__call__
):
await view(request, response, **path_params)
@@ -173,13 +192,13 @@ class Route(BaseRoute):
try:
validated = resp_model(**response.media)
response.media = validated.model_dump()
except Exception:
except (ValueError, TypeError):
pass # Don't break the response if serialization fails
# Run after-request hooks
after_requests = scope.get("after_requests", [])
for after_request in after_requests:
if asyncio.iscoroutinefunction(after_request):
if inspect.iscoroutinefunction(after_request):
await after_request(request, response)
else:
await run_in_threadpool(after_request, request, response)
@@ -189,38 +208,46 @@ class Route(BaseRoute):
await response(scope, receive, send)
def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if not isinstance(other, Route):
return NotImplemented
return self.route == other.route and self.endpoint == other.endpoint
def __hash__(self):
def __hash__(self) -> int:
return hash(self.route) ^ hash(self.endpoint) ^ hash(self.before_request)
class WebSocketRoute(BaseRoute):
def __init__(self, route, endpoint, *, before_request=False):
"""A WebSocket route that maps a URL pattern to a WebSocket handler."""
def __init__(
self, route: str, endpoint: Callable, *, before_request: bool = False
) -> None:
assert route.startswith("/"), "Route path must start with '/'"
self.route = route
self.endpoint = endpoint
self.before_request = before_request
self.path_re: re.Pattern
self.param_convertors: dict[str, type]
self.path_re, self.param_convertors = compile_path(route)
self._url_template = PARAM_RE.sub(r"{\1}", route)
def __repr__(self):
def __repr__(self) -> str:
return f"<Route {self.route!r}={self.endpoint!r}>"
def url(self, **params):
def url(self, **params: Any) -> str:
return self._url_template.format(**params)
@property
def endpoint_name(self):
def endpoint_name(self) -> str:
return self.endpoint.__name__
@property
def description(self):
def description(self) -> str | None:
return self.endpoint.__doc__
def matches(self, scope):
def matches(self, scope: Scope) -> tuple[bool, dict]:
if scope["type"] != "websocket":
return False, {}
@@ -236,7 +263,7 @@ class WebSocketRoute(BaseRoute):
return True, {"path_params": {**matched_params}}
async def __call__(self, scope, receive, send):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
ws = WebSocket(scope, receive, send)
before_requests = scope.get("before_requests", [])
@@ -245,41 +272,53 @@ class WebSocketRoute(BaseRoute):
await self.endpoint(ws)
def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if not isinstance(other, WebSocketRoute):
return NotImplemented
return self.route == other.route and self.endpoint == other.endpoint
def __hash__(self):
def __hash__(self) -> int:
return hash(self.route) ^ hash(self.endpoint) ^ hash(self.before_request)
class Router:
def __init__(
self, routes=None, default_response=None, before_requests=None, lifespan=None
):
self.routes = [] if routes is None else list(routes)
"""The core router that dispatches incoming requests to matching routes.
self.apps: dict[str, ASGIApp] = {}
self.default_endpoint = (
Handles route matching, before/after request hooks, lifespan events,
and mounted sub-applications.
"""
def __init__(
self,
routes: list[BaseRoute] | None = None,
default_response: Callable | None = None,
before_requests: dict[str, list[Callable]] | None = None,
lifespan: Callable | None = None,
) -> None:
self.routes: list[BaseRoute] = [] if routes is None else list(routes)
self.apps: dict[str, Union[ASGIApp, Any]] = {}
self.default_endpoint: Callable = (
self.default_response if default_response is None else default_response
)
self.before_requests = (
self.before_requests: dict[str, list[Callable]] = (
{"http": [], "ws": []} if before_requests is None else before_requests
)
self.after_requests: list = []
self.events = defaultdict(list)
self.after_requests: list[Callable] = []
self.events: defaultdict[str, list[Callable]] = defaultdict(list)
self._lifespan_handler = lifespan
def add_route(
self,
route=None,
endpoint=None,
route: str | None = None,
endpoint: Callable | None = None,
*,
default=False,
websocket=False,
before_request=False,
check_existing=False,
methods=None,
):
default: bool = False,
websocket: bool = False,
before_request: bool = False,
check_existing: bool = False,
methods: list[str] | None = None,
) -> None:
"""Adds a route to the router.
:param route: A string representation of the route
:param endpoint: The endpoint for the route -- can be callable, or class.
@@ -308,40 +347,40 @@ class Router:
self.routes.append(route)
def mount(self, route, app):
def mount(self, route: str, app: Any) -> None:
"""Mounts ASGI / WSGI applications at a given route"""
self.apps.update({route: app})
def add_event_handler(self, event_type, handler):
def add_event_handler(self, event_type: str, handler: Callable) -> None:
assert event_type in (
"startup",
"shutdown",
), f"Only 'startup' and 'shutdown' events are supported, not {event_type}."
self.events[event_type].append(handler)
async def trigger_event(self, event_type):
async def trigger_event(self, event_type: str) -> None:
for handler in self.events.get(event_type, []):
if asyncio.iscoroutinefunction(handler):
if inspect.iscoroutinefunction(handler):
await handler()
else:
handler()
def before_request(self, endpoint, websocket=False):
def before_request(self, endpoint: Callable, websocket: bool = False) -> None:
if websocket:
self.before_requests.setdefault("ws", []).append(endpoint)
else:
self.before_requests.setdefault("http", []).append(endpoint)
def after_request(self, endpoint):
def after_request(self, endpoint: Callable) -> None:
self.after_requests.append(endpoint)
def url_for(self, endpoint, **params):
def url_for(self, endpoint: Callable | str, **params: Any) -> str | None:
for route in self.routes:
if endpoint in (route.endpoint, route.endpoint.__name__):
return route.url(**params)
return None
async def default_response(self, scope, receive, send):
async def default_response(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] == "websocket":
websocket_close = WebSocketClose()
await websocket_close(scope, receive, send)
@@ -352,7 +391,7 @@ class Router:
raise HTTPException(status_code=status_codes.HTTP_404) # type: ignore[attr-defined]
def _resolve_route(self, scope):
def _resolve_route(self, scope: Scope) -> BaseRoute | None:
for route in self.routes:
matches, child_scope = route.matches(scope)
if matches:
@@ -360,7 +399,7 @@ class Router:
return route
return None
async def lifespan(self, scope, receive, send):
async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
message = await receive()
assert message["type"] == "lifespan.startup"
@@ -395,7 +434,7 @@ class Router:
await send({"type": "lifespan.shutdown.complete"})
async def __call__(self, scope, receive, send):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
assert scope["type"] in ("http", "websocket", "lifespan")
if scope["type"] == "lifespan":
@@ -418,7 +457,7 @@ class Router:
# Call into a submounted app, if one exists.
for path_prefix, app in self.apps.items():
if path.startswith(path_prefix):
scope["path"] = path[len(path_prefix) :]
scope["path"] = path[len(path_prefix) :] or "/"
scope["root_path"] = root_path + path_prefix
try:
await app(scope, receive, send)
+63 -11
View File
@@ -4,14 +4,14 @@ import time
import pytest
from starlette.testclient import TestClient as StarletteTestClient
from starlette.websockets import WebSocketDisconnect
import responder
from responder.background import BackgroundQueue
from responder.models import CaseInsensitiveDict, QueryDict, Response
from responder.models import QueryDict
from responder.routes import Route, WebSocketRoute
from responder.templates import Templates
# --- api.py coverage ---
@@ -78,7 +78,10 @@ def test_background_task_exception(capsys):
raise ValueError("task failed")
future = failing_task()
future.result # wait for completion
try:
future.result() # wait for completion
except ValueError:
pass
time.sleep(0.2) # let the done callback fire
captured = capsys.readouterr()
@@ -302,7 +305,7 @@ def test_yaml_content_negotiation(api):
def test_websocket_404(api):
"""Lines 308-310: WebSocket to unknown route gets closed."""
client = StarletteTestClient(api)
with pytest.raises(Exception):
with pytest.raises(WebSocketDisconnect):
with client.websocket_connect("ws://;/nonexistent"):
pass
@@ -325,9 +328,7 @@ def test_websocket_route_params():
pass
route = WebSocketRoute("/ws/{room_id:int}", handler)
matches, scope = route.matches(
{"type": "websocket", "path": "/ws/42"}
)
matches, scope = route.matches({"type": "websocket", "path": "/ws/42"})
assert matches is True
assert scope["path_params"] == {"room_id": 42}
@@ -591,7 +592,10 @@ def test_pydantic_schema():
from pydantic import BaseModel
api = responder.API(
title="Test", version="1.0", openapi="3.0.2", allowed_hosts=[";"],
title="Test",
version="1.0",
openapi="3.0.2",
allowed_hosts=[";"],
)
@api.schema("Pet")
@@ -611,7 +615,10 @@ def test_pydantic_request_response_models():
from pydantic import BaseModel
api = responder.API(
title="Test", version="1.0", openapi="3.0.2", allowed_hosts=[";"],
title="Test",
version="1.0",
openapi="3.0.2",
allowed_hosts=[";"],
)
class ItemIn(BaseModel):
@@ -623,8 +630,7 @@ def test_pydantic_request_response_models():
name: str
price: float
@api.route("/items", methods=["POST"],
request_model=ItemIn, response_model=ItemOut)
@api.route("/items", methods=["POST"], request_model=ItemIn, response_model=ItemOut)
async def create(req, resp):
data = await req.media()
resp.media = {"id": 1, **data}
@@ -660,3 +666,49 @@ def test_templates_context(tmp_path):
result = templates.render("test.html")
assert "hello" in result
assert "world" in result
def test_static_file_serving(tmp_path):
"""Verify static files are served correctly from the static directory."""
static_dir = tmp_path / "static"
static_dir.mkdir()
(static_dir / "style.css").write_text("body { color: red; }")
(static_dir / "app.js").write_text("console.log('hello');")
api = responder.API(
static_dir=str(static_dir),
static_route="/static",
allowed_hosts=[";"],
)
# CSS file served with correct content
r = api.requests.get("http://;/static/style.css")
assert r.status_code == 200
assert "body { color: red; }" in r.text
assert "text/css" in r.headers.get("content-type", "")
# JS file served with correct content
r = api.requests.get("http://;/static/app.js")
assert r.status_code == 200
assert "console.log" in r.text
# Missing file returns 404
r = api.requests.get("http://;/static/missing.txt")
assert r.status_code == 404
def test_static_index_fallback(tmp_path):
"""Verify static index.html is served as default route."""
static_dir = tmp_path / "static"
static_dir.mkdir()
(static_dir / "index.html").write_text("<h1>SPA</h1>")
api = responder.API(
static_dir=str(static_dir),
allowed_hosts=[";"],
)
api.add_route("/", static=True)
r = api.requests.get("http://;/")
assert r.status_code == 200
assert "<h1>SPA</h1>" in r.text
+171
View File
@@ -1,4 +1,6 @@
# ruff: noqa: E402
import json
import pytest
graphene = pytest.importorskip("graphene")
@@ -17,6 +19,45 @@ def schema():
return graphene.Schema(query=Query)
@pytest.fixture
def mutation_schema():
class Query(graphene.ObjectType):
hello = graphene.String(name=graphene.String(default_value="stranger"))
def resolve_hello(self, info, name):
return f"Hello {name}"
class CreateUser(graphene.Mutation):
class Arguments:
name = graphene.String(required=True)
ok = graphene.Boolean()
name = graphene.String()
def mutate(self, info, name):
return CreateUser(ok=True, name=name)
class Mutation(graphene.ObjectType):
create_user = CreateUser.Field()
return graphene.Schema(query=Query, mutation=Mutation)
@pytest.fixture
def multi_op_schema():
class Query(graphene.ObjectType):
hello = graphene.String(name=graphene.String(default_value="stranger"))
goodbye = graphene.String(name=graphene.String(default_value="stranger"))
def resolve_hello(self, info, name):
return f"Hello {name}"
def resolve_goodbye(self, info, name):
return f"Goodbye {name}"
return graphene.Schema(query=Query)
def test_graphql_schema_query_querying(api, schema):
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.get("http://;/?q={ hello }", headers={"Accept": "json"})
@@ -63,3 +104,133 @@ def test_graphql_error_response(api, schema):
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post("http://;/", json={"query": "{ nonexistent }"})
assert "errors" in r.json()
def test_graphql_variables_json(api, schema):
"""Variables passed via JSON body."""
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post(
"http://;/",
json={
"query": "query Hello($name: String!) { hello(name: $name) }",
"variables": {"name": "Alice"},
},
)
assert r.json() == {"data": {"hello": "Hello Alice"}}
def test_graphql_variables_query_param(api, schema):
"""Variables passed as JSON string in query parameter."""
api.add_route("/", GraphQLView(schema=schema, api=api))
variables = json.dumps({"name": "Bob"})
r = api.requests.get(
f"http://;/?query=query Hello($name: String!) "
f"{{ hello(name: $name) }}&variables={variables}",
headers={"Accept": "json"},
)
assert r.json() == {"data": {"hello": "Hello Bob"}}
def test_graphql_operation_name_json(api, multi_op_schema):
"""operationName selects which operation to run."""
api.add_route("/", GraphQLView(schema=multi_op_schema, api=api))
query = """
query SayHello { hello }
query SayGoodbye { goodbye }
"""
r = api.requests.post(
"http://;/",
json={
"query": query,
"operationName": "SayHello",
},
)
data = r.json()
assert data["data"]["hello"] == "Hello stranger"
def test_graphql_operation_name_query_param(api, multi_op_schema):
"""operationName via query parameter."""
api.add_route("/", GraphQLView(schema=multi_op_schema, api=api))
query = "query SayHello { hello } query SayGoodbye { goodbye }"
r = api.requests.get(
f"http://;/?query={query}&operationName=SayGoodbye",
headers={"Accept": "json"},
)
data = r.json()
assert data["data"]["goodbye"] == "Goodbye stranger"
def test_graphql_mutation(api, mutation_schema):
"""Mutations work via JSON body."""
api.add_route("/", GraphQLView(schema=mutation_schema, api=api))
r = api.requests.post(
"http://;/",
json={
"query": 'mutation { createUser(name: "Eve") { ok name } }',
},
)
data = r.json()
assert data["data"]["createUser"]["ok"] is True
assert data["data"]["createUser"]["name"] == "Eve"
def test_graphql_mutation_with_variables(api, mutation_schema):
"""Mutations with variables."""
api.add_route("/", GraphQLView(schema=mutation_schema, api=api))
r = api.requests.post(
"http://;/",
json={
"query": "mutation CreateUser($name: String!) "
"{ createUser(name: $name) { ok name } }",
"variables": {"name": "Frank"},
},
)
data = r.json()
assert data["data"]["createUser"]["ok"] is True
assert data["data"]["createUser"]["name"] == "Frank"
def test_graphql_context_access(api):
"""Resolvers can access request and response via info.context."""
class Query(graphene.ObjectType):
method = graphene.String()
def resolve_method(self, info):
return info.context["request"].method
schema = graphene.Schema(query=Query)
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post("http://;/", json={"query": "{ method }"})
assert r.json() == {"data": {"method": "post"}}
def test_graphql_malformed_query(api, schema):
"""Malformed GraphQL syntax returns errors."""
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post("http://;/", json={"query": "{ this is not valid"})
data = r.json()
assert "errors" in data
assert len(data["errors"]) > 0
def test_graphql_raw_text_query(api, schema):
"""Query sent as raw text body."""
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post(
"http://;/",
content=b"{ hello }",
headers={"Content-Type": "text/plain"},
)
assert r.json() == {"data": {"hello": "Hello stranger"}}
def test_graphql_invalid_variables_query_param(api, schema):
"""Invalid JSON in variables query param is treated as None."""
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.get(
"http://;/?query={ hello }&variables=not-json",
headers={"Accept": "json"},
)
assert r.json() == {"data": {"hello": "Hello stranger"}}
+5 -7
View File
@@ -1,13 +1,10 @@
"""Tests for new features: validation, SSE, after_request, route groups, etc."""
import pytest
from pydantic import BaseModel
from starlette.testclient import TestClient as StarletteTestClient
import responder
from responder.ext.ratelimit import RateLimiter
# --- Pydantic auto-validation ---
@@ -42,7 +39,9 @@ def test_pydantic_request_validation():
assert "errors" in r.json()
# Invalid request — wrong type
r = api.requests.post("http://;/items", json={"name": "widget", "price": "not_a_number"})
r = api.requests.post(
"http://;/items", json={"name": "widget", "price": "not_a_number"}
)
assert r.status_code == 422
@@ -50,8 +49,7 @@ def test_pydantic_response_serialization():
"""Auto-serialize response through response_model."""
api = responder.API(allowed_hosts=[";"])
@api.route("/items", methods=["POST"],
request_model=ItemIn, response_model=ItemOut)
@api.route("/items", methods=["POST"], request_model=ItemIn, response_model=ItemOut)
async def create(req, resp):
data = await req.media()
# Include an extra field that should be stripped by the model
@@ -257,7 +255,7 @@ def test_rate_limiter():
def view(req, resp):
resp.text = "ok"
for i in range(3):
for _i in range(3):
r = api.requests.get("http://;/")
assert r.status_code == 200
assert "X-RateLimit-Remaining" in r.headers
+5 -2
View File
@@ -546,14 +546,17 @@ def test_documentation(needs_openapi):
assert "html" in r.text
def test_mount_wsgi_app(api, flask):
def test_mount_wsgi_app(flask):
# Use localhost so Werkzeug's trusted-host check accepts the request.
api = responder.API(allowed_hosts=["localhost"])
@api.route("/")
def hello(req, resp):
resp.text = "hello"
api.mount("/flask", flask)
r = api.requests.get("http://;/flask")
r = api.requests.get("http://localhost/flask")
assert r.status_code < 300