Compare commits

...

28 Commits

Author SHA1 Message Date
Andreas Motl 5055fe7974 CI: Validate on Python 3.15 2026-03-26 01:50:31 +01:00
Andreas Motl c1d789f279 CI: Use uv exclusively (#606)
## About
That's just maintenance. In this case, streamline the CI configuration
for improved ergonomics and future proofing, see GH-605.

## Details
- `activate-environment: true` of newer `astral-sh/setup-uv@v7` is the
killer feature here, which wasn't available before.
- Configuring `cache-dependency-glob: pyproject.toml` isn't needed,
because the recipe already employs an excellent default configuration
that includes this and other project metadata files.
- The adjustments in `test_cli` were needed because of a different PyPy
version installed by `setup-uv`. This will yield a report to Astral
maintainers, unrelated to this patch.
2026-03-25 19:48:54 -04:00
kennethreitz 691f6b4d5c Bump version to 3.6.0 and update changelog
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:24:03 -04:00
kennethreitz 90a082a0ac Add structured logging with per-request context (#604)
## Summary

New `enable_logging=True` parameter on `responder.API()` that provides
structured, request-scoped logging using stdlib `logging` and
`contextvars`.

### What it does

- **`api.log`** — always available on every API instance. Works as a
plain logger by default; gains per-request context enrichment with
`enable_logging=True`
- **Per-request context** — every log message automatically includes
request ID, HTTP method, path, and client IP
- **Access logging** — logs every request with timing: `GET /path → 200
(1.2ms)`
- **Request ID** — generates or forwards `X-Request-ID` headers
(supersedes `request_id=True` when both are set)
- **stdlib logging** — works with any existing handler, formatter, or
log aggregator

### Usage

```python
# api.log always works — no setup required
api = responder.API()
api.log.info("starting up")  # plain logger, no context

# With enable_logging=True, log messages get request context automatically
api = responder.API(enable_logging=True)

@api.route("/")
def index(req, resp):
    api.log.info("handling request")
    # => 2026-03-24 12:00:00 [INFO] responder.app — handling request [GET /] [req:abc123] [client:127.0.0.1]
```

For additional loggers in helper modules:

```python
from responder.ext.logging import get_logger
logger = get_logger("myapp.db")
```

### Architecture

- `responder/ext/logging.py` — self-contained module with:
- `LoggingMiddleware` — pure ASGI middleware that sets contextvars and
logs access
- `RequestContextFilter` — logging filter that injects context into
records
  - `RequestContext` — read-only access to current request metadata
  - `get_logger()` / `setup_logging()` — convenience functions
- `api.log` — always a valid logger; context-aware when
`enable_logging=True`, plain stdlib logger otherwise
- Wired into `API.__init__` via the `enable_logging` parameter

### Files

- `responder/ext/logging.py` — new module
- `responder/api.py` — added `enable_logging` parameter and `api.log`
- `tests/test_logging.py` — 9 tests
- `docs/source/tour.rst` — new Structured Logging section
- `docs/source/index.rst` — added to feature list

## Test plan
- [x] 9 logging tests pass
- [x] Full suite: 208 passed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:22:57 -04:00
kennethreitz 5ee0de6458 Replace HTTP/2 server push with dependency injection in backlog
HTTP/2 server push was removed from the spec and dropped by browsers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:02:56 -04:00
kennethreitz 1c729c8542 Docs: second improvement pass (#603)
## Summary

- **Auth tutorial**: fix deprecated `datetime.utcnow()` →
`datetime.now(timezone.utc)`, add role-based access control section, add
auth strategy comparison guide
- **WebSocket tutorial**: use `WebSocketDisconnect` instead of bare
`Exception` in chat example, add connection lifecycle section with close
codes, add rejected connection test example
- **SQLAlchemy tutorial**: modernize from `Column()` to
`mapped_column()` / `Mapped[]` (SQLAlchemy 2.0 style with type checker
support)
- **Deployment**: use `uv` in Docker example instead of pip, fix stale
`uv.lock` reference in production checklist
- **Quickstart**: link to all tutorials in "next steps" (was only
linking to 3 of 9)
- **Sandbox**: rewrite with project layout tree, mypy command, and
pattern-matching test examples

## Test plan
- [x] `make html` builds cleanly
- [x] All 199 tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:00:56 -04:00
kennethreitz 536428a787 Remove Unreleased section from changelog
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:54:13 -04:00
kennethreitz 3d5f3c7e93 Improve documentation across the board (#602)
## Summary

- **Deployment guide**: health check endpoint, Docker Compose example,
Caddy reverse proxy config, Procfile pattern, production checklist
- **API reference**: quick usage examples for every class (API, Request,
Response, RouteGroup, BackgroundQueue, RateLimiter, status helpers)
- **Feature tour**: new Pydantic validation and content negotiation
sections, expanded MessagePack docs
- **Testing guide**: rate limiting and mounted WSGI app test examples,
Werkzeug 3.1.7 tip
- **Middleware tutorial**: pure ASGI middleware example (no
BaseHTTPMiddleware dependency)
- **CLI guide**: environment variables section (PORT, SECRET_KEY)
- **Homepage**: updated feature list with SSE, rate limiting, Pydantic,
content negotiation, route groups
- **Backlog**: removed already-implemented items, added current ideas

+362 lines of docs, no code changes.

## Test plan
- [x] `make html` builds cleanly with no warnings
- [x] All 199 tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Andreas Motl <andreas.motl@panodata.org>
2026-03-24 15:51:55 -04:00
kennethreitz e0cce231ea Update CLAUDE.md to reflect lock file status
Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
2026-03-24 15:42:14 -04:00
Andreas Motl 44c33475b2 Remove dependency lock file uv.lock. This is a library. (#601)
## About
We think using an `uv.lock` file is only applicable for applications,
and otherwise gives wrong impressions and expectations.

## References
- https://discuss.python.org/t/the-purpose-of-a-lock-file/38756
- https://github.com/orgs/python-poetry/discussions/7847
2026-03-24 15:35:41 -04:00
kennethreitz f4a292108b Move version below tagline in docs sidebar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:34:49 -04:00
kennethreitz 25ea333ad4 Prefix version with v in docs sidebar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:33:12 -04:00
kennethreitz 6279835040 Show version number in docs sidebar
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:32:48 -04:00
kennethreitz 0e493ad8d1 Add CLAUDE.md and /release command
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:31:42 -04:00
kennethreitz ce3ab46d59 Bump version to 3.5.0 and update changelog
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:27:36 -04:00
kennethreitz 6f9c87d71c Fix broad exception handling and future.result() call
- Call future.result() instead of bare property access in test (#596)
- Catch (ValueError, TypeError) instead of broad Exception in
  response model serialization (#597)
- Catch WebSocketDisconnect instead of broad Exception in
  websocket chat example (#598)

Closes #596, closes #597, closes #598

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:25:12 -04:00
kennethreitz 29d0621d98 Replace deprecated asyncio.iscoroutinefunction with inspect.iscoroutinefunction
Closes #599

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:23:24 -04:00
kennethreitz 30fa2dfda7 Add uv.lock
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:22:06 -04:00
kennethreitz 43c803a426 Restore print statements in lifespan example
These serve as illustrative markers for users reading the example.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:21:21 -04:00
Andreas Motl ff6d530338 Chore: Code formatting (#594)
## About
A few cosmetic adjustments aka. code formatting.
Also validate the outcome on CI/GHA.
Feel free to improve now or later at your disposal.

## Details
The updates are based on using the most recent versions of pyproject-fmt
and ruff.
Specifically, spots marked with `noqa` might need further love, also at
your disposal.

---------

Co-authored-by: Kenneth Reitz <me@kennethreitz.org>
2026-03-24 15:21:04 -04:00
kennethreitz a375984310 Fix WSGI mount returning 400 at mount root (#600)
## Summary
- When a WSGI app (e.g. Flask) is mounted at `/prefix` and a request
hits exactly `/prefix`, the path prefix was stripped to `""` instead of
`"/"`, causing frameworks like Flask to return 400.
- One-character fix: default the stripped path to `"/"` when empty.

## Test plan
- [x] `tests/test_responder.py::test_mount_wsgi_app` now passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 15:16:29 -04:00
Andreas Motl 46c6f440c5 Chore: Remove configuration for Read the Docs (#595)
https://responder.kennethreitz.org/ is here to stay.
This patch concludes the decision in GH-564.
2026-03-23 20:19:15 -04:00
Andreas Motl c87e8c876d CI: Validate on Python 3.14 vanilla, free-threaded, and PyPy (#593) 2026-03-23 18:28:27 -04:00
kennethreitz f86c7eed70 Fix RST title underline warning breaking docs CI
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 22:10:51 -04:00
kennethreitz 9d492a383c Add marimo notebook mounting docs and example
- Document mounting marimo ASGI apps in the feature tour
- Add examples/marimo_mount.py showing the integration
- Verified working: marimo.create_asgi_app() mounts cleanly via api.mount()

Fixes #580.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 22:09:01 -04:00
kennethreitz 77ae49aaef Improve GraphQL API interface, expand tests, drop 3.9 from CI
- Extract variables and operationName from query params and form data,
  not just JSON bodies. Fixes #571.
- Add docstrings to GraphQLView class and methods. Fixes #572.
- Add 10 new GraphQL tests: variables, operationName, mutations,
  context access, malformed queries, raw text, invalid variables
  param. Fixes #568.
- Remove Python 3.9 from CI matrix (dropped in 3.4.0).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 22:06:00 -04:00
kennethreitz 74c872ed57 Add type annotations to routes.py
Add comprehensive type hints to compile_path, BaseRoute, Route,
WebSocketRoute, and Router classes. Uses Starlette's Scope, Receive,
Send types and properly types the ASGI/WSGI union in Router.apps.
Fixes #566.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 21:47:21 -04:00
kennethreitz 724b769c9e Fix OpenAPI template packaging, add static file tests
- Include ext/openapi/docs/*.html in package_data so OpenAPI docs
  themes (swagger_ui, redoc, rapidoc, elements) ship with the wheel.
  Fixes #583.
- Add tests for static file serving and index.html fallback. Fixes #563.
- Bump version to 3.4.1.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 21:44:02 -04:00
39 changed files with 1643 additions and 276 deletions
+42
View File
@@ -0,0 +1,42 @@
Release a new version of responder to PyPI and GitHub.
Usage: /release <version> (e.g. /release 3.6.0)
If no version is provided, ask the user what version to release.
## Steps
1. **Verify clean state**: Run `git status` and ensure the working tree is clean. If not, stop and ask the user.
2. **Run tests**: Run `uv run pytest -x --no-header -q`. If any fail, stop and report.
3. **Bump version**: Update `responder/__version__.py` to the new version.
4. **Update changelog**:
- Run `git log --oneline $(git describe --tags --abbrev=0)..HEAD` to get commits since last release.
- Add a new section in `CHANGELOG.md` under `## [Unreleased]` with the date, categorized into Added/Changed/Fixed/Removed.
- Update the compare links at the bottom of the file.
5. **Lock deps**: Run `uv lock`.
6. **Commit**: Stage `responder/__version__.py`, `CHANGELOG.md`, and `uv.lock`. Commit with message `Bump version to X.Y.Z and update changelog`.
7. **Push and tag**:
```
git push
git tag vX.Y.Z
git push origin vX.Y.Z
```
8. **GitHub release**: Create a release with `gh release create` including highlights and a link to the full changelog.
9. **Build and publish**:
```
uv build
uvx twine upload dist/responder-X.Y.Z*
```
Note: This requires a PyPI token. If twine fails due to auth, tell the user to set `TWINE_USERNAME=__token__` and `TWINE_PASSWORD` and re-run, or run `! uvx twine upload dist/responder-X.Y.Z*` interactively.
10. **Update GitHub release**: Edit the release to add a link to the PyPI page: `https://pypi.org/project/responder/X.Y.Z/`
11. **Report**: Print a summary with links to the GitHub release and PyPI page.
+4 -12
View File
@@ -19,24 +19,16 @@ jobs:
documentation:
name: "Documentation"
runs-on: ubuntu-latest
env:
UV_SYSTEM_PYTHON: true
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
python-version: "3.13"
- name: Set up uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
activate-environment: true
enable-cache: true
cache-dependency-glob: |
pyproject.toml
python-version: "3.14"
- name: Install package and documentation dependencies
run: uv pip install '.[docs]'
+8 -13
View File
@@ -20,14 +20,15 @@ jobs:
fail-fast: false
matrix:
python-version: [
"3.9",
"3.10",
"3.11",
"3.12",
"3.13",
"3.14",
"3.15",
"3.15t",
"pypy3.11",
]
env:
UV_SYSTEM_PYTHON: true
steps:
- uses: actions/checkout@v4
@@ -35,19 +36,13 @@ jobs:
with:
node-version: 22
- name: Set up Python
uses: actions/setup-python@v5
- name: Install uv
uses: astral-sh/setup-uv@v7
with:
python-version: ${{ matrix.python-version }}
- name: Set up uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
activate-environment: true
enable-cache: true
cache-suffix: ${{ matrix.python-version }}
cache-dependency-glob: |
pyproject.toml
python-version: ${{ matrix.python-version }}
- name: Install package
run: uv pip install '.[develop,test]'
+1
View File
@@ -8,6 +8,7 @@
.DS_Store
coverage.xml
.coverage*
*.lock
__pycache__
tests/__pycache__
-33
View File
@@ -1,33 +0,0 @@
# .readthedocs.yml
# Read the Docs configuration file
# Details
# - https://docs.readthedocs.io/en/stable/config-file/v2.html
# Required
version: 2
build:
os: "ubuntu-24.04"
tools:
python: "3.12"
python:
install:
- method: pip
path: .
extra_requirements:
- docs
sphinx:
configuration: docs/source/conf.py
# Use standard HTML builder.
builder: html
# Fail on all warnings to avoid broken references.
fail_on_warning: true
# Optionally build your docs in additional formats such as PDF
#formats:
# - pdf
+86 -2
View File
@@ -5,7 +5,87 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [v3.6.0] - 2026-03-24
### Added
- Built-in structured logging with per-request context (`enable_logging=True`)
- `api.log` — always-available logger, enriched with request context when logging is enabled
- Automatic access logging with timing: `GET /path → 200 (1.2ms)`
- Request ID generation/forwarding via `X-Request-ID` header
- `contextvars`-based request context (ID, method, path, client IP) on every log record
- `responder.ext.logging` module: `get_logger()`, `RequestContext`, `RequestContextFilter`
- CLAUDE.md project guide and `/release` command
- Version number in docs sidebar
### Changed
- Comprehensive documentation improvements across all pages
- Deployment: health checks, Docker Compose, Caddy, Procfile, production checklist
- API reference: usage examples for every class
- Feature tour: Pydantic validation, content negotiation, structured logging sections
- Tutorials: modernized SQLAlchemy to `mapped_column()`, fixed deprecated `datetime.utcnow()`,
WebSocket `WebSocketDisconnect` handling, role-based auth, auth strategy guide
- Testing: rate limiting and WSGI mount examples
- Middleware: pure ASGI middleware example
- Quickstart: links to all tutorials
- Sandbox: full rewrite with project layout
- Docker example uses `uv` instead of pip
- Backlog updated: removed implemented features, replaced HTTP/2 server push with dependency injection
### Removed
- `uv.lock` — this is a library, not an application
## [v3.5.0] - 2026-03-24
### Added
- CI validation for Python 3.14, 3.14 free-threaded, and PyPy 3.11
- Marimo notebook mounting docs and example
- Type annotations for `routes.py`
### Changed
- Replaced deprecated `asyncio.iscoroutinefunction` with `inspect.iscoroutinefunction` ahead of Python 3.16 removal
- Narrowed broad `except Exception` to specific exceptions in response model serialization and websocket chat example
- Improved GraphQL API interface with expanded test coverage
- Code formatting cleanup via pyproject-fmt and ruff
- Dropped Python 3.9 from CI
### Fixed
- WSGI mount returning 400 when requesting the exact mount root path
- Werkzeug 3.1.7 compatibility for trusted host validation in tests
- `future.result` bare property access in background task test (now properly calls `future.result()`)
- OpenAPI template packaging and static file serving
- RST title underline warning breaking docs CI
### Removed
- Read the Docs configuration (docs hosted on GitHub Pages)
## [v3.4.0] - 2026-03-22
### Changed
- Upgraded to Starlette 1.0
- Added comprehensive docstrings across the codebase
- Expanded API reference documentation
## [v3.3.0] - 2026-03-22
### Added
- Full documentation rewrite: tutorials for REST APIs, SQLAlchemy, Flask migration
- Auth, WebSocket, middleware, and configuration guides
- Testing docs with prose, examples, and tips
- GitHub Pages deployment for docs
### Changed
- Reworked homepage prose
- Rewrote CLI and API reference docs
## [v3.2.0] - 2026-03-22
@@ -411,7 +491,11 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Conception!
[unreleased]: https://github.com/kennethreitz/responder/compare/v3.0.0..HEAD
[v3.6.0]: https://github.com/kennethreitz/responder/compare/v3.5.0..v3.6.0
[v3.5.0]: https://github.com/kennethreitz/responder/compare/v3.4.0..v3.5.0
[v3.4.0]: https://github.com/kennethreitz/responder/compare/v3.3.0..v3.4.0
[v3.3.0]: https://github.com/kennethreitz/responder/compare/v3.2.0..v3.3.0
[v3.2.0]: https://github.com/kennethreitz/responder/compare/v3.0.0..v3.2.0
[v3.0.0]: https://github.com/kennethreitz/responder/compare/v2.0.5..v3.0.0
[v2.0.5]: https://github.com/kennethreitz/responder/compare/v2.0.4..v2.0.5
[v2.0.4]: https://github.com/kennethreitz/responder/compare/v2.0.3..v2.0.4
+44
View File
@@ -0,0 +1,44 @@
# Responder
A familiar HTTP Service Framework for Python, by Kenneth Reitz.
## Commands
- **Tests**: `uv run pytest` (runs full suite with coverage)
- **Single test**: `uv run pytest tests/test_responder.py::test_name -xvs`
- **Lint**: `uv run ruff check .`
- **Type check**: `uv run mypy`
- **Build docs**: `cd docs && uv run make html`
- **Build package**: `uv build`
- **Lock deps**: `uv lock`
## Architecture
- `responder/api.py` — Main `API` class, the entry point for all apps
- `responder/routes.py``Router`, `Route`, `WebSocketRoute` dispatch
- `responder/models.py``Request` and `Response` wrappers around Starlette
- `responder/ext/` — Extensions: CLI, GraphQL, OpenAPI, rate limiting
- `responder/background.py` — Background task queue
- `responder/formats.py` — Content negotiation (JSON, YAML, msgpack)
- `responder/__version__.py` — Single source of truth for version string
## Conventions
- Python 3.10+ only. Use `from __future__ import annotations` where present.
- Use `inspect.iscoroutinefunction` (not `asyncio.iscoroutinefunction`).
- Tests use `api.requests` (Starlette TestClient) with `allowed_hosts=[";"]` or `["localhost"]`.
- Werkzeug 3.1.7+ rejects invalid Host headers — use `localhost` when mounting WSGI apps in tests.
- Version is in `responder/__version__.py`, bump it there.
- Changelog follows [Keep a Changelog](https://keepachangelog.com/) format in `CHANGELOG.md`.
- Compare links at the bottom of CHANGELOG.md must be updated when adding a release.
- All deps managed via `uv`. Lock file (`uv.lock`) is not committed.
## Release Process
1. Bump version in `responder/__version__.py`
2. Add changelog entry in `CHANGELOG.md` (update compare links too)
3. `uv lock` to refresh the lock file
4. Commit: `Bump version to X.Y.Z and update changelog`
5. `git tag vX.Y.Z && git push && git push origin vX.Y.Z`
6. `gh release create vX.Y.Z --title "vX.Y.Z" --notes "..."`
7. `uv build && uvx twine upload dist/responder-X.Y.Z*`
+2
View File
@@ -5,6 +5,8 @@
</p>
<p>
<strong>Responder</strong> — a familiar HTTP service framework for Python.
<br />
<small>v{{ version }}</small>
</p>
<h3>Useful Links</h3>
<ul>
+87 -4
View File
@@ -12,6 +12,20 @@ The central object of every Responder application. It holds your routes,
middleware, templates, and configuration. Create one at the top of your
module and use it to define your entire web service.
Quick example::
import responder
api = responder.API(
title="My Service", # OpenAPI title
version="1.0", # OpenAPI version
openapi="3.0.2", # enable OpenAPI
docs_route="/docs", # Swagger UI at /docs
cors=True, # enable CORS
secret_key="change-me", # session signing key
allowed_hosts=["example.com"],
)
.. module:: responder
.. autoclass:: API
@@ -28,6 +42,27 @@ parameters, the request body, cookies, and more.
Most properties are synchronous, but reading the body requires ``await``
because it involves I/O.
Common patterns::
# Headers (case-insensitive)
token = req.headers.get("Authorization")
# Query parameters: /search?q=python&page=2
query = req.params["q"]
# JSON body
data = await req.media()
# Form data
form = await req.media("form")
# File uploads
files = await req.media("files")
# Client info
ip, port = req.client
is_https = req.is_secure
.. autoclass:: Request
:inherited-members:
@@ -39,6 +74,19 @@ The response object is passed into every view as the second argument.
Mutate it to control what gets sent back to the client — the body,
status code, headers, and cookies.
Common patterns::
resp.text = "plain text" # text/plain
resp.html = "<h1>Hello</h1>" # text/html
resp.media = {"key": "value"} # application/json
resp.content = b"raw bytes" # application/octet-stream
resp.file("path/to/file.pdf") # auto content-type
resp.stream_file("large/export.csv") # streamed
resp.status_code = 201
resp.headers["X-Custom"] = "value"
resp.cookies["session"] = "abc123"
.. autoclass:: Response
:inherited-members:
@@ -47,7 +95,13 @@ Route Groups
------------
Group related routes under a shared URL prefix — useful for API versioning
and organizing large applications.
and organizing large applications::
v1 = api.group("/v1")
@v1.route("/users")
def list_users(req, resp):
resp.media = []
.. autoclass:: responder.api.RouteGroup
:members:
@@ -57,7 +111,19 @@ Background Queue
----------------
Run tasks in background threads without blocking the response. Available
as ``api.background``.
as ``api.background``::
@api.route("/submit")
async def submit(req, resp):
data = await req.media()
@api.background.task
def process(data):
# runs in a thread pool
...
process(data)
resp.media = {"status": "accepted"}
.. autoclass:: responder.background.BackgroundQueue
:members:
@@ -67,6 +133,8 @@ Query Dict
----------
A dictionary subclass for query string parameters with multi-value support.
Behaves like a normal dict for single values, but supports ``getlist()``
for parameters that appear multiple times (e.g. ``?tag=a&tag=b``).
.. autoclass:: responder.models.QueryDict
:members:
@@ -76,7 +144,15 @@ Rate Limiter
------------
In-memory token bucket rate limiter. Limits requests per client IP address
and returns ``429 Too Many Requests`` when exceeded.
and returns ``429 Too Many Requests`` when exceeded::
from responder.ext.ratelimit import RateLimiter
limiter = RateLimiter(requests=100, period=60) # 100 req/min
limiter.install(api)
Response headers: ``X-RateLimit-Limit``, ``X-RateLimit-Remaining``,
and ``Retry-After`` (when limited).
.. autoclass:: responder.ext.ratelimit.RateLimiter
:members:
@@ -86,7 +162,14 @@ Status Code Helpers
-------------------
Convenience functions for checking which category a status code falls
into. Useful in middleware and after-request hooks.
into. Useful in middleware and after-request hooks::
from responder.status_codes import is_200, is_400, is_500
@api.after_request()
def log_errors(req, resp):
if is_400(resp.status_code) or is_500(resp.status_code):
print(f"Error: {req.method} {req.url.path} -> {resp.status_code}")
.. autofunction:: responder.status_codes.is_100
+5 -4
View File
@@ -1,7 +1,8 @@
# Backlog
## Future Ideas
- Consider adding `after_request` hooks (complement to `before_request`)
- Explore WebSocket before_request short-circuit support
- Add rate limiting middleware
- Consider async template rendering by default
- WebSocket before_request short-circuit support (reject before accept)
- Per-route rate limiting (different limits for different endpoints)
- Built-in structured logging with request context
- OpenAPI 3.1 support
- Dependency injection for route handlers
+19
View File
@@ -71,6 +71,25 @@ For URLs, use a fragment::
$ responder run https://example.com/app.py#service
Environment Variables
---------------------
Responder automatically reads the ``PORT`` environment variable at
runtime:
- ``PORT`` — bind to ``0.0.0.0`` on this port (cloud platform convention)
When ``PORT`` is set, the server binds to all interfaces automatically.
This is how cloud platforms like Fly.io, Railway, and Heroku inject the
listen port.
For other settings like ``SECRET_KEY``, read them in your application
code and pass them to ``responder.API()``::
import os
api = responder.API(secret_key=os.environ["SECRET_KEY"])
Building Frontend Assets
-------------------------
+88 -4
View File
@@ -30,8 +30,9 @@ Here's a minimal Dockerfile::
FROM python:3.13-slim
WORKDIR /app
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
COPY . .
RUN pip install responder
RUN uv pip install --system responder
ENV PORT=80
EXPOSE 80
CMD ["python", "api.py"]
@@ -42,9 +43,10 @@ Build and run::
$ docker run -p 8000:80 myapi
The ``python:3.13-slim`` image is about 150MB — small enough for fast
deploys but includes everything you need. For even smaller images, you
can use ``python:3.13-alpine``, though some packages may need extra
build dependencies.
deploys but includes everything you need. Using ``uv`` for installs
is significantly faster than pip. For even smaller images, you can use
``python:3.13-alpine``, though some packages may need extra build
dependencies.
Cloud Platforms
@@ -67,6 +69,27 @@ The pattern is always the same: deploy your code, set the start command
to ``python api.py``, and the platform handles the rest.
Health Check Endpoint
---------------------
Every production deployment needs a health check — a lightweight endpoint
that monitoring tools, load balancers, and orchestrators can poll to verify
your service is running::
@api.route("/health")
def health(req, resp):
resp.media = {"status": "healthy"}
Keep it simple. Don't query the database or do expensive work — the health
check should return instantly. Cloud platforms, Docker, and Kubernetes all
look for an HTTP 200 to confirm your service is alive.
For Docker, add a ``HEALTHCHECK`` instruction::
HEALTHCHECK --interval=30s --timeout=3s \
CMD curl -f http://localhost/health || exit 1
Uvicorn Directly
----------------
@@ -82,6 +105,45 @@ Uvicorn supports many options — SSL certificates, access logging, graceful
shutdown timeouts, and more. See the
`uvicorn documentation <https://www.uvicorn.org/deployment/>`_ for details.
For platforms like Heroku or Railway that use a ``Procfile``::
web: uvicorn api:api --host 0.0.0.0 --port $PORT --workers 4
Docker Compose
--------------
For local development with databases and other services, Docker Compose
ties everything together::
# docker-compose.yml
services:
api:
build: .
ports:
- "5042:80"
environment:
- PORT=80
- DATABASE_URL=postgresql+asyncpg://user:pass@db/myapp
- SECRET_KEY=dev-secret
depends_on:
- db
db:
image: docker.io/postgres:16-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: myapp
volumes:
- pgdata:/var/lib/postgresql/data
volumes:
pgdata:
Run with ``docker compose up``. The API waits for ``db`` to start, then
connects using the ``DATABASE_URL`` environment variable.
Reverse Proxy
-------------
@@ -95,9 +157,31 @@ front of your application for:
- **Static asset serving** — offload static files to the proxy
- **Rate limiting** — at the infrastructure level
A minimal Caddy config that handles HTTPS automatically::
# Caddyfile
example.com {
reverse_proxy localhost:5042
}
Responder's ``TrustedHostMiddleware`` and ``HTTPSRedirectMiddleware`` work
correctly behind proxies that set standard forwarding headers
(``X-Forwarded-For``, ``X-Forwarded-Proto``).
That said, uvicorn is production-ready on its own. Many applications run
uvicorn directly without a reverse proxy and do just fine.
Production Checklist
--------------------
Before going live:
- **Set a secret key**``SECRET_KEY`` env var, never the default
- **Disable debug mode**``DEBUG=false`` or omit it entirely
- **Set allowed hosts** — restrict to your actual domain names
- **Use multiple workers**``--workers 4`` or more, depending on CPU cores
- **Add a health check**``/health`` endpoint for monitoring
- **Enable HTTPS** — via your proxy, cloud platform, or uvicorn's ``--ssl-*`` flags
- **Set up logging** — uvicorn logs requests by default; pipe them to your log aggregator
- **Pin your dependencies** — use a lock file or pinned requirements for reproducible deploys
+7 -2
View File
@@ -59,21 +59,26 @@ What You Get
One ``pip install``, batteries included:
- Pydantic request validation and response serialization.
- Mount Flask, Django, or any WSGI/ASGI app at a subroute.
- Gzip compression, HSTS, CORS, and trusted host validation.
- Before-request hooks that can short-circuit for auth guards.
- Before-request and after-request hooks for auth and logging.
- A test client for fast, in-process testing with pytest.
- Route parameters with f-string syntax and type convertors.
- Lifespan context managers for startup and shutdown logic.
- Custom exception handlers for clean error responses.
- `GraphQL`_ with Graphene and a built-in GraphiQL IDE.
- Server-Sent Events for real-time streaming.
- File serving with automatic content-type detection.
- Sync and async views — ``async`` is always optional.
- Class-based views with ``on_get``, ``on_post``, ``on_request``.
- Built-in rate limiting with ``X-RateLimit`` headers.
- Structured logging with per-request context.
- Content negotiation: JSON, YAML, and MessagePack.
- A pleasant API with a single import statement.
- OpenAPI schema generation with Swagger UI.
- A production `uvicorn`_ server, ready to deploy.
- HTTP method filtering for REST APIs.
- Route groups for API versioning.
- Signed cookie-based sessions.
- Background tasks in a thread pool.
- WebSocket support.
+6
View File
@@ -376,3 +376,9 @@ jump into the tutorials:
- :doc:`tutorial-rest` — build a full CRUD API with validation
- :doc:`tutorial-sqlalchemy` — connect to a database
- :doc:`tutorial-auth` — add authentication
- :doc:`tutorial-websockets` — real-time communication
- :doc:`tutorial-middleware` — hooks and middleware
- :doc:`tutorial-flask` — migrating from Flask
- :doc:`guide-config` — environment variables and secrets
- :doc:`deployment` — Docker, cloud platforms, and production
- :doc:`testing` — writing tests with pytest
+43 -17
View File
@@ -2,35 +2,61 @@
# Development Sandbox
## Setup
Set up a development sandbox.
Acquire sources and create virtualenv.
Clone the repo and install all dependencies:
```shell
git clone https://github.com/kennethreitz/responder.git
cd responder
uv venv
```
Install project in editable mode, including
all development tools.
```shell
uv venv && source .venv/bin/activate
uv pip install --upgrade --editable '.[develop,docs,release,test]'
```
## Operations
Run tests.
## Running Tests
```shell
source .venv/bin/activate
pytest
pytest # full suite with coverage
pytest tests/test_responder.py -xvs # single file, stop on first failure
pytest -k "test_mount" # run tests matching a pattern
```
Format code.
## Code Formatting
```shell
ruff format .
ruff check --fix .
ruff format . # auto-format
ruff check --fix . # lint and auto-fix
```
Documentation authoring.
## Type Checking
```shell
sphinx-autobuild --open-browser --watch docs/source docs/source docs/build
mypy
```
## Documentation
Live-reloading doc server (opens in browser):
```shell
cd docs
sphinx-autobuild --open-browser --watch source source build
```
Or build once:
```shell
cd docs
make html
# open build/html/index.html
```
## Project Layout
```
responder/
├── responder/ # main package
│ ├── api.py # API class — the entry point
│ ├── routes.py # Router, Route, WebSocketRoute
│ ├── models.py # Request and Response wrappers
│ ├── ext/ # extensions (CLI, GraphQL, OpenAPI, rate limiting)
│ ├── background.py # background task queue
│ └── formats.py # content negotiation (JSON, YAML, msgpack)
├── tests/ # pytest test suite
├── examples/ # runnable example apps
├── docs/source/ # Sphinx documentation
└── pyproject.toml # project metadata and tool config
```
+54
View File
@@ -269,6 +269,57 @@ just like in production. You can verify their effects on the response::
assert r.headers["X-Served-By"] == "responder"
Testing Rate Limiting
---------------------
Rate limiters are just hooks — they run automatically during tests.
Verify the headers and the 429 response::
from responder.ext.ratelimit import RateLimiter
def test_rate_limiting():
api = responder.API(allowed_hosts=["localhost"])
limiter = RateLimiter(requests=2, period=60)
limiter.install(api)
@api.route("/")
def view(req, resp):
resp.text = "ok"
# First two requests succeed
for _ in range(2):
r = api.requests.get("http://localhost/")
assert r.status_code == 200
assert "X-RateLimit-Remaining" in r.headers
# Third request is rate limited
r = api.requests.get("http://localhost/")
assert r.status_code == 429
Testing Mounted Apps
--------------------
When testing WSGI apps mounted at a subroute, use ``localhost`` as the
host to avoid Werkzeug's trusted host validation::
from flask import Flask
def test_flask_mount():
api = responder.API(allowed_hosts=["localhost"])
flask_app = Flask(__name__)
@flask_app.route("/")
def hello():
return "Hello from Flask!"
api.mount("/flask", flask_app)
r = api.requests.get("http://localhost/flask")
assert r.status_code == 200
assert "Hello from Flask" in r.text
Tips
----
@@ -284,3 +335,6 @@ Tips
- **Test the contract, not the implementation.** Assert on status codes,
response bodies, and headers — not on internal state.
- **Use ``localhost`` for mounted WSGI apps.** Werkzeug 3.1.7+ validates
the ``Host`` header, so avoid synthetic hosts like ``;`` in tests.
+143 -2
View File
@@ -416,6 +416,22 @@ Requests to ``/flask/`` will be handled by Flask. Everything else goes
through Responder. Both WSGI and ASGI apps are supported — Responder
wraps WSGI apps in an ASGI adapter automatically.
You can also mount `marimo <https://marimo.io/>`_ notebooks as
interactive dashboards within your API::
import marimo
server = (
marimo.create_asgi_app()
.with_app(path="", root="./notebooks/dashboard.py")
.with_app(path="/analysis", root="./notebooks/analysis.py")
)
api.mount("/notebooks", server.build())
Notebooks are served at ``/notebooks/`` and ``/notebooks/analysis``,
with full interactivity — reactive cells, widgets, plots, and all.
Cookies
-------
@@ -580,6 +596,127 @@ can pace themselves.
The rate limiter is per-client, keyed by IP address.
Structured Logging
------------------
Production applications need structured, searchable logs. Responder
includes built-in logging that automatically attaches request context
— request ID, HTTP method, path, and client IP — to every log message
emitted during request handling::
api = responder.API(enable_logging=True)
This gives you:
- **Access logging** with timing for every request::
2026-03-24 12:00:00 [INFO] responder.access — GET /users → 200 (1.2ms)
- **A logger on the API instance** — use ``api.log`` anywhere in
your routes. Request context (ID, method, path, client IP) is
attached automatically::
@api.route("/users/{user_id:int}")
def get_user(req, resp, *, user_id):
api.log.info("fetching user %d", user_id)
# => [INFO] responder.app -- fetching user 42 [GET /users/42] [req:a1b2c3] [client:10.0.0.1]
resp.media = {"id": user_id}
- **Request IDs** generated automatically (or forwarded from the
``X-Request-ID`` header) and included in responses.
The logging uses Python's standard ``logging`` module, so it works with
any handler — files, syslog, JSON formatters, Datadog, Sentry, whatever
you already use.
For additional loggers (e.g. in helper modules), use ``get_logger``::
from responder.ext.logging import get_logger
logger = get_logger("myapp.db")
You can also access the current request context directly::
from responder.ext.logging import RequestContext
@api.route("/debug")
def debug(req, resp):
resp.media = {
"request_id": RequestContext.get_request_id(),
"client_ip": RequestContext.get_client_ip(),
}
When ``enable_logging=True`` is set, it supersedes ``request_id=True``
— the logging middleware handles request IDs itself, so you don't get
duplicate headers.
Pydantic Validation
-------------------
`Pydantic <https://docs.pydantic.dev/>`_ models integrate directly with
Responder's routing. Set ``request_model`` to validate incoming data and
``response_model`` to control the shape of outgoing data::
from pydantic import BaseModel
class ItemIn(BaseModel):
name: str
price: float
class ItemOut(BaseModel):
id: int
name: str
price: float
@api.route("/items", methods=["POST"],
request_model=ItemIn, response_model=ItemOut)
async def create_item(req, resp):
data = await req.media()
resp.media = {"id": 1, **data}
When ``request_model`` is set:
- Valid requests are parsed and the data is available via ``await req.media()``
- Invalid requests get an automatic ``422 Unprocessable Entity`` response
with detailed error messages — you don't write any validation code
When ``response_model`` is set:
- The response is serialized through the model before being sent
- Extra fields are stripped automatically
- Type coercion happens at the boundary
This is the recommended way to build validated REST APIs with Responder.
See the :doc:`tutorial-rest` for a complete walkthrough.
Content Negotiation
-------------------
Responder automatically negotiates the response format based on the
client's ``Accept`` header. Set ``resp.media`` to a Python object and
the right thing happens:
- ``Accept: application/json`` (default) → JSON
- ``Accept: application/x-yaml`` → YAML
- ``Accept: application/x-msgpack`` → MessagePack
This means a single endpoint serves multiple formats without any
conditional logic in your code::
@api.route("/data")
def data(req, resp):
resp.media = {"key": "value"}
Clients get the format they ask for::
$ curl http://localhost:5042/data
{"key": "value"}
$ curl -H "Accept: application/x-yaml" http://localhost:5042/data
key: value
MessagePack
-----------
@@ -592,6 +729,10 @@ Responder supports MessagePack alongside JSON and YAML::
# Decode a MessagePack request body
data = await req.media("msgpack")
Content negotiation works too — clients can send
# Respond with MessagePack
resp.media = {"result": [1, 2, 3]}
Content negotiation works automatically — clients can send
``Accept: application/x-msgpack`` to receive MessagePack responses
instead of JSON.
instead of JSON. You can also explicitly decode MessagePack request
bodies by passing ``"msgpack"`` to ``req.media()``.
+55 -2
View File
@@ -46,14 +46,14 @@ Install PyJWT::
Create a helper to encode and decode tokens::
import jwt
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
SECRET = "your-secret-key"
def create_token(user_id: int) -> str:
payload = {
"sub": user_id,
"exp": datetime.utcnow() + timedelta(hours=24),
"exp": datetime.now(timezone.utc) + timedelta(hours=24),
}
return jwt.encode(payload, SECRET, algorithm="HS256")
@@ -189,3 +189,56 @@ Remember to set a proper secret key::
The session data is signed (not encrypted) — users can read it but
can't tamper with it. Don't store sensitive data like passwords in
sessions.
Role-Based Access Control
--------------------------
For APIs where different users have different permissions, embed the
role in the token and check it in route-specific guards::
def create_token(user_id: int, role: str = "user") -> str:
payload = {
"sub": user_id,
"role": role,
"exp": datetime.now(timezone.utc) + timedelta(hours=24),
}
return jwt.encode(payload, SECRET, algorithm="HS256")
Create a helper that checks for a specific role::
def require_role(*roles):
"""Before-request hook factory that restricts by role."""
def check(req, resp):
user_role = getattr(req.state, "role", None)
if user_role not in roles:
resp.status_code = 403
resp.media = {"error": "Insufficient permissions"}
return check
Use it on specific routes::
@api.route("/admin/users", before_request=require_role("admin"))
def list_all_users(req, resp):
resp.media = {"users": [...]}
And store the role during token verification::
# In your auth_guard:
req.state.user_id = payload["sub"]
req.state.role = payload.get("role", "user")
Choosing an Auth Strategy
--------------------------
- **API keys** — simplest. Good for server-to-server, CLI tools, and
internal services. No expiration unless you build it.
- **JWT tokens** — standard for SPAs and mobile apps. Stateless, so
they scale well. Downside: you can't revoke them without a blocklist.
- **Sessions** — best for traditional web apps with HTML forms. The
browser manages cookies automatically. Stateful — the server controls
the session lifecycle.
Start with API keys for internal tools, JWT for public APIs, and
sessions for web apps with login pages.
+42 -1
View File
@@ -1,5 +1,5 @@
Writing Middleware
=================
==================
Middleware sits between the server and your route handlers, processing
every request and response that flows through your application. It's the
@@ -117,6 +117,47 @@ the existing stack. Keep this in mind for ordering dependencies — if
middleware A depends on middleware B having run first, add B before A.
Writing Pure ASGI Middleware
----------------------------
For maximum performance and control, you can write middleware as a plain
ASGI application. This bypasses Starlette's ``BaseHTTPMiddleware``
abstraction — it's faster and gives you direct access to the ASGI
protocol::
class SecurityHeadersMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
async def send_with_headers(message):
if message["type"] == "http.response.start":
extra = [
(b"x-content-type-options", b"nosniff"),
(b"x-frame-options", b"DENY"),
(b"referrer-policy", b"strict-origin-when-cross-origin"),
]
message["headers"] = list(message["headers"]) + extra
await send(message)
await self.app(scope, receive, send_with_headers)
api.add_middleware(SecurityHeadersMiddleware)
This is the same pattern used internally by Starlette and uvicorn. The
middleware receives the ASGI ``scope``, ``receive``, and ``send`` callables,
and wraps ``send`` to inject headers into the response.
For most cases, ``BaseHTTPMiddleware`` is simpler and perfectly fine.
Use the pure ASGI approach when you need to handle WebSocket connections,
streaming responses, or want to avoid the overhead of request/response
object creation.
When to Use What
-----------------
+11 -10
View File
@@ -28,8 +28,8 @@ SQLAlchemy models map Python classes to database tables. Each attribute
becomes a column::
# models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
@@ -37,15 +37,16 @@ becomes a column::
class Book(Base):
__tablename__ = "books"
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String, nullable=False)
author = Column(String, nullable=False)
year = Column(Integer, nullable=False)
isbn = Column(String, nullable=True)
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String, nullable=False)
author: Mapped[str] = mapped_column(String, nullable=False)
year: Mapped[int] = mapped_column(nullable=False)
isbn: Mapped[str | None] = mapped_column(String, nullable=True)
``DeclarativeBase`` is SQLAlchemy's modern base class (SQLAlchemy 2.0+).
Each model class corresponds to a table, and each ``Column`` corresponds
to a column in that table.
This uses SQLAlchemy 2.0's ``Mapped`` type annotations and
``mapped_column()``, which give you type checker support and cleaner
syntax than the older ``Column()`` style. Each model class corresponds
to a table, and each ``mapped_column()`` corresponds to a column.
Database Setup
+50 -2
View File
@@ -58,6 +58,8 @@ A chat room needs to broadcast messages to all connected clients. We keep
a set of active connections and iterate through them when someone sends
a message::
from starlette.websockets import WebSocketDisconnect
connected = set()
@api.route("/chat", websocket=True)
@@ -70,13 +72,15 @@ a message::
# Broadcast to all connected clients
for client in connected:
await client.send_text(message)
except Exception:
except WebSocketDisconnect:
pass
finally:
connected.discard(ws)
The ``try/finally`` block ensures we remove disconnected clients from
the set, even if the connection drops unexpectedly.
the set, even if the connection drops unexpectedly. Catching
``WebSocketDisconnect`` specifically (rather than bare ``Exception``)
makes the intent clear and avoids swallowing real bugs.
Data Formats
@@ -154,6 +158,40 @@ WebSocket before-request hooks receive the ``ws`` object and must call
``await ws.accept()`` if they want the connection to proceed.
Connection Lifecycle
--------------------
WebSocket connections go through several states:
1. **Connecting** — the client sends an upgrade request
2. **Open** — after ``await ws.accept()``, both sides can send messages
3. **Closing** — either side initiates a close handshake
4. **Closed** — the connection is fully terminated
When a client disconnects (closes the tab, loses network), the next
``await ws.receive_text()`` raises ``WebSocketDisconnect``. Always
handle this — otherwise your server accumulates dead connections::
from starlette.websockets import WebSocketDisconnect
@api.route("/ws", websocket=True)
async def handler(ws):
await ws.accept()
try:
while True:
data = await ws.receive_text()
await ws.send_text(f"Got: {data}")
except WebSocketDisconnect:
print("Client disconnected")
You can also close connections from the server side::
await ws.close(code=1000) # 1000 = normal closure
Common close codes: ``1000`` (normal), ``1001`` (going away),
``1008`` (policy violation), ``1011`` (server error).
Testing WebSockets
------------------
@@ -169,3 +207,13 @@ Use Starlette's ``TestClient`` for WebSocket tests::
The ``websocket_connect`` context manager handles the connection
lifecycle — it connects on enter and disconnects on exit.
You can also test that connections are properly rejected::
from starlette.websockets import WebSocketDisconnect
def test_websocket_404():
client = TestClient(api)
with pytest.raises(WebSocketDisconnect):
with client.websocket_connect("/nonexistent"):
pass
+31
View File
@@ -0,0 +1,31 @@
"""Mount marimo notebooks inside a Responder API.
Requirements:
pip install responder marimo
Usage:
python examples/marimo_mount.py
Then visit:
http://127.0.0.1:5042/hello → Responder JSON endpoint
http://127.0.0.1:5042/notebooks/ → Interactive marimo notebook
"""
import marimo
import responder
api = responder.API()
@api.route("/hello")
def hello(req, resp):
resp.media = {"message": "Hello from Responder!"}
# Mount marimo notebooks at /notebooks
server = marimo.create_asgi_app().with_app(path="", root="notebooks/hello.py")
api.mount("/notebooks", server.build())
if __name__ == "__main__":
api.run()
+9 -3
View File
@@ -1,8 +1,9 @@
# Complete REST API example with Pydantic validation.
# https://responder.kennethreitz.org/tutorial-rest.html
import responder
from pydantic import BaseModel
import responder
class BookIn(BaseModel):
title: str
@@ -35,8 +36,13 @@ def list_books(req, resp):
resp.media = list(books_db.values())
@api.route("/books", methods=["POST"], check_existing=False,
request_model=BookIn, response_model=BookOut)
@api.route(
"/books",
methods=["POST"],
check_existing=False,
request_model=BookIn,
response_model=BookOut,
)
async def create_book(req, resp):
global next_id
data = await req.media()
+4 -2
View File
@@ -1,5 +1,7 @@
# WebSocket chat room example.
# https://responder.kennethreitz.org/tutorial-websockets.html
from starlette.websockets import WebSocketDisconnect
import responder
api = responder.API()
@@ -35,7 +37,7 @@ def index(req, resp):
</script>
</body>
</html>
"""
""" # noqa: E501
@api.route("/chat", websocket=True)
@@ -47,7 +49,7 @@ async def chat(ws):
message = await ws.receive_text()
for client in connected:
await client.send_text(message)
except Exception:
except WebSocketDisconnect:
pass
finally:
connected.discard(ws)
+57 -72
View File
@@ -8,7 +8,7 @@ requires = [
name = "responder"
description = "A familiar HTTP Service Framework for Python."
readme = "README.md"
license = {text = "Apache 2.0"}
license = { text = "Apache 2.0" }
authors = [
{ name = "Kenneth Reitz", email = "me@kennethreitz.org" },
]
@@ -20,18 +20,22 @@ classifiers = [
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Programming Language :: Python :: 3.15",
"Programming Language :: Python :: Free Threading",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Internet :: WWW/HTTP",
]
dynamic = ["version"]
dynamic = [ "version" ]
dependencies = [
"a2wsgi",
"apispec>=1.0.0",
"apispec>=1",
"chardet",
"docopt-ng",
"graphene>=3",
@@ -41,17 +45,15 @@ dependencies = [
"pueblo[sfa-full]>=0.0.11",
"pydantic>=2",
"python-multipart",
"starlette[full]>=1.0",
"starlette[full]>=1",
"uvicorn[standard]",
]
[project.optional-dependencies]
develop = [
optional-dependencies.develop = [
"pyproject-fmt",
"ruff",
"validate-pyproject",
]
docs = [
optional-dependencies.docs = [
"alabaster<1.1",
"myst-parser",
"sphinx>=5,<9",
@@ -59,8 +61,8 @@ docs = [
"sphinx-copybutton",
"sphinx-design-elements",
]
release = ["build", "twine"]
test = [
optional-dependencies.release = [ "build", "twine" ]
optional-dependencies.test = [
"flask",
"mypy",
"pytest",
@@ -68,32 +70,22 @@ test = [
"pytest-mock",
"pytest-rerunfailures",
]
urls.Documentation = "https://responder.kennethreitz.org"
urls.Homepage = "https://github.com/kennethreitz/responder"
urls.Issues = "https://github.com/kennethreitz/responder/issues"
urls.Repository = "https://github.com/kennethreitz/responder"
scripts.responder = "responder.ext.cli:cli"
[project.scripts]
responder = "responder.ext.cli:cli"
[project.urls]
Homepage = "https://github.com/kennethreitz/responder"
Documentation = "https://responder.kennethreitz.org"
Repository = "https://github.com/kennethreitz/responder"
Issues = "https://github.com/kennethreitz/responder/issues"
[tool.setuptools.dynamic]
version = {attr = "responder.__version__.__version__"}
[tool.setuptools.package-data]
responder = ["py.typed"]
[tool.setuptools.packages.find]
exclude = ["tests"]
[tool.setuptools]
dynamic.version = { attr = "responder.__version__.__version__" }
package-data.responder = [ "py.typed", "ext/openapi/docs/*.html" ]
packages.find.exclude = [ "tests" ]
[tool.ruff]
line-length = 90
extend-exclude = [
"docs/source/conf.py",
]
lint.select = [
# Builtins
"A",
@@ -121,59 +113,20 @@ lint.select = [
# flake8-2020
"YTT",
]
lint.extend-ignore = [
"S101", # Allow use of `assert`.
"S101", # Allow use of `assert`.
]
lint.per-file-ignores."responder/util/cmd.py" = [ "A005" ] # Module shadows a Python standard-library module
lint.per-file-ignores."responder/util/cmd.py" = [ "A005" ] # Module shadows a Python standard-library module
lint.per-file-ignores."tests/*" = [
"ERA001", # Found commented-out code.
"S101", # Allow use of `assert`, and `print`.
]
[tool.pytest.ini_options]
addopts = """
-rfEXs -p pytester --strict-markers --verbosity=3
--cov --cov-report=term-missing --cov-report=xml
"""
filterwarnings = [
"error::UserWarning",
]
log_level = "DEBUG"
log_cli_level = "DEBUG"
log_format = "%(asctime)-15s [%(name)-36s] %(levelname)-8s: %(message)s"
minversion = "2.0"
testpaths = [
"responder",
"tests",
]
markers = [
]
xfail_strict = true
[tool.coverage.run]
branch = false
omit = [
"*.html",
"tests/*",
]
[tool.coverage.report]
fail_under = 0
show_missing = true
exclude_lines = [
"# pragma: no cover",
"raise NotImplemented",
]
[tool.mypy]
packages = [
"responder",
]
exclude = [
]
exclude = []
check_untyped_defs = true
explicit_package_bases = true
ignore_missing_imports = true
@@ -181,3 +134,35 @@ implicit_optional = true
install_types = true
namespace_packages = true
non_interactive = true
[tool.pytest]
ini_options.addopts = """
-rfEXs -p pytester --strict-markers --verbosity=3
--cov --cov-report=term-missing --cov-report=xml
"""
ini_options.filterwarnings = [
"error::UserWarning",
]
ini_options.log_level = "DEBUG"
ini_options.log_cli_level = "DEBUG"
ini_options.log_format = "%(asctime)-15s [%(name)-36s] %(levelname)-8s: %(message)s"
ini_options.minversion = "2.0"
ini_options.testpaths = [
"responder",
"tests",
]
ini_options.markers = []
ini_options.xfail_strict = true
[tool.coverage]
run.branch = false
run.omit = [
"*.html",
"tests/*",
]
report.exclude_lines = [
"# pragma: no cover",
"raise NotImplemented",
]
report.fail_under = 0
report.show_missing = true
+1 -1
View File
@@ -1 +1 @@
__version__ = "3.4.0"
__version__ = "3.6.0"
+21 -6
View File
@@ -1,4 +1,5 @@
import asyncio
import inspect
import os
from pathlib import Path
@@ -60,6 +61,7 @@ class API:
openapi_theme=DEFAULT_OPENAPI_THEME,
lifespan=None,
request_id=False,
enable_logging=False,
):
"""Create a new Responder API instance.
@@ -85,6 +87,7 @@ class API:
:param openapi_theme: Documentation UI theme: ``"swagger_ui"``, ``"redoc"``, ``"rapidoc"``, or ``"elements"``.
:param lifespan: An async context manager for startup/shutdown logic.
:param request_id: If ``True``, add ``X-Request-ID`` headers to all responses.
:param enable_logging: If ``True``, enable structured logging with per-request context (request ID, method, path, client IP).
""" # noqa: E501
self.background = BackgroundQueue()
@@ -157,17 +160,29 @@ class API:
self.templates = Templates(directory=templates_dir)
if request_id:
if request_id and not enable_logging:
import uuid as _uuid
def _add_request_id(req, resp):
rid = req.headers.get(
"X-Request-ID", str(_uuid.uuid4())
)
rid = req.headers.get("X-Request-ID", str(_uuid.uuid4()))
resp.headers["X-Request-ID"] = rid
self.router.after_request(_add_request_id)
if enable_logging:
import logging as _logging
from .ext.logging import LoggingMiddleware, get_logger, setup_logging
log_level = _logging.DEBUG if debug else _logging.INFO
setup_logging(level=log_level)
self.add_middleware(LoggingMiddleware)
self.log = get_logger("responder.app")
else:
import logging as _logging
self.log = _logging.getLogger("responder.app")
@property
def requests(self):
"""A test client connected to the ASGI app. Lazily initialized."""
@@ -258,7 +273,7 @@ class API:
req = Request(request.scope, request.receive, formats=get_formats())
resp = Response(req=req, formats=get_formats())
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
await func(req, resp, exc)
else:
func(req, resp, exc)
@@ -562,7 +577,7 @@ class API:
"""Run the application. Shorthand for :meth:`serve` that inherits the ``debug`` setting.
:param kwargs: Keyword arguments passed through to :meth:`serve`.
"""
""" # noqa: E501
if "debug" not in kwargs:
kwargs.update({"debug": self.debug})
self.serve(**kwargs)
+2 -1
View File
@@ -1,5 +1,6 @@
import asyncio
import concurrent.futures
import inspect
import multiprocessing
import traceback
@@ -76,6 +77,6 @@ class BackgroundQueue:
return do_task
async def __call__(self, func, *args, **kwargs) -> None:
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
return await asyncio.create_task(func(*args, **kwargs))
return await run_in_threadpool(func, *args, **kwargs)
+47 -2
View File
@@ -4,12 +4,43 @@ from .templates import GRAPHIQL
class GraphQLView:
"""A class-based view that serves a GraphQL API.
Handles query resolution from multiple sources (JSON body, query
parameters, raw request text) and renders the GraphiQL IDE for
browser requests.
:param api: The Responder API instance.
:param schema: A Graphene schema instance.
"""
def __init__(self, *, api, schema):
self.api = api
self.schema = schema
@staticmethod
def _parse_variables(raw):
"""Parse variables from a string (query param) or return as-is (dict)."""
if raw is None:
return None
if isinstance(raw, str):
try:
return json.loads(raw)
except (json.JSONDecodeError, ValueError):
return None
return raw
@staticmethod
async def _resolve_graphql_query(req, resp):
"""Extract query, variables, and operationName from the request.
Supports multiple input sources, checked in order:
1. JSON body (``Content-Type: application/json``)
2. Form data (``Content-Type: application/x-www-form-urlencoded``)
3. Query parameters (``?query=...&variables=...&operationName=...``)
4. Raw request text
"""
if "json" in req.mimetype:
json_media = await req.media("json")
if "query" not in json_media:
@@ -22,9 +53,22 @@ class GraphQLView:
json_media.get("operationName"),
)
# Support query/q in params.
if "form" in req.mimetype:
form_data = await req.media("form")
if "query" in form_data:
return (
form_data["query"],
GraphQLView._parse_variables(form_data.get("variables")),
form_data.get("operationName"),
)
# Support query/variables/operationName in query params.
if "query" in req.params:
return req.params["query"], None, None
return (
req.params["query"],
GraphQLView._parse_variables(req.params.get("variables")),
req.params.get("operationName"),
)
if "q" in req.params:
return req.params["q"], None, None
@@ -32,6 +76,7 @@ class GraphQLView:
return await req.text, None, None
async def graphql_response(self, req, resp):
"""Process a GraphQL request and populate the response."""
show_graphiql = req.method == "get" and req.accepts("text/html")
if show_graphiql:
+181
View File
@@ -0,0 +1,181 @@
"""Structured logging with per-request context.
Provides a logging setup that automatically includes request metadata
(request ID, method, path, client IP) in every log message emitted
during request handling.
Usage::
api = responder.API(enable_logging=True)
# In any route or middleware:
from responder.ext.logging import get_logger
logger = get_logger(__name__)
@api.route("/")
def index(req, resp):
logger.info("handling request")
# => 2026-03-24 12:00:00 [INFO] app — handling request [GET /] [req:abc123] [client:127.0.0.1]
"""
from __future__ import annotations
import logging
import time
import uuid
from contextvars import ContextVar
__all__ = ["get_logger", "RequestContext", "RequestContextFilter", "LoggingMiddleware"]
# Context variables for per-request metadata.
_request_id: ContextVar[str] = ContextVar("request_id", default="-")
_request_method: ContextVar[str] = ContextVar("request_method", default="-")
_request_path: ContextVar[str] = ContextVar("request_path", default="-")
_client_ip: ContextVar[str] = ContextVar("client_ip", default="-")
class RequestContext:
"""Read-only access to the current request's logging context."""
@staticmethod
def get_request_id() -> str:
return _request_id.get()
@staticmethod
def get_method() -> str:
return _request_method.get()
@staticmethod
def get_path() -> str:
return _request_path.get()
@staticmethod
def get_client_ip() -> str:
return _client_ip.get()
class RequestContextFilter(logging.Filter):
"""A logging filter that injects request context into log records.
Adds ``request_id``, ``request_method``, ``request_path``, and
``client_ip`` attributes to every log record, so they can be used
in format strings.
"""
def filter(self, record: logging.LogRecord) -> bool:
record.request_id = _request_id.get() # type: ignore[attr-defined]
record.request_method = _request_method.get() # type: ignore[attr-defined]
record.request_path = _request_path.get() # type: ignore[attr-defined]
record.client_ip = _client_ip.get() # type: ignore[attr-defined]
return True
# Default format that includes request context.
DEFAULT_LOG_FORMAT = (
"%(asctime)s [%(levelname)s] %(name)s -- %(message)s "
"[%(request_method)s %(request_path)s] "
"[req:%(request_id)s] [client:%(client_ip)s]"
)
def get_logger(name: str | None = None) -> logging.Logger:
"""Get a logger with the request context filter attached.
:param name: Logger name (typically ``__name__``).
:returns: A :class:`logging.Logger` with request context available.
"""
logger = logging.getLogger(name)
# Avoid adding duplicate filters.
if not any(isinstance(f, RequestContextFilter) for f in logger.filters):
logger.addFilter(RequestContextFilter())
return logger
def setup_logging(level: int = logging.INFO) -> None:
"""Configure the root logger with request-context-aware formatting.
:param level: The logging level (default ``INFO``).
"""
root = logging.getLogger()
root.setLevel(level)
# Only add our handler if the root logger has no handlers yet,
# or if none of them use our filter.
has_context_handler = any(
any(isinstance(f, RequestContextFilter) for f in h.filters)
for h in root.handlers
)
if not has_context_handler:
handler = logging.StreamHandler()
handler.setLevel(level)
handler.addFilter(RequestContextFilter())
handler.setFormatter(logging.Formatter(DEFAULT_LOG_FORMAT))
root.addHandler(handler)
class LoggingMiddleware:
"""ASGI middleware that sets per-request context variables.
For each HTTP request, this middleware:
1. Extracts or generates a request ID
2. Sets context variables for method, path, and client IP
3. Logs the request and response with timing information
"""
def __init__(self, app, logger_name: str = "responder.access"):
self.app = app
self.logger = get_logger(logger_name)
async def __call__(self, scope, receive, send):
if scope["type"] not in ("http", "websocket"):
await self.app(scope, receive, send)
return
# Extract request metadata.
headers = dict(scope.get("headers", []))
request_id = (
headers.get(b"x-request-id", b"").decode() or uuid.uuid4().hex[:8]
)
method = scope.get("method", "WS")
path = scope.get("path", "/")
client = scope.get("client")
client_ip = client[0] if client else "-"
# Set context variables for the duration of this request.
tok_id = _request_id.set(request_id)
tok_method = _request_method.set(method)
tok_path = _request_path.set(path)
tok_ip = _client_ip.set(client_ip)
# Track response status.
status_code = None
async def send_wrapper(message):
nonlocal status_code
if message["type"] == "http.response.start":
status_code = message.get("status")
# Inject request ID into response headers.
headers_list = list(message.get("headers", []))
headers_list.append((b"x-request-id", request_id.encode()))
message = {**message, "headers": headers_list}
await send(message)
start = time.perf_counter()
try:
await self.app(scope, receive, send_wrapper)
finally:
duration_ms = (time.perf_counter() - start) * 1000
if scope["type"] == "http":
self.logger.info(
"%s %s%s (%.1fms)",
method,
path,
status_code or "?",
duration_ms,
)
# Reset context variables.
_request_id.reset(tok_id)
_request_method.reset(tok_method)
_request_path.reset(tok_path)
_client_ip.reset(tok_ip)
+3 -1
View File
@@ -129,7 +129,9 @@ class OpenAPISchema:
op["requestBody"] = {
"content": {
"application/json": {
"schema": {"$ref": f"#/components/schemas/{model_name}"}
"schema": {
"$ref": f"#/components/schemas/{model_name}"
}
}
}
}
+1 -3
View File
@@ -38,9 +38,7 @@ class RateLimiter:
def _cleanup(self, key):
now = time.time()
cutoff = now - self.period
self._buckets[key] = [
t for t in self._buckets[key] if t > cutoff
]
self._buckets[key] = [t for t in self._buckets[key] if t > cutoff]
def check(self, req, resp):
"""Check rate limit. Sets 429 status if exceeded."""
+81 -56
View File
@@ -1,14 +1,18 @@
from __future__ import annotations
import asyncio
import inspect
import re
import traceback
from collections import defaultdict
from collections.abc import Callable
from typing import Any, Union
__all__ = ["Route", "WebSocketRoute", "Router"]
from starlette.concurrency import run_in_threadpool
from starlette.exceptions import HTTPException
from starlette.types import ASGIApp
from starlette.types import ASGIApp, Receive, Scope, Send
from starlette.websockets import WebSocket, WebSocketClose
from . import status_codes
@@ -28,9 +32,9 @@ _CONVERTORS = {
PARAM_RE = re.compile("{([a-zA-Z_][a-zA-Z0-9_]*)(:[a-zA-Z_][a-zA-Z0-9_]*)?}")
def compile_path(path):
def compile_path(path: str) -> tuple[re.Pattern, dict[str, type]]:
path_re = "^"
param_convertors = {}
param_convertors: dict[str, type] = {}
idx = 0
for match in PARAM_RE.finditer(path):
@@ -54,10 +58,10 @@ def compile_path(path):
class BaseRoute:
def matches(self, scope):
def matches(self, scope: Scope) -> tuple[bool, dict]:
raise NotImplementedError()
async def __call__(self, scope, receive, send):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
raise NotImplementedError()
@@ -68,32 +72,41 @@ class Route(BaseRoute):
``{pk:uuid}``, ``{value:float}``, ``{rest:path}``).
"""
def __init__(self, route, endpoint, *, before_request=False, methods=None):
def __init__(
self,
route: str,
endpoint: Callable,
*,
before_request: bool = False,
methods: list[str] | None = None,
) -> None:
assert route.startswith("/"), "Route path must start with '/'"
self.route = route
self.endpoint = endpoint
self.before_request = before_request
self.methods = {m.upper() for m in methods} if methods else None
self.methods: set[str] | None = {m.upper() for m in methods} if methods else None
self.path_re: re.Pattern
self.param_convertors: dict[str, type]
self.path_re, self.param_convertors = compile_path(route)
# Strip type annotations for URL generation (e.g. {id:int} -> {id})
self._url_template = PARAM_RE.sub(r"{\1}", route)
def __repr__(self):
def __repr__(self) -> str:
return f"<Route {self.route!r}={self.endpoint!r}>"
def url(self, **params):
def url(self, **params: Any) -> str:
return self._url_template.format(**params)
@property
def endpoint_name(self):
def endpoint_name(self) -> str:
return self.endpoint.__name__
@property
def description(self):
def description(self) -> str | None:
return self.endpoint.__doc__
def matches(self, scope):
def matches(self, scope: Scope) -> tuple[bool, dict]:
if scope["type"] != "http":
return False, {}
@@ -112,7 +125,7 @@ class Route(BaseRoute):
return True, {"path_params": {**matched_params}}
async def __call__(self, scope, receive, send):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
request = Request(scope, receive, formats=get_formats())
response = Response(req=request, formats=get_formats())
@@ -120,7 +133,7 @@ class Route(BaseRoute):
before_requests = scope.get("before_requests", [])
for before_request in before_requests.get("http", []):
if asyncio.iscoroutinefunction(before_request):
if inspect.iscoroutinefunction(before_request):
await before_request(request, response)
else:
await run_in_threadpool(before_request, request, response)
@@ -166,7 +179,7 @@ class Route(BaseRoute):
for view in views:
# Check __call__ for class-based views (e.g. GraphQL)
if asyncio.iscoroutinefunction(view) or asyncio.iscoroutinefunction(
if inspect.iscoroutinefunction(view) or inspect.iscoroutinefunction(
view.__call__
):
await view(request, response, **path_params)
@@ -179,13 +192,13 @@ class Route(BaseRoute):
try:
validated = resp_model(**response.media)
response.media = validated.model_dump()
except Exception:
except (ValueError, TypeError):
pass # Don't break the response if serialization fails
# Run after-request hooks
after_requests = scope.get("after_requests", [])
for after_request in after_requests:
if asyncio.iscoroutinefunction(after_request):
if inspect.iscoroutinefunction(after_request):
await after_request(request, response)
else:
await run_in_threadpool(after_request, request, response)
@@ -195,40 +208,46 @@ class Route(BaseRoute):
await response(scope, receive, send)
def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if not isinstance(other, Route):
return NotImplemented
return self.route == other.route and self.endpoint == other.endpoint
def __hash__(self):
def __hash__(self) -> int:
return hash(self.route) ^ hash(self.endpoint) ^ hash(self.before_request)
class WebSocketRoute(BaseRoute):
"""A WebSocket route that maps a URL pattern to a WebSocket handler."""
def __init__(self, route, endpoint, *, before_request=False):
def __init__(
self, route: str, endpoint: Callable, *, before_request: bool = False
) -> None:
assert route.startswith("/"), "Route path must start with '/'"
self.route = route
self.endpoint = endpoint
self.before_request = before_request
self.path_re: re.Pattern
self.param_convertors: dict[str, type]
self.path_re, self.param_convertors = compile_path(route)
self._url_template = PARAM_RE.sub(r"{\1}", route)
def __repr__(self):
def __repr__(self) -> str:
return f"<Route {self.route!r}={self.endpoint!r}>"
def url(self, **params):
def url(self, **params: Any) -> str:
return self._url_template.format(**params)
@property
def endpoint_name(self):
def endpoint_name(self) -> str:
return self.endpoint.__name__
@property
def description(self):
def description(self) -> str | None:
return self.endpoint.__doc__
def matches(self, scope):
def matches(self, scope: Scope) -> tuple[bool, dict]:
if scope["type"] != "websocket":
return False, {}
@@ -244,7 +263,7 @@ class WebSocketRoute(BaseRoute):
return True, {"path_params": {**matched_params}}
async def __call__(self, scope, receive, send):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
ws = WebSocket(scope, receive, send)
before_requests = scope.get("before_requests", [])
@@ -253,10 +272,12 @@ class WebSocketRoute(BaseRoute):
await self.endpoint(ws)
def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if not isinstance(other, WebSocketRoute):
return NotImplemented
return self.route == other.route and self.endpoint == other.endpoint
def __hash__(self):
def __hash__(self) -> int:
return hash(self.route) ^ hash(self.endpoint) ^ hash(self.before_request)
@@ -268,32 +289,36 @@ class Router:
"""
def __init__(
self, routes=None, default_response=None, before_requests=None, lifespan=None
):
self.routes = [] if routes is None else list(routes)
self,
routes: list[BaseRoute] | None = None,
default_response: Callable | None = None,
before_requests: dict[str, list[Callable]] | None = None,
lifespan: Callable | None = None,
) -> None:
self.routes: list[BaseRoute] = [] if routes is None else list(routes)
self.apps: dict[str, ASGIApp] = {}
self.default_endpoint = (
self.apps: dict[str, Union[ASGIApp, Any]] = {}
self.default_endpoint: Callable = (
self.default_response if default_response is None else default_response
)
self.before_requests = (
self.before_requests: dict[str, list[Callable]] = (
{"http": [], "ws": []} if before_requests is None else before_requests
)
self.after_requests: list = []
self.events = defaultdict(list)
self.after_requests: list[Callable] = []
self.events: defaultdict[str, list[Callable]] = defaultdict(list)
self._lifespan_handler = lifespan
def add_route(
self,
route=None,
endpoint=None,
route: str | None = None,
endpoint: Callable | None = None,
*,
default=False,
websocket=False,
before_request=False,
check_existing=False,
methods=None,
):
default: bool = False,
websocket: bool = False,
before_request: bool = False,
check_existing: bool = False,
methods: list[str] | None = None,
) -> None:
"""Adds a route to the router.
:param route: A string representation of the route
:param endpoint: The endpoint for the route -- can be callable, or class.
@@ -322,40 +347,40 @@ class Router:
self.routes.append(route)
def mount(self, route, app):
def mount(self, route: str, app: Any) -> None:
"""Mounts ASGI / WSGI applications at a given route"""
self.apps.update({route: app})
def add_event_handler(self, event_type, handler):
def add_event_handler(self, event_type: str, handler: Callable) -> None:
assert event_type in (
"startup",
"shutdown",
), f"Only 'startup' and 'shutdown' events are supported, not {event_type}."
self.events[event_type].append(handler)
async def trigger_event(self, event_type):
async def trigger_event(self, event_type: str) -> None:
for handler in self.events.get(event_type, []):
if asyncio.iscoroutinefunction(handler):
if inspect.iscoroutinefunction(handler):
await handler()
else:
handler()
def before_request(self, endpoint, websocket=False):
def before_request(self, endpoint: Callable, websocket: bool = False) -> None:
if websocket:
self.before_requests.setdefault("ws", []).append(endpoint)
else:
self.before_requests.setdefault("http", []).append(endpoint)
def after_request(self, endpoint):
def after_request(self, endpoint: Callable) -> None:
self.after_requests.append(endpoint)
def url_for(self, endpoint, **params):
def url_for(self, endpoint: Callable | str, **params: Any) -> str | None:
for route in self.routes:
if endpoint in (route.endpoint, route.endpoint.__name__):
return route.url(**params)
return None
async def default_response(self, scope, receive, send):
async def default_response(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] == "websocket":
websocket_close = WebSocketClose()
await websocket_close(scope, receive, send)
@@ -366,7 +391,7 @@ class Router:
raise HTTPException(status_code=status_codes.HTTP_404) # type: ignore[attr-defined]
def _resolve_route(self, scope):
def _resolve_route(self, scope: Scope) -> BaseRoute | None:
for route in self.routes:
matches, child_scope = route.matches(scope)
if matches:
@@ -374,7 +399,7 @@ class Router:
return route
return None
async def lifespan(self, scope, receive, send):
async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
message = await receive()
assert message["type"] == "lifespan.startup"
@@ -409,7 +434,7 @@ class Router:
await send({"type": "lifespan.shutdown.complete"})
async def __call__(self, scope, receive, send):
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
assert scope["type"] in ("http", "websocket", "lifespan")
if scope["type"] == "lifespan":
@@ -432,7 +457,7 @@ class Router:
# Call into a submounted app, if one exists.
for path_prefix, app in self.apps.items():
if path.startswith(path_prefix):
scope["path"] = path[len(path_prefix) :]
scope["path"] = path[len(path_prefix) :] or "/"
scope["root_path"] = root_path + path_prefix
try:
await app(scope, receive, send)
+6 -1
View File
@@ -60,7 +60,12 @@ def test_cli_version(capfd):
)
stdout = capfd.readouterr().out.strip()
assert stdout == __version__
# TODO: Accommodate PyPy as installed by `uv`, it emits spurious output on stdout before the version number.
# AssertionError: assert '(_common_types_metatype, 9088, 128, 128)\n(cython_function_or_method, 157568, 128, 128)\n3.6.0' == '3.6.0'
lines = [line.strip() for line in stdout.splitlines() if line.strip()]
assert lines, "Expected version output on stdout"
assert lines[-1] == __version__
def responder_build(path: Path, capfd: CaptureFixture) -> t.Tuple[str, str]:
+63 -11
View File
@@ -4,14 +4,14 @@ import time
import pytest
from starlette.testclient import TestClient as StarletteTestClient
from starlette.websockets import WebSocketDisconnect
import responder
from responder.background import BackgroundQueue
from responder.models import CaseInsensitiveDict, QueryDict, Response
from responder.models import QueryDict
from responder.routes import Route, WebSocketRoute
from responder.templates import Templates
# --- api.py coverage ---
@@ -78,7 +78,10 @@ def test_background_task_exception(capsys):
raise ValueError("task failed")
future = failing_task()
future.result # wait for completion
try:
future.result() # wait for completion
except ValueError:
pass
time.sleep(0.2) # let the done callback fire
captured = capsys.readouterr()
@@ -302,7 +305,7 @@ def test_yaml_content_negotiation(api):
def test_websocket_404(api):
"""Lines 308-310: WebSocket to unknown route gets closed."""
client = StarletteTestClient(api)
with pytest.raises(Exception):
with pytest.raises(WebSocketDisconnect):
with client.websocket_connect("ws://;/nonexistent"):
pass
@@ -325,9 +328,7 @@ def test_websocket_route_params():
pass
route = WebSocketRoute("/ws/{room_id:int}", handler)
matches, scope = route.matches(
{"type": "websocket", "path": "/ws/42"}
)
matches, scope = route.matches({"type": "websocket", "path": "/ws/42"})
assert matches is True
assert scope["path_params"] == {"room_id": 42}
@@ -591,7 +592,10 @@ def test_pydantic_schema():
from pydantic import BaseModel
api = responder.API(
title="Test", version="1.0", openapi="3.0.2", allowed_hosts=[";"],
title="Test",
version="1.0",
openapi="3.0.2",
allowed_hosts=[";"],
)
@api.schema("Pet")
@@ -611,7 +615,10 @@ def test_pydantic_request_response_models():
from pydantic import BaseModel
api = responder.API(
title="Test", version="1.0", openapi="3.0.2", allowed_hosts=[";"],
title="Test",
version="1.0",
openapi="3.0.2",
allowed_hosts=[";"],
)
class ItemIn(BaseModel):
@@ -623,8 +630,7 @@ def test_pydantic_request_response_models():
name: str
price: float
@api.route("/items", methods=["POST"],
request_model=ItemIn, response_model=ItemOut)
@api.route("/items", methods=["POST"], request_model=ItemIn, response_model=ItemOut)
async def create(req, resp):
data = await req.media()
resp.media = {"id": 1, **data}
@@ -660,3 +666,49 @@ def test_templates_context(tmp_path):
result = templates.render("test.html")
assert "hello" in result
assert "world" in result
def test_static_file_serving(tmp_path):
"""Verify static files are served correctly from the static directory."""
static_dir = tmp_path / "static"
static_dir.mkdir()
(static_dir / "style.css").write_text("body { color: red; }")
(static_dir / "app.js").write_text("console.log('hello');")
api = responder.API(
static_dir=str(static_dir),
static_route="/static",
allowed_hosts=[";"],
)
# CSS file served with correct content
r = api.requests.get("http://;/static/style.css")
assert r.status_code == 200
assert "body { color: red; }" in r.text
assert "text/css" in r.headers.get("content-type", "")
# JS file served with correct content
r = api.requests.get("http://;/static/app.js")
assert r.status_code == 200
assert "console.log" in r.text
# Missing file returns 404
r = api.requests.get("http://;/static/missing.txt")
assert r.status_code == 404
def test_static_index_fallback(tmp_path):
"""Verify static index.html is served as default route."""
static_dir = tmp_path / "static"
static_dir.mkdir()
(static_dir / "index.html").write_text("<h1>SPA</h1>")
api = responder.API(
static_dir=str(static_dir),
allowed_hosts=[";"],
)
api.add_route("/", static=True)
r = api.requests.get("http://;/")
assert r.status_code == 200
assert "<h1>SPA</h1>" in r.text
+171
View File
@@ -1,4 +1,6 @@
# ruff: noqa: E402
import json
import pytest
graphene = pytest.importorskip("graphene")
@@ -17,6 +19,45 @@ def schema():
return graphene.Schema(query=Query)
@pytest.fixture
def mutation_schema():
class Query(graphene.ObjectType):
hello = graphene.String(name=graphene.String(default_value="stranger"))
def resolve_hello(self, info, name):
return f"Hello {name}"
class CreateUser(graphene.Mutation):
class Arguments:
name = graphene.String(required=True)
ok = graphene.Boolean()
name = graphene.String()
def mutate(self, info, name):
return CreateUser(ok=True, name=name)
class Mutation(graphene.ObjectType):
create_user = CreateUser.Field()
return graphene.Schema(query=Query, mutation=Mutation)
@pytest.fixture
def multi_op_schema():
class Query(graphene.ObjectType):
hello = graphene.String(name=graphene.String(default_value="stranger"))
goodbye = graphene.String(name=graphene.String(default_value="stranger"))
def resolve_hello(self, info, name):
return f"Hello {name}"
def resolve_goodbye(self, info, name):
return f"Goodbye {name}"
return graphene.Schema(query=Query)
def test_graphql_schema_query_querying(api, schema):
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.get("http://;/?q={ hello }", headers={"Accept": "json"})
@@ -63,3 +104,133 @@ def test_graphql_error_response(api, schema):
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post("http://;/", json={"query": "{ nonexistent }"})
assert "errors" in r.json()
def test_graphql_variables_json(api, schema):
"""Variables passed via JSON body."""
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post(
"http://;/",
json={
"query": "query Hello($name: String!) { hello(name: $name) }",
"variables": {"name": "Alice"},
},
)
assert r.json() == {"data": {"hello": "Hello Alice"}}
def test_graphql_variables_query_param(api, schema):
"""Variables passed as JSON string in query parameter."""
api.add_route("/", GraphQLView(schema=schema, api=api))
variables = json.dumps({"name": "Bob"})
r = api.requests.get(
f"http://;/?query=query Hello($name: String!) "
f"{{ hello(name: $name) }}&variables={variables}",
headers={"Accept": "json"},
)
assert r.json() == {"data": {"hello": "Hello Bob"}}
def test_graphql_operation_name_json(api, multi_op_schema):
"""operationName selects which operation to run."""
api.add_route("/", GraphQLView(schema=multi_op_schema, api=api))
query = """
query SayHello { hello }
query SayGoodbye { goodbye }
"""
r = api.requests.post(
"http://;/",
json={
"query": query,
"operationName": "SayHello",
},
)
data = r.json()
assert data["data"]["hello"] == "Hello stranger"
def test_graphql_operation_name_query_param(api, multi_op_schema):
"""operationName via query parameter."""
api.add_route("/", GraphQLView(schema=multi_op_schema, api=api))
query = "query SayHello { hello } query SayGoodbye { goodbye }"
r = api.requests.get(
f"http://;/?query={query}&operationName=SayGoodbye",
headers={"Accept": "json"},
)
data = r.json()
assert data["data"]["goodbye"] == "Goodbye stranger"
def test_graphql_mutation(api, mutation_schema):
"""Mutations work via JSON body."""
api.add_route("/", GraphQLView(schema=mutation_schema, api=api))
r = api.requests.post(
"http://;/",
json={
"query": 'mutation { createUser(name: "Eve") { ok name } }',
},
)
data = r.json()
assert data["data"]["createUser"]["ok"] is True
assert data["data"]["createUser"]["name"] == "Eve"
def test_graphql_mutation_with_variables(api, mutation_schema):
"""Mutations with variables."""
api.add_route("/", GraphQLView(schema=mutation_schema, api=api))
r = api.requests.post(
"http://;/",
json={
"query": "mutation CreateUser($name: String!) "
"{ createUser(name: $name) { ok name } }",
"variables": {"name": "Frank"},
},
)
data = r.json()
assert data["data"]["createUser"]["ok"] is True
assert data["data"]["createUser"]["name"] == "Frank"
def test_graphql_context_access(api):
"""Resolvers can access request and response via info.context."""
class Query(graphene.ObjectType):
method = graphene.String()
def resolve_method(self, info):
return info.context["request"].method
schema = graphene.Schema(query=Query)
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post("http://;/", json={"query": "{ method }"})
assert r.json() == {"data": {"method": "post"}}
def test_graphql_malformed_query(api, schema):
"""Malformed GraphQL syntax returns errors."""
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post("http://;/", json={"query": "{ this is not valid"})
data = r.json()
assert "errors" in data
assert len(data["errors"]) > 0
def test_graphql_raw_text_query(api, schema):
"""Query sent as raw text body."""
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.post(
"http://;/",
content=b"{ hello }",
headers={"Content-Type": "text/plain"},
)
assert r.json() == {"data": {"hello": "Hello stranger"}}
def test_graphql_invalid_variables_query_param(api, schema):
"""Invalid JSON in variables query param is treated as None."""
api.add_route("/", GraphQLView(schema=schema, api=api))
r = api.requests.get(
"http://;/?query={ hello }&variables=not-json",
headers={"Accept": "json"},
)
assert r.json() == {"data": {"hello": "Hello stranger"}}
+158
View File
@@ -0,0 +1,158 @@
"""Tests for structured logging with request context."""
import logging
import responder
from responder.ext.logging import (
LoggingMiddleware,
RequestContext,
RequestContextFilter,
get_logger,
)
def test_logging_middleware_sets_request_id():
"""LoggingMiddleware adds X-Request-ID to responses."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
@api.route("/")
def index(req, resp):
resp.text = "ok"
r = api.requests.get("http://localhost/")
assert r.status_code == 200
assert "x-request-id" in r.headers
assert len(r.headers["x-request-id"]) > 0
def test_logging_middleware_forwards_request_id():
"""LoggingMiddleware forwards client-provided X-Request-ID."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
@api.route("/")
def index(req, resp):
resp.text = "ok"
r = api.requests.get(
"http://localhost/", headers={"X-Request-ID": "custom-id-123"}
)
assert r.headers["x-request-id"] == "custom-id-123"
def test_logging_context_available_in_route():
"""Request context is available inside route handlers."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
captured = {}
@api.route("/ctx")
def index(req, resp):
captured["request_id"] = RequestContext.get_request_id()
captured["method"] = RequestContext.get_method()
captured["path"] = RequestContext.get_path()
captured["client_ip"] = RequestContext.get_client_ip()
resp.text = "ok"
api.requests.get("http://localhost/ctx")
assert captured["method"] == "GET"
assert captured["path"] == "/ctx"
assert captured["request_id"] != "-"
assert captured["client_ip"] != "-"
def test_logging_filter_injects_attributes():
"""RequestContextFilter adds context fields to log records."""
logger = get_logger("test.filter")
records = []
class CaptureHandler(logging.Handler):
def emit(self, record):
records.append(record)
handler = CaptureHandler()
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
@api.route("/log")
def index(req, resp):
logger.info("test message")
resp.text = "ok"
api.requests.get("http://localhost/log")
logger.removeHandler(handler)
assert len(records) > 0
record = records[0]
assert hasattr(record, "request_id")
assert hasattr(record, "request_method")
assert hasattr(record, "request_path")
assert hasattr(record, "client_ip")
assert record.request_method == "GET"
assert record.request_path == "/log"
def test_get_logger_avoids_duplicate_filters():
"""get_logger doesn't add duplicate filters."""
logger = get_logger("test.dedup")
count_before = sum(1 for f in logger.filters if isinstance(f, RequestContextFilter))
get_logger("test.dedup")
count_after = sum(1 for f in logger.filters if isinstance(f, RequestContextFilter))
assert count_before == count_after == 1
def test_enable_logging_supersedes_request_id():
"""enable_logging handles request IDs itself (no duplicate headers)."""
api = responder.API(
allowed_hosts=["localhost"], request_id=True, enable_logging=True
)
@api.route("/")
def index(req, resp):
resp.text = "ok"
r = api.requests.get("http://localhost/")
# Should have exactly one X-Request-ID header.
assert "x-request-id" in r.headers
def test_api_logger_attribute():
"""api.log is available when enable_logging=True."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
assert api.log is not None
assert api.log.name == "responder.app"
def test_api_log_always_available():
"""api.log works even without enable_logging — just no request context."""
api = responder.API(allowed_hosts=["localhost"])
assert api.log is not None
assert api.log.name == "responder.app"
api.log.info("works without enable_logging")
def test_api_logger_works_in_routes():
"""api.log can be used inside route handlers with context."""
api = responder.API(allowed_hosts=["localhost"], enable_logging=True)
records = []
class CaptureHandler(logging.Handler):
def emit(self, record):
records.append(record)
handler = CaptureHandler()
api.log.addHandler(handler)
@api.route("/")
def index(req, resp):
api.log.info("hello from route")
resp.text = "ok"
api.requests.get("http://localhost/")
api.log.removeHandler(handler)
assert any(r.msg == "hello from route" for r in records)
record = next(r for r in records if r.msg == "hello from route")
assert record.request_method == "GET"
assert record.request_path == "/"
+5 -7
View File
@@ -1,13 +1,10 @@
"""Tests for new features: validation, SSE, after_request, route groups, etc."""
import pytest
from pydantic import BaseModel
from starlette.testclient import TestClient as StarletteTestClient
import responder
from responder.ext.ratelimit import RateLimiter
# --- Pydantic auto-validation ---
@@ -42,7 +39,9 @@ def test_pydantic_request_validation():
assert "errors" in r.json()
# Invalid request — wrong type
r = api.requests.post("http://;/items", json={"name": "widget", "price": "not_a_number"})
r = api.requests.post(
"http://;/items", json={"name": "widget", "price": "not_a_number"}
)
assert r.status_code == 422
@@ -50,8 +49,7 @@ def test_pydantic_response_serialization():
"""Auto-serialize response through response_model."""
api = responder.API(allowed_hosts=[";"])
@api.route("/items", methods=["POST"],
request_model=ItemIn, response_model=ItemOut)
@api.route("/items", methods=["POST"], request_model=ItemIn, response_model=ItemOut)
async def create(req, resp):
data = await req.media()
# Include an extra field that should be stripped by the model
@@ -257,7 +255,7 @@ def test_rate_limiter():
def view(req, resp):
resp.text = "ok"
for i in range(3):
for _i in range(3):
r = api.requests.get("http://;/")
assert r.status_code == 200
assert "X-RateLimit-Remaining" in r.headers
+5 -2
View File
@@ -546,14 +546,17 @@ def test_documentation(needs_openapi):
assert "html" in r.text
def test_mount_wsgi_app(api, flask):
def test_mount_wsgi_app(flask):
# Use localhost so Werkzeug's trusted-host check accepts the request.
api = responder.API(allowed_hosts=["localhost"])
@api.route("/")
def hello(req, resp):
resp.text = "hello"
api.mount("/flask", flask)
r = api.requests.get("http://;/flask")
r = api.requests.get("http://localhost/flask")
assert r.status_code < 300