Compare commits

...

31 Commits

Author SHA1 Message Date
kennethreitz b678542192 Bump version to 3.6.2 and update changelog
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:11:52 -04:00
kennethreitz 079e7ae3e5 Simplify status code category check
Replace all([(x >= y), (x < z)]) with chained comparison x <= y < z.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:10:09 -04:00
kennethreitz 9d4a9d5083 Fix potential XSS in GraphiQL template
The endpoint URL was inserted into a JS string literal with single
quotes. A crafted endpoint containing a single quote could break out.
Now uses Jinja2's tojson filter for proper escaping.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:09:45 -04:00
kennethreitz 79c32fcef4 Remove dead or-empty-string in format detection
The \`or \"\"\` in \`\"yaml\" in self.mimetype or \"\"\` was a no-op that
made the logic harder to read. The \`in\` check on mimetype already
returns a bool — \`or \"\"\` just evaluates to that bool.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:09:21 -04:00
kennethreitz 52896c6040 Remove unused Node.js setup from CI workflow
The test workflow installed Node 22 but never used it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:08:57 -04:00
kennethreitz def071fc71 Add missing methods to CaseInsensitiveDict
Added __delitem__, pop, and setdefault overrides so all dict operations
go through case-insensitive key normalization.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:08:45 -04:00
kennethreitz f194efecae Replace assert with proper exceptions in OpenAPI extension
assert statements are stripped with python -O. Use ValueError for
duplicate schema registration and RuntimeError for missing static
route configuration.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:08:16 -04:00
kennethreitz 868f84fc8b Remove unused method parameter from load_target()
The parameter was accepted but never used — the function always returns
the entrypoint directly without calling any method on it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:07:46 -04:00
kennethreitz ea3b6a6e4a Fix test assertions that could never fail
- test_background_task_exception: `or True` made assert always pass
- test_form_uploads_without_multipart: `or r.status_code < 500` masked
  wrong expected value (dict() of QueryDict returns lists, not scalars)
- test_graphql_text_query: `< 500` replaced with exact `== 200`

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:07:11 -04:00
kennethreitz 0b16558b04 Fix Pydantic model validation assuming dict body
Request validation would crash with **body if the body was a list.
Response validation would crash the same way. Now checks isinstance
before unpacking. Also added DELETE to request validation methods.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:06:17 -04:00
kennethreitz 2440d4caed Narrow WSGI fallback to only catch call signature TypeErrors
The bare except TypeError when calling mounted apps would silently
swallow any TypeError raised inside an ASGI app and incorrectly
convert it to WSGI mode. Now only falls back when the error is
about call arguments.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:05:46 -04:00
kennethreitz e653a9f1fd Fix race condition in rate limiter
Cleanup, length check, and append were separate non-atomic steps.
Under concurrent requests a client could exceed the limit. Added a
threading lock around the critical section.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:05:18 -04:00
kennethreitz c1fe6e11bd Fix memory leak in rate limiter
Empty bucket keys were never removed after their timestamps expired.
Over time this accumulated an entry for every unique client IP that
ever made a request.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:04:33 -04:00
kennethreitz 06b7bae7c0 Fix blocking file I/O in Response.stream_file()
The async generator was using synchronous open() and read() which
blocks the event loop. Switched to anyio.open_file() for proper
async file I/O.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:04:10 -04:00
kennethreitz 2494034111 Fix before_requests default type mismatch in Route and WebSocketRoute
The default value for before_requests was [] (list) but the code calls
.get() on it which is a dict method. Changed default to match the
expected dict structure {"http": [], "ws": []}.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:03:45 -04:00
kennethreitz 07cfa66e5c Fix OpenAPI docs UI using hardcoded schema URL
The docs template always fetched from /schema.yml regardless of the
user's openapi_route setting. Now uses self.openapi_route so custom
schema paths (e.g. /openapi.json) work correctly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:03:15 -04:00
kennethreitz a5d38cf9c3 Fix GraphQL status code never being applied to response
graphql_response() computed status_code (200 or 400) but only returned
it in an unused tuple instead of setting resp.status_code. All GraphQL
error responses were returning 200.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:02:53 -04:00
kennethreitz 2f5e46e233 Bump version to 3.6.1 and update changelog
Add configurable GZip compression via `gzip` parameter on API().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 17:52:35 -04:00
Andreas Motl c1d789f279 CI: Use uv exclusively (#606)
## About
That's just maintenance. In this case, streamline the CI configuration
for improved ergonomics and future proofing, see GH-605.

## Details
- `activate-environment: true` of newer `astral-sh/setup-uv@v7` is the
killer feature here, which wasn't available before.
- Configuring `cache-dependency-glob: pyproject.toml` isn't needed,
because the recipe already employs an excellent default configuration
that includes this and other project metadata files.
- The adjustments in `test_cli` were needed because of a different PyPy
version installed by `setup-uv`. This will yield a report to Astral
maintainers, unrelated to this patch.
2026-03-25 19:48:54 -04:00
kennethreitz 691f6b4d5c Bump version to 3.6.0 and update changelog
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:24:03 -04:00
kennethreitz 90a082a0ac Add structured logging with per-request context (#604)
## Summary

New `enable_logging=True` parameter on `responder.API()` that provides
structured, request-scoped logging using stdlib `logging` and
`contextvars`.

### What it does

- **`api.log`** — always available on every API instance. Works as a
plain logger by default; gains per-request context enrichment with
`enable_logging=True`
- **Per-request context** — every log message automatically includes
request ID, HTTP method, path, and client IP
- **Access logging** — logs every request with timing: `GET /path → 200
(1.2ms)`
- **Request ID** — generates or forwards `X-Request-ID` headers
(supersedes `request_id=True` when both are set)
- **stdlib logging** — works with any existing handler, formatter, or
log aggregator

### Usage

```python
# api.log always works — no setup required
api = responder.API()
api.log.info("starting up")  # plain logger, no context

# With enable_logging=True, log messages get request context automatically
api = responder.API(enable_logging=True)

@api.route("/")
def index(req, resp):
    api.log.info("handling request")
    # => 2026-03-24 12:00:00 [INFO] responder.app — handling request [GET /] [req:abc123] [client:127.0.0.1]
```

For additional loggers in helper modules:

```python
from responder.ext.logging import get_logger
logger = get_logger("myapp.db")
```

### Architecture

- `responder/ext/logging.py` — self-contained module with:
- `LoggingMiddleware` — pure ASGI middleware that sets contextvars and
logs access
- `RequestContextFilter` — logging filter that injects context into
records
  - `RequestContext` — read-only access to current request metadata
  - `get_logger()` / `setup_logging()` — convenience functions
- `api.log` — always a valid logger; context-aware when
`enable_logging=True`, plain stdlib logger otherwise
- Wired into `API.__init__` via the `enable_logging` parameter

### Files

- `responder/ext/logging.py` — new module
- `responder/api.py` — added `enable_logging` parameter and `api.log`
- `tests/test_logging.py` — 9 tests
- `docs/source/tour.rst` — new Structured Logging section
- `docs/source/index.rst` — added to feature list

## Test plan
- [x] 9 logging tests pass
- [x] Full suite: 208 passed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:22:57 -04:00
kennethreitz 5ee0de6458 Replace HTTP/2 server push with dependency injection in backlog
HTTP/2 server push was removed from the spec and dropped by browsers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:02:56 -04:00
kennethreitz 1c729c8542 Docs: second improvement pass (#603)
## Summary

- **Auth tutorial**: fix deprecated `datetime.utcnow()` →
`datetime.now(timezone.utc)`, add role-based access control section, add
auth strategy comparison guide
- **WebSocket tutorial**: use `WebSocketDisconnect` instead of bare
`Exception` in chat example, add connection lifecycle section with close
codes, add rejected connection test example
- **SQLAlchemy tutorial**: modernize from `Column()` to
`mapped_column()` / `Mapped[]` (SQLAlchemy 2.0 style with type checker
support)
- **Deployment**: use `uv` in Docker example instead of pip, fix stale
`uv.lock` reference in production checklist
- **Quickstart**: link to all tutorials in "next steps" (was only
linking to 3 of 9)
- **Sandbox**: rewrite with project layout tree, mypy command, and
pattern-matching test examples

## Test plan
- [x] `make html` builds cleanly
- [x] All 199 tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:00:56 -04:00
kennethreitz 536428a787 Remove Unreleased section from changelog
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:54:13 -04:00
kennethreitz 3d5f3c7e93 Improve documentation across the board (#602)
## Summary

- **Deployment guide**: health check endpoint, Docker Compose example,
Caddy reverse proxy config, Procfile pattern, production checklist
- **API reference**: quick usage examples for every class (API, Request,
Response, RouteGroup, BackgroundQueue, RateLimiter, status helpers)
- **Feature tour**: new Pydantic validation and content negotiation
sections, expanded MessagePack docs
- **Testing guide**: rate limiting and mounted WSGI app test examples,
Werkzeug 3.1.7 tip
- **Middleware tutorial**: pure ASGI middleware example (no
BaseHTTPMiddleware dependency)
- **CLI guide**: environment variables section (PORT, SECRET_KEY)
- **Homepage**: updated feature list with SSE, rate limiting, Pydantic,
content negotiation, route groups
- **Backlog**: removed already-implemented items, added current ideas

+362 lines of docs, no code changes.

## Test plan
- [x] `make html` builds cleanly with no warnings
- [x] All 199 tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Andreas Motl <andreas.motl@panodata.org>
2026-03-24 15:51:55 -04:00
kennethreitz e0cce231ea Update CLAUDE.md to reflect lock file status
Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
2026-03-24 15:42:14 -04:00
Andreas Motl 44c33475b2 Remove dependency lock file uv.lock. This is a library. (#601)
## About
We think using an `uv.lock` file is only applicable for applications,
and otherwise gives wrong impressions and expectations.

## References
- https://discuss.python.org/t/the-purpose-of-a-lock-file/38756
- https://github.com/orgs/python-poetry/discussions/7847
2026-03-24 15:35:41 -04:00
kennethreitz f4a292108b Move version below tagline in docs sidebar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:34:49 -04:00
kennethreitz 25ea333ad4 Prefix version with v in docs sidebar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:33:12 -04:00
kennethreitz 6279835040 Show version number in docs sidebar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:32:48 -04:00
kennethreitz 0e493ad8d1 Add CLAUDE.md and /release command
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:31:42 -04:00
35 changed files with 1192 additions and 3561 deletions
+42
View File
@@ -0,0 +1,42 @@
Release a new version of responder to PyPI and GitHub.
Usage: /release <version> (e.g. /release 3.6.0)
If no version is provided, ask the user what version to release.
## Steps
1. **Verify clean state**: Run `git status` and ensure the working tree is clean. If not, stop and ask the user.
2. **Run tests**: Run `uv run pytest -x --no-header -q`. If any fail, stop and report.
3. **Bump version**: Update `responder/__version__.py` to the new version.
4. **Update changelog**:
- Run `git log --oneline $(git describe --tags --abbrev=0)..HEAD` to get commits since last release.
- Add a new section in `CHANGELOG.md` under `## [Unreleased]` with the date, categorized into Added/Changed/Fixed/Removed.
- Update the compare links at the bottom of the file.
5. **Lock deps**: Run `uv lock`.
6. **Commit**: Stage `responder/__version__.py`, `CHANGELOG.md`, and `uv.lock`. Commit with message `Bump version to X.Y.Z and update changelog`.
7. **Push and tag**:
```
git push
git tag vX.Y.Z
git push origin vX.Y.Z
```
8. **GitHub release**: Create a release with `gh release create` including highlights and a link to the full changelog.
9. **Build and publish**:
```
uv build
uvx twine upload dist/responder-X.Y.Z*
```
Note: This requires a PyPI token. If twine fails due to auth, tell the user to set `TWINE_USERNAME=__token__` and `TWINE_PASSWORD` and re-run, or run `! uvx twine upload dist/responder-X.Y.Z*` interactively.
10. **Update GitHub release**: Edit the release to add a link to the PyPI page: `https://pypi.org/project/responder/X.Y.Z/`
11. **Report**: Print a summary with links to the GitHub release and PyPI page.
+4 -12
View File
@@ -19,24 +19,16 @@ jobs:
documentation:
name: "Documentation"
runs-on: ubuntu-latest
env:
UV_SYSTEM_PYTHON: true
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
python-version: "3.13"
- name: Set up uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
activate-environment: true
enable-cache: true
cache-dependency-glob: |
pyproject.toml
python-version: "3.14"
- name: Install package and documentation dependencies
run: uv pip install '.[docs]'
+4 -15
View File
@@ -28,28 +28,17 @@ jobs:
"3.14t",
"pypy3.11",
]
env:
UV_SYSTEM_PYTHON: true
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 22
- name: Set up Python
uses: actions/setup-python@v5
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
python-version: ${{ matrix.python-version }}
- name: Set up uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
activate-environment: true
enable-cache: true
cache-suffix: ${{ matrix.python-version }}
cache-dependency-glob: |
pyproject.toml
python-version: ${{ matrix.python-version }}
- name: Install package
run: uv pip install '.[develop,test]'
+1
View File
@@ -8,6 +8,7 @@
.DS_Store
coverage.xml
.coverage*
*.lock
__pycache__
tests/__pycache__
+68 -3
View File
@@ -5,7 +5,71 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [v3.6.2] - 2026-04-12
### Fixed
- GraphQL error responses now correctly return 400 status instead of always 200
- OpenAPI docs UI now respects custom `openapi_route` instead of hardcoding `/schema.yml`
- `before_requests` default type mismatch that could crash routes called outside the router
- Blocking synchronous file I/O in `Response.stream_file()` — now uses async I/O via anyio
- Memory leak in rate limiter (empty bucket keys never cleaned up)
- Race condition in rate limiter `check()` — added thread-safe locking
- WSGI fallback catching all `TypeError`s instead of just call-signature mismatches
- Pydantic request/response model validation crashing on non-dict bodies
- Test assertions that could never fail (`or True`, `< 500` patterns)
- `CaseInsensitiveDict` missing `__delitem__`, `pop`, and `setdefault` overrides
- `assert` used for input validation in OpenAPI extension (stripped by `python -O`)
- Potential XSS in GraphiQL template endpoint injection
- Dead `or ""` in media format detection logic
### Changed
- `DELETE` requests now participate in Pydantic request body validation
- Simplified status code category check to use chained comparison
### Removed
- Unused `method` parameter from `load_target()`
- Unused Node.js setup step from CI test workflow
## [v3.6.1] - 2026-04-12
### Added
- Configurable GZip compression via `gzip` parameter on `API()` (defaults to `True`)
## [v3.6.0] - 2026-03-24
### Added
- Built-in structured logging with per-request context (`enable_logging=True`)
- `api.log` — always-available logger, enriched with request context when logging is enabled
- Automatic access logging with timing: `GET /path → 200 (1.2ms)`
- Request ID generation/forwarding via `X-Request-ID` header
- `contextvars`-based request context (ID, method, path, client IP) on every log record
- `responder.ext.logging` module: `get_logger()`, `RequestContext`, `RequestContextFilter`
- CLAUDE.md project guide and `/release` command
- Version number in docs sidebar
### Changed
- Comprehensive documentation improvements across all pages
- Deployment: health checks, Docker Compose, Caddy, Procfile, production checklist
- API reference: usage examples for every class
- Feature tour: Pydantic validation, content negotiation, structured logging sections
- Tutorials: modernized SQLAlchemy to `mapped_column()`, fixed deprecated `datetime.utcnow()`,
WebSocket `WebSocketDisconnect` handling, role-based auth, auth strategy guide
- Testing: rate limiting and WSGI mount examples
- Middleware: pure ASGI middleware example
- Quickstart: links to all tutorials
- Sandbox: full rewrite with project layout
- Docker example uses `uv` instead of pip
- Backlog updated: removed implemented features, replaced HTTP/2 server push with dependency injection
### Removed
- `uv.lock` — this is a library, not an application
## [v3.5.0] - 2026-03-24
@@ -14,7 +78,6 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- CI validation for Python 3.14, 3.14 free-threaded, and PyPy 3.11
- Marimo notebook mounting docs and example
- Type annotations for `routes.py`
- `uv.lock` for reproducible installs
### Changed
@@ -462,7 +525,9 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Conception!
[unreleased]: https://github.com/kennethreitz/responder/compare/v3.5.0..HEAD
[v3.6.2]: https://github.com/kennethreitz/responder/compare/v3.6.1..v3.6.2
[v3.6.1]: https://github.com/kennethreitz/responder/compare/v3.6.0..v3.6.1
[v3.6.0]: https://github.com/kennethreitz/responder/compare/v3.5.0..v3.6.0
[v3.5.0]: https://github.com/kennethreitz/responder/compare/v3.4.0..v3.5.0
[v3.4.0]: https://github.com/kennethreitz/responder/compare/v3.3.0..v3.4.0
[v3.3.0]: https://github.com/kennethreitz/responder/compare/v3.2.0..v3.3.0
+44
View File
@@ -0,0 +1,44 @@
# Responder
A familiar HTTP Service Framework for Python, by Kenneth Reitz.
## Commands
- **Tests**: `uv run pytest` (runs full suite with coverage)
- **Single test**: `uv run pytest tests/test_responder.py::test_name -xvs`
- **Lint**: `uv run ruff check .`
- **Type check**: `uv run mypy`
- **Build docs**: `cd docs && uv run make html`
- **Build package**: `uv build`
- **Lock deps**: `uv lock`
## Architecture
- `responder/api.py` — Main `API` class, the entry point for all apps
- `responder/routes.py``Router`, `Route`, `WebSocketRoute` dispatch
- `responder/models.py``Request` and `Response` wrappers around Starlette
- `responder/ext/` — Extensions: CLI, GraphQL, OpenAPI, rate limiting
- `responder/background.py` — Background task queue
- `responder/formats.py` — Content negotiation (JSON, YAML, msgpack)
- `responder/__version__.py` — Single source of truth for version string
## Conventions
- Python 3.10+ only. Use `from __future__ import annotations` where present.
- Use `inspect.iscoroutinefunction` (not `asyncio.iscoroutinefunction`).
- Tests use `api.requests` (Starlette TestClient) with `allowed_hosts=[";"]` or `["localhost"]`.
- Werkzeug 3.1.7+ rejects invalid Host headers — use `localhost` when mounting WSGI apps in tests.
- Version is in `responder/__version__.py`, bump it there.
- Changelog follows [Keep a Changelog](https://keepachangelog.com/) format in `CHANGELOG.md`.
- Compare links at the bottom of CHANGELOG.md must be updated when adding a release.
- All deps managed via `uv`. Lock file (`uv.lock`) is not committed.
## Release Process
1. Bump version in `responder/__version__.py`
2. Add changelog entry in `CHANGELOG.md` (update compare links too)
3. `uv lock` to refresh the lock file
4. Commit: `Bump version to X.Y.Z and update changelog`
5. `git tag vX.Y.Z && git push && git push origin vX.Y.Z`
6. `gh release create vX.Y.Z --title "vX.Y.Z" --notes "..."`
7. `uv build && uvx twine upload dist/responder-X.Y.Z*`
+2
View File
@@ -5,6 +5,8 @@
</p>
<p>
<strong>Responder</strong> — a familiar HTTP service framework for Python.
<br />
<small>v{{ version }}</small>
</p>
<h3>Useful Links</h3>
<ul>
+87 -4
View File
@@ -12,6 +12,20 @@ The central object of every Responder application. It holds your routes,
middleware, templates, and configuration. Create one at the top of your
module and use it to define your entire web service.
Quick example::
import responder
api = responder.API(
title="My Service", # OpenAPI title
version="1.0", # OpenAPI version
openapi="3.0.2", # enable OpenAPI
docs_route="/docs", # Swagger UI at /docs
cors=True, # enable CORS
secret_key="change-me", # session signing key
allowed_hosts=["example.com"],
)
.. module:: responder
.. autoclass:: API
@@ -28,6 +42,27 @@ parameters, the request body, cookies, and more.
Most properties are synchronous, but reading the body requires ``await``
because it involves I/O.
Common patterns::
# Headers (case-insensitive)
token = req.headers.get("Authorization")
# Query parameters: /search?q=python&page=2
query = req.params["q"]
# JSON body
data = await req.media()
# Form data
form = await req.media("form")
# File uploads
files = await req.media("files")
# Client info
ip, port = req.client
is_https = req.is_secure
.. autoclass:: Request
:inherited-members:
@@ -39,6 +74,19 @@ The response object is passed into every view as the second argument.
Mutate it to control what gets sent back to the client — the body,
status code, headers, and cookies.
Common patterns::
resp.text = "plain text" # text/plain
resp.html = "<h1>Hello</h1>" # text/html
resp.media = {"key": "value"} # application/json
resp.content = b"raw bytes" # application/octet-stream
resp.file("path/to/file.pdf") # auto content-type
resp.stream_file("large/export.csv") # streamed
resp.status_code = 201
resp.headers["X-Custom"] = "value"
resp.cookies["session"] = "abc123"
.. autoclass:: Response
:inherited-members:
@@ -47,7 +95,13 @@ Route Groups
------------
Group related routes under a shared URL prefix — useful for API versioning
and organizing large applications.
and organizing large applications::
v1 = api.group("/v1")
@v1.route("/users")
def list_users(req, resp):
resp.media = []
.. autoclass:: responder.api.RouteGroup
:members:
@@ -57,7 +111,19 @@ Background Queue
----------------
Run tasks in background threads without blocking the response. Available
as ``api.background``.
as ``api.background``::
@api.route("/submit")
async def submit(req, resp):
data = await req.media()
@api.background.task
def process(data):
# runs in a thread pool
...
process(data)
resp.media = {"status": "accepted"}
.. autoclass:: responder.background.BackgroundQueue
:members:
@@ -67,6 +133,8 @@ Query Dict
----------
A dictionary subclass for query string parameters with multi-value support.
Behaves like a normal dict for single values, but supports ``getlist()``
for parameters that appear multiple times (e.g. ``?tag=a&tag=b``).
.. autoclass:: responder.models.QueryDict
:members:
@@ -76,7 +144,15 @@ Rate Limiter
------------
In-memory token bucket rate limiter. Limits requests per client IP address
and returns ``429 Too Many Requests`` when exceeded.
and returns ``429 Too Many Requests`` when exceeded::
from responder.ext.ratelimit import RateLimiter
limiter = RateLimiter(requests=100, period=60) # 100 req/min
limiter.install(api)
Response headers: ``X-RateLimit-Limit``, ``X-RateLimit-Remaining``,
and ``Retry-After`` (when limited).
.. autoclass:: responder.ext.ratelimit.RateLimiter
:members:
@@ -86,7 +162,14 @@ Status Code Helpers
-------------------
Convenience functions for checking which category a status code falls
into. Useful in middleware and after-request hooks.
into. Useful in middleware and after-request hooks::
from responder.status_codes import is_200, is_400, is_500
@api.after_request()
def log_errors(req, resp):
if is_400(resp.status_code) or is_500(resp.status_code):
print(f"Error: {req.method} {req.url.path} -> {resp.status_code}")
.. autofunction:: responder.status_codes.is_100
+5 -4
View File
@@ -1,7 +1,8 @@
# Backlog
## Future Ideas
- Consider adding `after_request` hooks (complement to `before_request`)
- Explore WebSocket before_request short-circuit support
- Add rate limiting middleware
- Consider async template rendering by default
- WebSocket before_request short-circuit support (reject before accept)
- Per-route rate limiting (different limits for different endpoints)
- Built-in structured logging with request context
- OpenAPI 3.1 support
- Dependency injection for route handlers
+19
View File
@@ -71,6 +71,25 @@ For URLs, use a fragment::
$ responder run https://example.com/app.py#service
Environment Variables
---------------------
Responder automatically reads the ``PORT`` environment variable at
runtime:
- ``PORT`` — bind to ``0.0.0.0`` on this port (cloud platform convention)
When ``PORT`` is set, the server binds to all interfaces automatically.
This is how cloud platforms like Fly.io, Railway, and Heroku inject the
listen port.
For other settings like ``SECRET_KEY``, read them in your application
code and pass them to ``responder.API()``::
import os
api = responder.API(secret_key=os.environ["SECRET_KEY"])
Building Frontend Assets
-------------------------
+88 -4
View File
@@ -30,8 +30,9 @@ Here's a minimal Dockerfile::
FROM python:3.13-slim
WORKDIR /app
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
COPY . .
RUN pip install responder
RUN uv pip install --system responder
ENV PORT=80
EXPOSE 80
CMD ["python", "api.py"]
@@ -42,9 +43,10 @@ Build and run::
$ docker run -p 8000:80 myapi
The ``python:3.13-slim`` image is about 150MB — small enough for fast
deploys but includes everything you need. For even smaller images, you
can use ``python:3.13-alpine``, though some packages may need extra
build dependencies.
deploys but includes everything you need. Using ``uv`` for installs
is significantly faster than pip. For even smaller images, you can use
``python:3.13-alpine``, though some packages may need extra build
dependencies.
Cloud Platforms
@@ -67,6 +69,27 @@ The pattern is always the same: deploy your code, set the start command
to ``python api.py``, and the platform handles the rest.
Health Check Endpoint
---------------------
Every production deployment needs a health check — a lightweight endpoint
that monitoring tools, load balancers, and orchestrators can poll to verify
your service is running::
@api.route("/health")
def health(req, resp):
resp.media = {"status": "healthy"}
Keep it simple. Don't query the database or do expensive work — the health
check should return instantly. Cloud platforms, Docker, and Kubernetes all
look for an HTTP 200 to confirm your service is alive.
For Docker, add a ``HEALTHCHECK`` instruction::
HEALTHCHECK --interval=30s --timeout=3s \
CMD curl -f http://localhost/health || exit 1
Uvicorn Directly
----------------
@@ -82,6 +105,45 @@ Uvicorn supports many options — SSL certificates, access logging, graceful
shutdown timeouts, and more. See the
`uvicorn documentation <https://www.uvicorn.org/deployment/>`_ for details.
For platforms like Heroku or Railway that use a ``Procfile``::
web: uvicorn api:api --host 0.0.0.0 --port $PORT --workers 4
Docker Compose
--------------
For local development with databases and other services, Docker Compose
ties everything together::
# docker-compose.yml
services:
api:
build: .
ports:
- "5042:80"
environment:
- PORT=80
- DATABASE_URL=postgresql+asyncpg://user:pass@db/myapp
- SECRET_KEY=dev-secret
depends_on:
- db
db:
image: docker.io/postgres:16-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: myapp
volumes:
- pgdata:/var/lib/postgresql/data
volumes:
pgdata:
Run with ``docker compose up``. The API waits for ``db`` to start, then
connects using the ``DATABASE_URL`` environment variable.
Reverse Proxy
-------------
@@ -95,9 +157,31 @@ front of your application for:
- **Static asset serving** — offload static files to the proxy
- **Rate limiting** — at the infrastructure level
A minimal Caddy config that handles HTTPS automatically::
# Caddyfile
example.com {
reverse_proxy localhost:5042
}
Responder's ``TrustedHostMiddleware`` and ``HTTPSRedirectMiddleware`` work
correctly behind proxies that set standard forwarding headers
(``X-Forwarded-For``, ``X-Forwarded-Proto``).
That said, uvicorn is production-ready on its own. Many applications run
uvicorn directly without a reverse proxy and do just fine.
Production Checklist
--------------------
Before going live:
- **Set a secret key**``SECRET_KEY`` env var, never the default
- **Disable debug mode**``DEBUG=false`` or omit it entirely
- **Set allowed hosts** — restrict to your actual domain names
- **Use multiple workers**``--workers 4`` or more, depending on CPU cores
- **Add a health check**``/health`` endpoint for monitoring
- **Enable HTTPS** — via your proxy, cloud platform, or uvicorn's ``--ssl-*`` flags
- **Set up logging** — uvicorn logs requests by default; pipe them to your log aggregator
- **Pin your dependencies** — use a lock file or pinned requirements for reproducible deploys
+7 -2
View File
@@ -59,21 +59,26 @@ What You Get
One ``pip install``, batteries included:
- Pydantic request validation and response serialization.
- Mount Flask, Django, or any WSGI/ASGI app at a subroute.
- Gzip compression, HSTS, CORS, and trusted host validation.
- Before-request hooks that can short-circuit for auth guards.
- Before-request and after-request hooks for auth and logging.
- A test client for fast, in-process testing with pytest.
- Route parameters with f-string syntax and type convertors.
- Lifespan context managers for startup and shutdown logic.
- Custom exception handlers for clean error responses.
- `GraphQL`_ with Graphene and a built-in GraphiQL IDE.
- Server-Sent Events for real-time streaming.
- File serving with automatic content-type detection.
- Sync and async views — ``async`` is always optional.
- Class-based views with ``on_get``, ``on_post``, ``on_request``.
- Built-in rate limiting with ``X-RateLimit`` headers.
- Structured logging with per-request context.
- Content negotiation: JSON, YAML, and MessagePack.
- A pleasant API with a single import statement.
- OpenAPI schema generation with Swagger UI.
- A production `uvicorn`_ server, ready to deploy.
- HTTP method filtering for REST APIs.
- Route groups for API versioning.
- Signed cookie-based sessions.
- Background tasks in a thread pool.
- WebSocket support.
+6
View File
@@ -376,3 +376,9 @@ jump into the tutorials:
- :doc:`tutorial-rest` — build a full CRUD API with validation
- :doc:`tutorial-sqlalchemy` — connect to a database
- :doc:`tutorial-auth` — add authentication
- :doc:`tutorial-websockets` — real-time communication
- :doc:`tutorial-middleware` — hooks and middleware
- :doc:`tutorial-flask` — migrating from Flask
- :doc:`guide-config` — environment variables and secrets
- :doc:`deployment` — Docker, cloud platforms, and production
- :doc:`testing` — writing tests with pytest
+43 -17
View File
@@ -2,35 +2,61 @@
# Development Sandbox
## Setup
Set up a development sandbox.
Acquire sources and create virtualenv.
Clone the repo and install all dependencies:
```shell
git clone https://github.com/kennethreitz/responder.git
cd responder
uv venv
```
Install project in editable mode, including
all development tools.
```shell
uv venv && source .venv/bin/activate
uv pip install --upgrade --editable '.[develop,docs,release,test]'
```
## Operations
Run tests.
## Running Tests
```shell
source .venv/bin/activate
pytest
pytest # full suite with coverage
pytest tests/test_responder.py -xvs # single file, stop on first failure
pytest -k "test_mount" # run tests matching a pattern
```
Format code.
## Code Formatting
```shell
ruff format .
ruff check --fix .
ruff format . # auto-format
ruff check --fix . # lint and auto-fix
```
Documentation authoring.
## Type Checking
```shell
sphinx-autobuild --open-browser --watch docs/source docs/source docs/build
mypy
```
## Documentation
Live-reloading doc server (opens in browser):
```shell
cd docs
sphinx-autobuild --open-browser --watch source source build
```
Or build once:
```shell
cd docs
make html
# open build/html/index.html
```
## Project Layout
```
responder/
├── responder/ # main package
│ ├── api.py # API class — the entry point
│ ├── routes.py # Router, Route, WebSocketRoute
│ ├── models.py # Request and Response wrappers
│ ├── ext/ # extensions (CLI, GraphQL, OpenAPI, rate limiting)
│ ├── background.py # background task queue
│ └── formats.py # content negotiation (JSON, YAML, msgpack)
├── tests/ # pytest test suite
├── examples/ # runnable example apps
├── docs/source/ # Sphinx documentation
└── pyproject.toml # project metadata and tool config
```
+54
View File
@@ -269,6 +269,57 @@ just like in production. You can verify their effects on the response::
assert r.headers["X-Served-By"] == "responder"
Testing Rate Limiting
---------------------
Rate limiters are just hooks — they run automatically during tests.
Verify the headers and the 429 response::
from responder.ext.ratelimit import RateLimiter
def test_rate_limiting():
api = responder.API(allowed_hosts=["localhost"])
limiter = RateLimiter(requests=2, period=60)
limiter.install(api)
@api.route("/")
def view(req, resp):
resp.text = "ok"
# First two requests succeed
for _ in range(2):
r = api.requests.get("http://localhost/")
assert r.status_code == 200
assert "X-RateLimit-Remaining" in r.headers
# Third request is rate limited
r = api.requests.get("http://localhost/")
assert r.status_code == 429
Testing Mounted Apps
--------------------
When testing WSGI apps mounted at a subroute, use ``localhost`` as the
host to avoid Werkzeug's trusted host validation::
from flask import Flask
def test_flask_mount():
api = responder.API(allowed_hosts=["localhost"])
flask_app = Flask(__name__)
@flask_app.route("/")
def hello():
return "Hello from Flask!"
api.mount("/flask", flask_app)
r = api.requests.get("http://localhost/flask")
assert r.status_code == 200
assert "Hello from Flask" in r.text
Tips
----
@@ -284,3 +335,6 @@ Tips
- **Test the contract, not the implementation.** Assert on status codes,
response bodies, and headers — not on internal state.
- **Use ``localhost`` for mounted WSGI apps.** Werkzeug 3.1.7+ validates
the ``Host`` header, so avoid synthetic hosts like ``;`` in tests.
+127 -2
View File
@@ -596,6 +596,127 @@ can pace themselves.
The rate limiter is per-client, keyed by IP address.
Structured Logging
------------------
Production applications need structured, searchable logs. Responder
includes built-in logging that automatically attaches request context
— request ID, HTTP method, path, and client IP — to every log message
emitted during request handling::
api = responder.API(enable_logging=True)
This gives you:
- **Access logging** with timing for every request::
2026-03-24 12:00:00 [INFO] responder.access — GET /users → 200 (1.2ms)
- **A logger on the API instance** — use ``api.log`` anywhere in
your routes. Request context (ID, method, path, client IP) is
attached automatically::
@api.route("/users/{user_id:int}")
def get_user(req, resp, *, user_id):
api.log.info("fetching user %d", user_id)
# => [INFO] responder.app -- fetching user 42 [GET /users/42] [req:a1b2c3] [client:10.0.0.1]
resp.media = {"id": user_id}
- **Request IDs** generated automatically (or forwarded from the
``X-Request-ID`` header) and included in responses.
The logging uses Python's standard ``logging`` module, so it works with
any handler — files, syslog, JSON formatters, Datadog, Sentry, whatever
you already use.
For additional loggers (e.g. in helper modules), use ``get_logger``::
from responder.ext.logging import get_logger
logger = get_logger("myapp.db")
You can also access the current request context directly::
from responder.ext.logging import RequestContext
@api.route("/debug")
def debug(req, resp):
resp.media = {
"request_id": RequestContext.get_request_id(),
"client_ip": RequestContext.get_client_ip(),
}
When ``enable_logging=True`` is set, it supersedes ``request_id=True``
— the logging middleware handles request IDs itself, so you don't get
duplicate headers.
Pydantic Validation
-------------------
`Pydantic <https://docs.pydantic.dev/>`_ models integrate directly with
Responder's routing. Set ``request_model`` to validate incoming data and
``response_model`` to control the shape of outgoing data::
from pydantic import BaseModel
class ItemIn(BaseModel):
name: str
price: float
class ItemOut(BaseModel):
id: int
name: str
price: float
@api.route("/items", methods=["POST"],
request_model=ItemIn, response_model=ItemOut)
async def create_item(req, resp):
data = await req.media()
resp.media = {"id": 1, **data}
When ``request_model`` is set:
- Valid requests are parsed and the data is available via ``await req.media()``
- Invalid requests get an automatic ``422 Unprocessable Entity`` response
with detailed error messages — you don't write any validation code
When ``response_model`` is set:
- The response is serialized through the model before being sent
- Extra fields are stripped automatically
- Type coercion happens at the boundary
This is the recommended way to build validated REST APIs with Responder.
See the :doc:`tutorial-rest` for a complete walkthrough.
Content Negotiation
-------------------
Responder automatically negotiates the response format based on the
client's ``Accept`` header. Set ``resp.media`` to a Python object and
the right thing happens:
- ``Accept: application/json`` (default) → JSON
- ``Accept: application/x-yaml`` → YAML
- ``Accept: application/x-msgpack`` → MessagePack
This means a single endpoint serves multiple formats without any
conditional logic in your code::
@api.route("/data")
def data(req, resp):
resp.media = {"key": "value"}
Clients get the format they ask for::
$ curl http://localhost:5042/data
{"key": "value"}
$ curl -H "Accept: application/x-yaml" http://localhost:5042/data
key: value
MessagePack
-----------
@@ -608,6 +729,10 @@ Responder supports MessagePack alongside JSON and YAML::
# Decode a MessagePack request body
data = await req.media("msgpack")
Content negotiation works too — clients can send
# Respond with MessagePack
resp.media = {"result": [1, 2, 3]}
Content negotiation works automatically — clients can send
``Accept: application/x-msgpack`` to receive MessagePack responses
instead of JSON.
instead of JSON. You can also explicitly decode MessagePack request
bodies by passing ``"msgpack"`` to ``req.media()``.
+55 -2
View File
@@ -46,14 +46,14 @@ Install PyJWT::
Create a helper to encode and decode tokens::
import jwt
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
SECRET = "your-secret-key"
def create_token(user_id: int) -> str:
payload = {
"sub": user_id,
"exp": datetime.utcnow() + timedelta(hours=24),
"exp": datetime.now(timezone.utc) + timedelta(hours=24),
}
return jwt.encode(payload, SECRET, algorithm="HS256")
@@ -189,3 +189,56 @@ Remember to set a proper secret key::
The session data is signed (not encrypted) — users can read it but
can't tamper with it. Don't store sensitive data like passwords in
sessions.
Role-Based Access Control
--------------------------
For APIs where different users have different permissions, embed the
role in the token and check it in route-specific guards::
def create_token(user_id: int, role: str = "user") -> str:
payload = {
"sub": user_id,
"role": role,
"exp": datetime.now(timezone.utc) + timedelta(hours=24),
}
return jwt.encode(payload, SECRET, algorithm="HS256")
Create a helper that checks for a specific role::
def require_role(*roles):
"""Before-request hook factory that restricts by role."""
def check(req, resp):
user_role = getattr(req.state, "role", None)
if user_role not in roles:
resp.status_code = 403
resp.media = {"error": "Insufficient permissions"}
return check
Use it on specific routes::
@api.route("/admin/users", before_request=require_role("admin"))
def list_all_users(req, resp):
resp.media = {"users": [...]}
And store the role during token verification::
# In your auth_guard:
req.state.user_id = payload["sub"]
req.state.role = payload.get("role", "user")
Choosing an Auth Strategy
--------------------------
- **API keys** — simplest. Good for server-to-server, CLI tools, and
internal services. No expiration unless you build it.
- **JWT tokens** — standard for SPAs and mobile apps. Stateless, so
they scale well. Downside: you can't revoke them without a blocklist.
- **Sessions** — best for traditional web apps with HTML forms. The
browser manages cookies automatically. Stateful — the server controls
the session lifecycle.
Start with API keys for internal tools, JWT for public APIs, and
sessions for web apps with login pages.
+41
View File
@@ -117,6 +117,47 @@ the existing stack. Keep this in mind for ordering dependencies — if
middleware A depends on middleware B having run first, add B before A.
Writing Pure ASGI Middleware
----------------------------
For maximum performance and control, you can write middleware as a plain
ASGI application. This bypasses Starlette's ``BaseHTTPMiddleware``
abstraction — it's faster and gives you direct access to the ASGI
protocol::
class SecurityHeadersMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
async def send_with_headers(message):
if message["type"] == "http.response.start":
extra = [
(b"x-content-type-options", b"nosniff"),
(b"x-frame-options", b"DENY"),
(b"referrer-policy", b"strict-origin-when-cross-origin"),
]
message["headers"] = list(message["headers"]) + extra
await send(message)
await self.app(scope, receive, send_with_headers)
api.add_middleware(SecurityHeadersMiddleware)
This is the same pattern used internally by Starlette and uvicorn. The
middleware receives the ASGI ``scope``, ``receive``, and ``send`` callables,
and wraps ``send`` to inject headers into the response.
For most cases, ``BaseHTTPMiddleware`` is simpler and perfectly fine.
Use the pure ASGI approach when you need to handle WebSocket connections,
streaming responses, or want to avoid the overhead of request/response
object creation.
When to Use What
-----------------
+11 -10
View File
@@ -28,8 +28,8 @@ SQLAlchemy models map Python classes to database tables. Each attribute
becomes a column::
# models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
@@ -37,15 +37,16 @@ becomes a column::
class Book(Base):
__tablename__ = "books"
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String, nullable=False)
author = Column(String, nullable=False)
year = Column(Integer, nullable=False)
isbn = Column(String, nullable=True)
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String, nullable=False)
author: Mapped[str] = mapped_column(String, nullable=False)
year: Mapped[int] = mapped_column(nullable=False)
isbn: Mapped[str | None] = mapped_column(String, nullable=True)
``DeclarativeBase`` is SQLAlchemy's modern base class (SQLAlchemy 2.0+).
Each model class corresponds to a table, and each ``Column`` corresponds
to a column in that table.
This uses SQLAlchemy 2.0's ``Mapped`` type annotations and
``mapped_column()``, which give you type checker support and cleaner
syntax than the older ``Column()`` style. Each model class corresponds
to a table, and each ``mapped_column()`` corresponds to a column.
Database Setup
+50 -2
View File
@@ -58,6 +58,8 @@ A chat room needs to broadcast messages to all connected clients. We keep
a set of active connections and iterate through them when someone sends
a message::
from starlette.websockets import WebSocketDisconnect
connected = set()
@api.route("/chat", websocket=True)
@@ -70,13 +72,15 @@ a message::
# Broadcast to all connected clients
for client in connected:
await client.send_text(message)
except Exception:
except WebSocketDisconnect:
pass
finally:
connected.discard(ws)
The ``try/finally`` block ensures we remove disconnected clients from
the set, even if the connection drops unexpectedly.
the set, even if the connection drops unexpectedly. Catching
``WebSocketDisconnect`` specifically (rather than bare ``Exception``)
makes the intent clear and avoids swallowing real bugs.
Data Formats
@@ -154,6 +158,40 @@ WebSocket before-request hooks receive the ``ws`` object and must call
``await ws.accept()`` if they want the connection to proceed.
Connection Lifecycle
--------------------
WebSocket connections go through several states:
1. **Connecting** — the client sends an upgrade request
2. **Open** — after ``await ws.accept()``, both sides can send messages
3. **Closing** — either side initiates a close handshake
4. **Closed** — the connection is fully terminated
When a client disconnects (closes the tab, loses network), the next
``await ws.receive_text()`` raises ``WebSocketDisconnect``. Always
handle this — otherwise your server accumulates dead connections::
from starlette.websockets import WebSocketDisconnect
@api.route("/ws", websocket=True)
async def handler(ws):
await ws.accept()
try:
while True:
data = await ws.receive_text()
await ws.send_text(f"Got: {data}")
except WebSocketDisconnect:
print("Client disconnected")
You can also close connections from the server side::
await ws.close(code=1000) # 1000 = normal closure
Common close codes: ``1000`` (normal), ``1001`` (going away),
``1008`` (policy violation), ``1011`` (server error).
Testing WebSockets
------------------
@@ -169,3 +207,13 @@ Use Starlette's ``TestClient`` for WebSocket tests::
The ``websocket_connect`` context manager handles the connection
lifecycle — it connects on enter and disconnects on exit.
You can also test that connections are properly rejected::
from starlette.websockets import WebSocketDisconnect
def test_websocket_404():
client = TestClient(api)
with pytest.raises(WebSocketDisconnect):
with client.websocket_connect("/nonexistent"):
pass
+1 -1
View File
@@ -1 +1 @@
__version__ = "3.5.0"
__version__ = "3.6.2"
+23 -2
View File
@@ -31,6 +31,7 @@ class API:
:param templates_dir: The directory to use for templates. Will be created for you if it doesn't already exist.
:param auto_escape: If ``True``, HTML and XML templates will automatically be escaped.
:param enable_hsts: If ``True``, send all responses to HTTPS URLs.
:param gzip: If ``True`` (the default), compress responses with GZip.
:param openapi_theme: OpenAPI documentation theme, must be one of ``elements``, ``rapidoc``, ``redoc``, ``swagger_ui``
""" # noqa: E501
@@ -60,7 +61,9 @@ class API:
allowed_hosts=None,
openapi_theme=DEFAULT_OPENAPI_THEME,
lifespan=None,
gzip=True,
request_id=False,
enable_logging=False,
):
"""Create a new Responder API instance.
@@ -85,7 +88,9 @@ class API:
:param allowed_hosts: List of allowed hostnames (e.g. ``["example.com"]``). Defaults to ``["*"]``.
:param openapi_theme: Documentation UI theme: ``"swagger_ui"``, ``"redoc"``, ``"rapidoc"``, or ``"elements"``.
:param lifespan: An async context manager for startup/shutdown logic.
:param gzip: If ``True`` (the default), compress responses with GZip.
:param request_id: If ``True``, add ``X-Request-ID`` headers to all responses.
:param enable_logging: If ``True``, enable structured logging with per-request context (request ID, method, path, client IP).
""" # noqa: E501
self.background = BackgroundQueue()
@@ -120,7 +125,9 @@ class API:
self.default_endpoint = None
self.app = ExceptionMiddleware(self.router, debug=debug)
self.add_middleware(GZipMiddleware)
if gzip:
self.add_middleware(GZipMiddleware)
if self.hsts_enabled:
self.add_middleware(HTTPSRedirectMiddleware)
@@ -158,7 +165,7 @@ class API:
self.templates = Templates(directory=templates_dir)
if request_id:
if request_id and not enable_logging:
import uuid as _uuid
def _add_request_id(req, resp):
@@ -167,6 +174,20 @@ class API:
self.router.after_request(_add_request_id)
if enable_logging:
import logging as _logging
from .ext.logging import LoggingMiddleware, get_logger, setup_logging
log_level = _logging.DEBUG if debug else _logging.INFO
setup_logging(level=log_level)
self.add_middleware(LoggingMiddleware)
self.log = get_logger("responder.app")
else:
import logging as _logging
self.log = _logging.getLogger("responder.app")
@property
def requests(self):
"""A test client connected to the ASGI app. Lazily initialized."""
+1 -2
View File
@@ -101,8 +101,7 @@ class GraphQLView:
response_data["data"] = result.data
resp.media = response_data
status_code = 200 if not result.errors else 400
return (query, json.dumps(response_data), status_code)
resp.status_code = 200 if not result.errors else 400
async def on_request(self, req, resp):
await self.graphql_response(req, resp)
+1 -1
View File
@@ -25,7 +25,7 @@ GRAPHIQL = """
<script crossorigin src="//cdn.jsdelivr.net/npm/react-dom@{{ REACT_VERSION }}/umd/react-dom.production.min.js"></script>
<script src="//cdn.jsdelivr.net/npm/graphiql@{{ GRAPHIQL_VERSION }}/graphiql.min.js"></script>
<script>
const fetcher = GraphiQL.createFetcher({ url: '{{ endpoint }}' });
const fetcher = GraphiQL.createFetcher({ url: {{ endpoint | tojson }} });
const root = ReactDOM.createRoot(document.getElementById('graphiql'));
root.render(React.createElement(GraphiQL, { fetcher: fetcher }));
</script>
+181
View File
@@ -0,0 +1,181 @@
"""Structured logging with per-request context.
Provides a logging setup that automatically includes request metadata
(request ID, method, path, client IP) in every log message emitted
during request handling.
Usage::
api = responder.API(enable_logging=True)
# In any route or middleware:
from responder.ext.logging import get_logger
logger = get_logger(__name__)
@api.route("/")
def index(req, resp):
logger.info("handling request")
# => 2026-03-24 12:00:00 [INFO] app — handling request [GET /] [req:abc123] [client:127.0.0.1]
"""
from __future__ import annotations
import logging
import time
import uuid
from contextvars import ContextVar
__all__ = ["get_logger", "RequestContext", "RequestContextFilter", "LoggingMiddleware"]
# Context variables for per-request metadata.
_request_id: ContextVar[str] = ContextVar("request_id", default="-")
_request_method: ContextVar[str] = ContextVar("request_method", default="-")
_request_path: ContextVar[str] = ContextVar("request_path", default="-")
_client_ip: ContextVar[str] = ContextVar("client_ip", default="-")
class RequestContext:
"""Read-only access to the current request's logging context."""
@staticmethod
def get_request_id() -> str:
return _request_id.get()
@staticmethod
def get_method() -> str:
return _request_method.get()
@staticmethod
def get_path() -> str:
return _request_path.get()
@staticmethod
def get_client_ip() -> str:
return _client_ip.get()
class RequestContextFilter(logging.Filter):
"""A logging filter that injects request context into log records.
Adds ``request_id``, ``request_method``, ``request_path``, and
``client_ip`` attributes to every log record, so they can be used
in format strings.
"""
def filter(self, record: logging.LogRecord) -> bool:
record.request_id = _request_id.get() # type: ignore[attr-defined]
record.request_method = _request_method.get() # type: ignore[attr-defined]
record.request_path = _request_path.get() # type: ignore[attr-defined]
record.client_ip = _client_ip.get() # type: ignore[attr-defined]
return True
# Default format that includes request context.
DEFAULT_LOG_FORMAT = (
"%(asctime)s [%(levelname)s] %(name)s -- %(message)s "
"[%(request_method)s %(request_path)s] "
"[req:%(request_id)s] [client:%(client_ip)s]"
)
def get_logger(name: str | None = None) -> logging.Logger:
"""Get a logger with the request context filter attached.
:param name: Logger name (typically ``__name__``).
:returns: A :class:`logging.Logger` with request context available.
"""
logger = logging.getLogger(name)
# Avoid adding duplicate filters.
if not any(isinstance(f, RequestContextFilter) for f in logger.filters):
logger.addFilter(RequestContextFilter())
return logger
def setup_logging(level: int = logging.INFO) -> None:
"""Configure the root logger with request-context-aware formatting.
:param level: The logging level (default ``INFO``).
"""
root = logging.getLogger()
root.setLevel(level)
# Only add our handler if the root logger has no handlers yet,
# or if none of them use our filter.
has_context_handler = any(
any(isinstance(f, RequestContextFilter) for f in h.filters)
for h in root.handlers
)
if not has_context_handler:
handler = logging.StreamHandler()
handler.setLevel(level)
handler.addFilter(RequestContextFilter())
handler.setFormatter(logging.Formatter(DEFAULT_LOG_FORMAT))
root.addHandler(handler)
class LoggingMiddleware:
"""ASGI middleware that sets per-request context variables.
For each HTTP request, this middleware:
1. Extracts or generates a request ID
2. Sets context variables for method, path, and client IP
3. Logs the request and response with timing information
"""
def __init__(self, app, logger_name: str = "responder.access"):
self.app = app
self.logger = get_logger(logger_name)
async def __call__(self, scope, receive, send):
if scope["type"] not in ("http", "websocket"):
await self.app(scope, receive, send)
return
# Extract request metadata.
headers = dict(scope.get("headers", []))
request_id = (
headers.get(b"x-request-id", b"").decode() or uuid.uuid4().hex[:8]
)
method = scope.get("method", "WS")
path = scope.get("path", "/")
client = scope.get("client")
client_ip = client[0] if client else "-"
# Set context variables for the duration of this request.
tok_id = _request_id.set(request_id)
tok_method = _request_method.set(method)
tok_path = _request_path.set(path)
tok_ip = _client_ip.set(client_ip)
# Track response status.
status_code = None
async def send_wrapper(message):
nonlocal status_code
if message["type"] == "http.response.start":
status_code = message.get("status")
# Inject request ID into response headers.
headers_list = list(message.get("headers", []))
headers_list.append((b"x-request-id", request_id.encode()))
message = {**message, "headers": headers_list}
await send(message)
start = time.perf_counter()
try:
await self.app(scope, receive, send_wrapper)
finally:
duration_ms = (time.perf_counter() - start) * 1000
if scope["type"] == "http":
self.logger.info(
"%s %s%s (%.1fms)",
method,
path,
status_code or "?",
duration_ms,
)
# Reset context variables.
_request_id.reset(tok_id)
_request_method.reset(tok_method)
_request_path.reset(tok_path)
_client_ip.reset(tok_ip)
+5 -4
View File
@@ -174,8 +174,8 @@ class OpenAPISchema:
def add_schema(self, name, schema, check_existing=True):
"""Adds a marshmallow or Pydantic schema to the API specification."""
if check_existing:
assert name not in self.schemas
assert name not in self.pydantic_schemas
if name in self.schemas or name in self.pydantic_schemas:
raise ValueError(f"Schema '{name}' is already registered")
if _is_pydantic_model(schema):
self.pydantic_schemas[name] = schema
@@ -216,12 +216,13 @@ class OpenAPISchema:
f"{self.docs_theme}.html",
title=self.title,
version=self.version,
schema_url="/schema.yml",
schema_url=self.openapi_route,
)
def static_url(self, asset):
"""Given a static asset, return its URL path."""
assert self.static_route is not None
if self.static_route is None:
raise RuntimeError("Cannot generate static URL: static_route is disabled")
return f"{self.static_route}/{str(asset)}"
def docs_response(self, req, resp):
+15 -8
View File
@@ -1,5 +1,6 @@
"""Simple in-memory rate limiter for Responder."""
import threading
import time
from collections import defaultdict
@@ -28,6 +29,7 @@ class RateLimiter:
self.max_requests = requests
self.period = period
self._buckets: dict[str, list[float]] = defaultdict(list)
self._lock = threading.Lock()
def _client_key(self, req):
client = req.client
@@ -39,20 +41,25 @@ class RateLimiter:
now = time.time()
cutoff = now - self.period
self._buckets[key] = [t for t in self._buckets[key] if t > cutoff]
if not self._buckets[key]:
del self._buckets[key]
def check(self, req, resp):
"""Check rate limit. Sets 429 status if exceeded."""
key = self._client_key(req)
self._cleanup(key)
if len(self._buckets[key]) >= self.max_requests:
resp.status_code = 429
resp.media = {"error": "rate limit exceeded"}
resp.headers["Retry-After"] = str(self.period)
return False
with self._lock:
self._cleanup(key)
if len(self._buckets[key]) >= self.max_requests:
resp.status_code = 429
resp.media = {"error": "rate limit exceeded"}
resp.headers["Retry-After"] = str(self.period)
return False
self._buckets[key].append(time.time())
remaining = self.max_requests - len(self._buckets[key])
self._buckets[key].append(time.time())
remaining = self.max_requests - len(self._buckets[key])
resp.headers["X-RateLimit-Limit"] = str(self.max_requests)
resp.headers["X-RateLimit-Remaining"] = str(remaining)
return True
+15 -4
View File
@@ -34,12 +34,21 @@ class CaseInsensitiveDict(dict):
def __getitem__(self, key):
return super().__getitem__(key.lower())
def __delitem__(self, key):
super().__delitem__(key.lower())
def __contains__(self, key):
return super().__contains__(key.lower())
def get(self, key, default=None):
return super().get(key.lower(), default)
def pop(self, key, *args):
return super().pop(key.lower(), *args)
def setdefault(self, key, default=None):
return super().setdefault(key.lower(), default)
def update(self, other=None, **kwargs):
if other:
for key, value in other.items():
@@ -299,8 +308,8 @@ class Request:
"""
if format is None:
format = "yaml" if "yaml" in self.mimetype or "" else "json" # noqa: A001
format = "form" if "form" in self.mimetype or "" else format # noqa: A001
format = "yaml" if "yaml" in self.mimetype else "json" # noqa: A001
format = "form" if "form" in self.mimetype else format # noqa: A001
formatter: Callable
if isinstance(format, str):
@@ -464,9 +473,11 @@ class Response:
self.mimetype = guessed or "application/octet-stream"
async def file_generator():
with open(path, "rb") as f:
import anyio
async with await anyio.open_file(path, "rb") as f:
while True:
chunk = f.read(chunk_size)
chunk = await f.read(chunk_size)
if not chunk:
break
yield chunk
+21 -10
View File
@@ -130,7 +130,7 @@ class Route(BaseRoute):
response = Response(req=request, formats=get_formats())
path_params = scope.get("path_params", {})
before_requests = scope.get("before_requests", [])
before_requests = scope.get("before_requests", {"http": [], "ws": []})
for before_request in before_requests.get("http", []):
if inspect.iscoroutinefunction(before_request):
@@ -144,9 +144,11 @@ class Route(BaseRoute):
# Auto-validate request body with Pydantic model
req_model = getattr(self.endpoint, "_request_model", None)
if req_model is not None and request.method in ("post", "put", "patch"):
if req_model is not None and request.method in ("post", "put", "patch", "delete"):
try:
body = await request.media()
if not isinstance(body, dict):
raise TypeError("Request body must be a JSON object")
req_model(**body)
except Exception as exc:
response.status_code = 422
@@ -188,7 +190,7 @@ class Route(BaseRoute):
# Auto-serialize response with Pydantic model
resp_model = getattr(self.endpoint, "_response_model", None)
if resp_model is not None and response.media is not None:
if resp_model is not None and isinstance(response.media, dict):
try:
validated = resp_model(**response.media)
response.media = validated.model_dump()
@@ -266,7 +268,7 @@ class WebSocketRoute(BaseRoute):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
ws = WebSocket(scope, receive, send)
before_requests = scope.get("before_requests", [])
before_requests = scope.get("before_requests", {"http": [], "ws": []})
for before_request in before_requests.get("ws", []):
await before_request(ws)
@@ -459,13 +461,22 @@ class Router:
if path.startswith(path_prefix):
scope["path"] = path[len(path_prefix) :] or "/"
scope["root_path"] = root_path + path_prefix
try:
await app(scope, receive, send)
return
except TypeError:
from a2wsgi import WSGIMiddleware
app = WSGIMiddleware(app)
if not (inspect.iscoroutinefunction(app) or hasattr(app, "__asgi_app__")):
# Check if it looks like a WSGI app (callable with fewer params)
try:
await app(scope, receive, send)
return
except TypeError as exc:
# Only fall back to WSGI if the error is about call signature
if "argument" not in str(exc) and "positional" not in str(exc):
raise
from a2wsgi import WSGIMiddleware
app = WSGIMiddleware(app)
await app(scope, receive, send)
return
else:
await app(scope, receive, send)
return
+1 -1
View File
@@ -85,7 +85,7 @@ for number in codes:
def _is_category(category, status_code):
return all([(status_code >= category), (status_code < category + 100)])
return category <= status_code < category + 100
def is_100(status_code):
+2 -3
View File
@@ -12,7 +12,7 @@ __all__ = [
logger = logging.getLogger(__name__)
def load_target(target: str, default_property: str = "api", method: str = "run") -> t.Any:
def load_target(target: str, default_property: str = "api") -> t.Any:
"""
Load Python code from a file path or module name.
@@ -24,7 +24,6 @@ def load_target(target: str, default_property: str = "api", method: str = "run")
target: Module address (e.g., 'acme.app:foo'), file path (e.g., '/path/to/acme/app.py'),
or URL.
default_property: Name of the property to load if not specified in target (default: "api")
method: Name of the method to invoke on the API instance (default: "run")
Returns:
The API instance, loaded from the given property.
@@ -32,7 +31,7 @@ def load_target(target: str, default_property: str = "api", method: str = "run")
Raises:
ValueError: If target format is invalid
ImportError: If module cannot be imported
AttributeError: If property or method is not found
AttributeError: If property is not found
Example:
>>> api = load_target("myapp.api:server")
+6 -1
View File
@@ -60,7 +60,12 @@ def test_cli_version(capfd):
)
stdout = capfd.readouterr().out.strip()
assert stdout == __version__
# TODO: Accommodate PyPy as installed by `uv`, it emits spurious output on stdout before the version number.
# AssertionError: assert '(_common_types_metatype, 9088, 128, 128)\n(cython_function_or_method, 157568, 128, 128)\n3.6.0' == '3.6.0'
lines = [line.strip() for line in stdout.splitlines() if line.strip()]
assert lines, "Expected version output on stdout"
assert lines[-1] == __version__
def responder_build(path: Path, capfd: CaptureFixture) -> t.Tuple[str, str]:
+4 -3
View File
@@ -85,7 +85,7 @@ def test_background_task_exception(capsys):
time.sleep(0.2) # let the done callback fire
captured = capsys.readouterr()
assert "ValueError" in captured.err or True # traceback goes to stderr
assert "ValueError" in captured.err or "ValueError" in captured.out
def test_background_run():
@@ -112,7 +112,8 @@ def test_form_uploads_without_multipart(api):
content="name=hello&value=world",
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
assert r.json() == {"name": "world", "value": "world"} or r.status_code < 500
assert r.status_code == 200
assert r.json() == {"name": ["hello"], "value": ["world"]}
# --- models.py coverage ---
@@ -469,7 +470,7 @@ def test_graphql_text_query(api):
content="{ hello }",
headers={"Content-Type": "text/plain"},
)
assert r.status_code < 500
assert r.status_code == 200
def test_openapi_info_fields():
+158
View File
@@ -0,0 +1,158 @@
"""Tests for structured logging with request context."""
import logging
import responder
from responder.ext.logging import (
LoggingMiddleware,
RequestContext,
RequestContextFilter,
get_logger,
)
def test_logging_middleware_sets_request_id():
"""LoggingMiddleware adds X-Request-ID to responses."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
@api.route("/")
def index(req, resp):
resp.text = "ok"
r = api.requests.get("http://localhost/")
assert r.status_code == 200
assert "x-request-id" in r.headers
assert len(r.headers["x-request-id"]) > 0
def test_logging_middleware_forwards_request_id():
"""LoggingMiddleware forwards client-provided X-Request-ID."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
@api.route("/")
def index(req, resp):
resp.text = "ok"
r = api.requests.get(
"http://localhost/", headers={"X-Request-ID": "custom-id-123"}
)
assert r.headers["x-request-id"] == "custom-id-123"
def test_logging_context_available_in_route():
"""Request context is available inside route handlers."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
captured = {}
@api.route("/ctx")
def index(req, resp):
captured["request_id"] = RequestContext.get_request_id()
captured["method"] = RequestContext.get_method()
captured["path"] = RequestContext.get_path()
captured["client_ip"] = RequestContext.get_client_ip()
resp.text = "ok"
api.requests.get("http://localhost/ctx")
assert captured["method"] == "GET"
assert captured["path"] == "/ctx"
assert captured["request_id"] != "-"
assert captured["client_ip"] != "-"
def test_logging_filter_injects_attributes():
"""RequestContextFilter adds context fields to log records."""
logger = get_logger("test.filter")
records = []
class CaptureHandler(logging.Handler):
def emit(self, record):
records.append(record)
handler = CaptureHandler()
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
@api.route("/log")
def index(req, resp):
logger.info("test message")
resp.text = "ok"
api.requests.get("http://localhost/log")
logger.removeHandler(handler)
assert len(records) > 0
record = records[0]
assert hasattr(record, "request_id")
assert hasattr(record, "request_method")
assert hasattr(record, "request_path")
assert hasattr(record, "client_ip")
assert record.request_method == "GET"
assert record.request_path == "/log"
def test_get_logger_avoids_duplicate_filters():
"""get_logger doesn't add duplicate filters."""
logger = get_logger("test.dedup")
count_before = sum(1 for f in logger.filters if isinstance(f, RequestContextFilter))
get_logger("test.dedup")
count_after = sum(1 for f in logger.filters if isinstance(f, RequestContextFilter))
assert count_before == count_after == 1
def test_enable_logging_supersedes_request_id():
"""enable_logging handles request IDs itself (no duplicate headers)."""
api = responder.API(
allowed_hosts=["localhost"], request_id=True, enable_logging=True
)
@api.route("/")
def index(req, resp):
resp.text = "ok"
r = api.requests.get("http://localhost/")
# Should have exactly one X-Request-ID header.
assert "x-request-id" in r.headers
def test_api_logger_attribute():
"""api.log is available when enable_logging=True."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
assert api.log is not None
assert api.log.name == "responder.app"
def test_api_log_always_available():
"""api.log works even without enable_logging — just no request context."""
api = responder.API(allowed_hosts=["localhost"])
assert api.log is not None
assert api.log.name == "responder.app"
api.log.info("works without enable_logging")
def test_api_logger_works_in_routes():
"""api.log can be used inside route handlers with context."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
records = []
class CaptureHandler(logging.Handler):
def emit(self, record):
records.append(record)
handler = CaptureHandler()
api.log.addHandler(handler)
@api.route("/")
def index(req, resp):
api.log.info("hello from route")
resp.text = "ok"
api.requests.get("http://localhost/")
api.log.removeHandler(handler)
assert any(r.msg == "hello from route" for r in records)
record = next(r for r in records if r.msg == "hello from route")
assert record.request_method == "GET"
assert record.request_path == "/"
Generated
-3444
View File
File diff suppressed because it is too large Load Diff