mirror of
https://github.com/kennethreitz/responder.git
synced 2026-06-05 06:46:14 +00:00
Add auth, WebSocket, middleware, config guides. Examples and changelog.
New documentation pages: - Authentication: API keys, JWT tokens, session auth, custom exceptions - WebSocket Tutorial: echo server, chat room, HTML client, data formats - Writing Middleware: hooks vs middleware, Starlette integration, ordering - Configuration: env vars, .env files, secret keys, debug mode, production setup Also: - Complete working example at end of quickstart with cross-references - Three new example files: rest_api.py, websocket_chat.py, sse_stream.py - Changelog updated for v3.2.0 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,44 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [v3.2.0] - 2026-03-22
|
||||
|
||||
### Added
|
||||
|
||||
- Pydantic auto-validation: `request_model` validates input, returns 422 on failure
|
||||
- Pydantic auto-serialization: `response_model` strips extra fields from responses
|
||||
- Server-Sent Events: `@resp.sse` for real-time streaming
|
||||
- `resp.stream_file()` for streaming large files without loading into memory
|
||||
- `@api.after_request()` hooks
|
||||
- `api.group("/prefix")` for route groups and API versioning
|
||||
- `api.graphql("/path", schema=schema)` one-liner GraphQL setup
|
||||
- `api = responder.API(request_id=True)` for automatic request ID generation
|
||||
- Built-in rate limiter: `RateLimiter(requests=100, period=60).install(api)`
|
||||
- MessagePack format support: `await req.media("msgpack")`
|
||||
- `req.is_json`, `req.path_params`, `req.client` properties
|
||||
- `api.exception_handler()` decorator for custom error handling
|
||||
- Lifespan context manager support
|
||||
- `uuid` and `path` route convertors
|
||||
- PEP 561 `py.typed` marker
|
||||
- Pydantic support for OpenAPI schema generation
|
||||
|
||||
### Changed
|
||||
|
||||
- Dependencies flattened: `pip install responder` gets everything
|
||||
- Core deps reduced to starlette + uvicorn
|
||||
- TestClient lazy-loaded (no httpx import in production)
|
||||
- Before-request hooks can short-circuit by setting status code
|
||||
- Removed poethepoet task runner
|
||||
|
||||
### Fixed
|
||||
|
||||
- Multipart parser losing headers when parts have multiple headers
|
||||
- `url_for()` with typed route params (`{id:int}`)
|
||||
- `resp.body` encoding crash on bytes content
|
||||
- GraphQL text query missing `await`
|
||||
- Streaming responses not sending Content-Type headers
|
||||
- Python 3.9 compatibility for union type syntax
|
||||
|
||||
## [v3.0.0] - 2026-03-22
|
||||
|
||||
### Added
|
||||
|
||||
@@ -0,0 +1,172 @@
|
||||
Configuration
|
||||
=============
|
||||
|
||||
Every application needs different settings for different environments —
|
||||
debug mode in development, real secrets in production, different database
|
||||
URLs for testing. This guide covers how to manage configuration cleanly.
|
||||
|
||||
|
||||
Environment Variables
|
||||
---------------------
|
||||
|
||||
The simplest and most universal approach. Environment variables work
|
||||
everywhere — locally, in Docker, on cloud platforms — and keep secrets
|
||||
out of your source code::
|
||||
|
||||
import os
|
||||
import responder
|
||||
|
||||
api = responder.API(
|
||||
debug=os.getenv("DEBUG", "false").lower() == "true",
|
||||
secret_key=os.environ["SECRET_KEY"],
|
||||
cors=os.getenv("CORS_ENABLED", "false").lower() == "true",
|
||||
)
|
||||
|
||||
Some variables Responder handles automatically:
|
||||
|
||||
- ``PORT`` — when set, the server binds to ``0.0.0.0`` on this port
|
||||
|
||||
Set variables in your shell::
|
||||
|
||||
$ export SECRET_KEY="your-secret-here"
|
||||
$ export DEBUG=true
|
||||
$ python app.py
|
||||
|
||||
Or in a ``.env`` file (don't commit this to git)::
|
||||
|
||||
SECRET_KEY=your-secret-here
|
||||
DEBUG=true
|
||||
|
||||
|
||||
Using .env Files
|
||||
----------------
|
||||
|
||||
For local development, a ``.env`` file is convenient. Install
|
||||
``python-dotenv`` and load it at the top of your app::
|
||||
|
||||
$ uv pip install python-dotenv
|
||||
|
||||
::
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
import os
|
||||
import responder
|
||||
|
||||
api = responder.API(
|
||||
secret_key=os.environ["SECRET_KEY"],
|
||||
)
|
||||
|
||||
Add ``.env`` to your ``.gitignore`` — never commit secrets.
|
||||
|
||||
|
||||
Configuration Class Pattern
|
||||
----------------------------
|
||||
|
||||
For larger applications, a configuration class keeps things organized::
|
||||
|
||||
import os
|
||||
|
||||
class Config:
|
||||
SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret")
|
||||
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
|
||||
DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///dev.db")
|
||||
CORS_ORIGINS = os.environ.get("CORS_ORIGINS", "").split(",")
|
||||
|
||||
config = Config()
|
||||
|
||||
api = responder.API(
|
||||
debug=config.DEBUG,
|
||||
secret_key=config.SECRET_KEY,
|
||||
cors=bool(config.CORS_ORIGINS[0]),
|
||||
cors_params={"allow_origins": config.CORS_ORIGINS},
|
||||
)
|
||||
|
||||
This makes it easy to see all your settings in one place.
|
||||
|
||||
|
||||
Secret Key
|
||||
----------
|
||||
|
||||
The ``secret_key`` is used to sign session cookies. If someone knows your
|
||||
secret key, they can forge session data and impersonate any user.
|
||||
|
||||
Rules:
|
||||
|
||||
- **Never use the default** in production
|
||||
- **Generate a random key**: ``python -c "import secrets; print(secrets.token_hex(32))"``
|
||||
- **Store it in an environment variable**, not in code
|
||||
- **Rotate it** if it's ever compromised (this invalidates all sessions)
|
||||
|
||||
::
|
||||
|
||||
api = responder.API(secret_key=os.environ["SECRET_KEY"])
|
||||
|
||||
|
||||
Debug Mode
|
||||
----------
|
||||
|
||||
Debug mode controls error page behavior:
|
||||
|
||||
- **On** (``debug=True``): detailed error pages with tracebacks. Never
|
||||
use this in production — it exposes your source code.
|
||||
- **Off** (``debug=False``): generic error pages. This is the default.
|
||||
|
||||
::
|
||||
|
||||
api = responder.API(debug=True) # development only
|
||||
|
||||
A common pattern is to read it from the environment::
|
||||
|
||||
api = responder.API(debug=os.getenv("DEBUG") == "true")
|
||||
|
||||
|
||||
Allowed Hosts
|
||||
-------------
|
||||
|
||||
In production, always set ``allowed_hosts`` to prevent Host header
|
||||
attacks. This should match the domain names your application serves::
|
||||
|
||||
api = responder.API(
|
||||
allowed_hosts=["example.com", "www.example.com"],
|
||||
)
|
||||
|
||||
In development, you can use ``["*"]`` (the default) or specific local
|
||||
addresses::
|
||||
|
||||
api = responder.API(allowed_hosts=["localhost", "127.0.0.1"])
|
||||
|
||||
|
||||
Putting It All Together
|
||||
-----------------------
|
||||
|
||||
A production-ready configuration setup::
|
||||
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
import responder
|
||||
|
||||
api = responder.API(
|
||||
debug=os.getenv("DEBUG", "false") == "true",
|
||||
secret_key=os.environ["SECRET_KEY"],
|
||||
allowed_hosts=os.getenv("ALLOWED_HOSTS", "*").split(","),
|
||||
cors=bool(os.getenv("CORS_ORIGINS")),
|
||||
cors_params={
|
||||
"allow_origins": os.getenv("CORS_ORIGINS", "").split(","),
|
||||
"allow_methods": ["GET", "POST", "PUT", "DELETE"],
|
||||
},
|
||||
)
|
||||
|
||||
With a ``.env`` file for local development::
|
||||
|
||||
SECRET_KEY=dev-secret-do-not-use-in-prod
|
||||
DEBUG=true
|
||||
ALLOWED_HOSTS=localhost,127.0.0.1
|
||||
CORS_ORIGINS=http://localhost:3000
|
||||
|
||||
And environment variables set properly in production (via your cloud
|
||||
platform's dashboard, Docker secrets, or a secrets manager).
|
||||
@@ -106,7 +106,11 @@ Python 3.9 and above. That's it.
|
||||
|
||||
tutorial-rest
|
||||
tutorial-sqlalchemy
|
||||
tutorial-auth
|
||||
tutorial-websockets
|
||||
tutorial-middleware
|
||||
tutorial-flask
|
||||
guide-config
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
|
||||
@@ -330,3 +330,49 @@ for lightweight use cases where you don't need a full message broker.
|
||||
your application, which makes them fast to start but means CPU-intensive
|
||||
work will block the event loop. For heavy computation, consider a proper
|
||||
task queue.
|
||||
|
||||
|
||||
Putting It All Together
|
||||
-----------------------
|
||||
|
||||
Here's a complete, working Responder application that combines everything
|
||||
from this guide::
|
||||
|
||||
import responder
|
||||
|
||||
api = responder.API()
|
||||
|
||||
@api.route("/")
|
||||
def index(req, resp):
|
||||
resp.text = "Welcome to the API"
|
||||
|
||||
@api.route("/hello/{name}")
|
||||
def greet(req, resp, *, name):
|
||||
resp.media = {"message": f"hello, {name}!"}
|
||||
|
||||
@api.route("/add/{a:int}/{b:int}")
|
||||
def add(req, resp, *, a, b):
|
||||
resp.media = {"result": a + b}
|
||||
|
||||
@api.route("/echo", methods=["POST"])
|
||||
async def echo(req, resp):
|
||||
data = await req.media()
|
||||
resp.media = {"received": data}
|
||||
|
||||
if __name__ == "__main__":
|
||||
api.run()
|
||||
|
||||
Save this as ``app.py``, run it with ``python app.py``, and try::
|
||||
|
||||
$ curl http://localhost:5042/
|
||||
$ curl http://localhost:5042/hello/world
|
||||
$ curl http://localhost:5042/add/3/4
|
||||
$ curl -X POST http://localhost:5042/echo \
|
||||
-H "Content-Type: application/json" -d '{"key": "value"}'
|
||||
|
||||
From here, explore the :doc:`tour` for the full range of features, or
|
||||
jump into the tutorials:
|
||||
|
||||
- :doc:`tutorial-rest` — build a full CRUD API with validation
|
||||
- :doc:`tutorial-sqlalchemy` — connect to a database
|
||||
- :doc:`tutorial-auth` — add authentication
|
||||
|
||||
@@ -0,0 +1,191 @@
|
||||
Authentication
|
||||
==============
|
||||
|
||||
Every API that handles user data needs authentication — a way to verify
|
||||
who is making a request. This guide covers the most common patterns:
|
||||
API keys, JWT tokens, and how to build reusable auth guards with
|
||||
Responder's before-request hooks.
|
||||
|
||||
|
||||
API Key Authentication
|
||||
----------------------
|
||||
|
||||
The simplest approach. The client sends a secret key in a header, and
|
||||
your server checks it against a known value. This is common for
|
||||
server-to-server communication and simple APIs::
|
||||
|
||||
API_KEYS = {"sk-abc123", "sk-def456"}
|
||||
|
||||
@api.route(before_request=True)
|
||||
def check_api_key(req, resp):
|
||||
key = req.headers.get("X-API-Key")
|
||||
if key not in API_KEYS:
|
||||
resp.status_code = 401
|
||||
resp.media = {"error": "Invalid or missing API key"}
|
||||
|
||||
Because the before-request hook sets ``resp.status_code``, the route
|
||||
handler is skipped entirely for unauthorized requests. The client never
|
||||
reaches your endpoint — the guard catches them first.
|
||||
|
||||
The client sends the key like this::
|
||||
|
||||
$ curl -H "X-API-Key: sk-abc123" http://localhost:5042/protected
|
||||
|
||||
|
||||
Bearer Token Authentication
|
||||
----------------------------
|
||||
|
||||
Bearer tokens are the standard for modern APIs. The client sends a token
|
||||
in the ``Authorization`` header, and the server validates it. The most
|
||||
common format is `JWT <https://jwt.io/>`_ (JSON Web Tokens).
|
||||
|
||||
Install PyJWT::
|
||||
|
||||
$ uv pip install pyjwt
|
||||
|
||||
Create a helper to encode and decode tokens::
|
||||
|
||||
import jwt
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
SECRET = "your-secret-key"
|
||||
|
||||
def create_token(user_id: int) -> str:
|
||||
payload = {
|
||||
"sub": user_id,
|
||||
"exp": datetime.utcnow() + timedelta(hours=24),
|
||||
}
|
||||
return jwt.encode(payload, SECRET, algorithm="HS256")
|
||||
|
||||
def verify_token(token: str) -> dict | None:
|
||||
try:
|
||||
return jwt.decode(token, SECRET, algorithms=["HS256"])
|
||||
except jwt.InvalidTokenError:
|
||||
return None
|
||||
|
||||
Add a login endpoint that issues tokens, and a before-request hook that
|
||||
verifies them::
|
||||
|
||||
@api.route("/login", methods=["POST"])
|
||||
async def login(req, resp):
|
||||
data = await req.media()
|
||||
# In a real app, check credentials against a database
|
||||
if data.get("username") == "admin" and data.get("password") == "secret":
|
||||
token = create_token(user_id=1)
|
||||
resp.media = {"token": token}
|
||||
else:
|
||||
resp.status_code = 401
|
||||
resp.media = {"error": "Invalid credentials"}
|
||||
|
||||
@api.route(before_request=True)
|
||||
def auth_guard(req, resp):
|
||||
# Skip auth for the login endpoint itself
|
||||
if req.url.path == "/login":
|
||||
return
|
||||
|
||||
auth = req.headers.get("Authorization", "")
|
||||
if not auth.startswith("Bearer "):
|
||||
resp.status_code = 401
|
||||
resp.media = {"error": "Missing bearer token"}
|
||||
return
|
||||
|
||||
token = auth[7:] # Strip "Bearer "
|
||||
payload = verify_token(token)
|
||||
if payload is None:
|
||||
resp.status_code = 401
|
||||
resp.media = {"error": "Invalid or expired token"}
|
||||
return
|
||||
|
||||
# Store the authenticated user on the request state
|
||||
req.state.user_id = payload["sub"]
|
||||
|
||||
Now any route can access the authenticated user::
|
||||
|
||||
@api.route("/me")
|
||||
def get_me(req, resp):
|
||||
resp.media = {"user_id": req.state.user_id}
|
||||
|
||||
The client flow:
|
||||
|
||||
1. ``POST /login`` with credentials → receive a token
|
||||
2. Include ``Authorization: Bearer <token>`` on every subsequent request
|
||||
3. The token expires after 24 hours — the client must log in again
|
||||
|
||||
|
||||
Skipping Auth for Public Routes
|
||||
--------------------------------
|
||||
|
||||
The example above skips auth for ``/login`` by checking the path. For
|
||||
more control, you can use a set of public paths::
|
||||
|
||||
PUBLIC_PATHS = {"/login", "/signup", "/health", "/docs", "/schema.yml"}
|
||||
|
||||
@api.route(before_request=True)
|
||||
def auth_guard(req, resp):
|
||||
if req.url.path in PUBLIC_PATHS:
|
||||
return
|
||||
# ... check token
|
||||
|
||||
|
||||
Custom Exception for Auth Errors
|
||||
---------------------------------
|
||||
|
||||
For cleaner code, define a custom exception and register a handler::
|
||||
|
||||
class AuthError(Exception):
|
||||
def __init__(self, message="Unauthorized", status_code=401):
|
||||
self.message = message
|
||||
self.status_code = status_code
|
||||
|
||||
@api.exception_handler(AuthError)
|
||||
async def handle_auth_error(req, resp, exc):
|
||||
resp.status_code = exc.status_code
|
||||
resp.media = {"error": exc.message}
|
||||
|
||||
Now your auth guard can simply raise::
|
||||
|
||||
@api.route(before_request=True)
|
||||
def auth_guard(req, resp):
|
||||
if req.url.path in PUBLIC_PATHS:
|
||||
return
|
||||
if "Authorization" not in req.headers:
|
||||
raise AuthError("Missing authorization header")
|
||||
|
||||
|
||||
Using Sessions for Web Apps
|
||||
----------------------------
|
||||
|
||||
For traditional web applications (with HTML pages and forms), cookie-based
|
||||
sessions are simpler than tokens. The browser handles cookies automatically
|
||||
— no client-side token management needed::
|
||||
|
||||
@api.route("/login", methods=["POST"])
|
||||
async def login(req, resp):
|
||||
data = await req.media("form")
|
||||
if data["username"] == "admin" and data["password"] == "secret":
|
||||
resp.session["user"] = data["username"]
|
||||
api.redirect(resp, location="/dashboard")
|
||||
else:
|
||||
resp.status_code = 401
|
||||
resp.html = "<p>Invalid credentials</p>"
|
||||
|
||||
@api.route("/dashboard")
|
||||
def dashboard(req, resp):
|
||||
user = req.session.get("user")
|
||||
if not user:
|
||||
api.redirect(resp, location="/login")
|
||||
return
|
||||
resp.html = f"<h1>Welcome, {user}!</h1>"
|
||||
|
||||
@api.route("/logout")
|
||||
def logout(req, resp):
|
||||
resp.session.clear()
|
||||
api.redirect(resp, location="/login")
|
||||
|
||||
Remember to set a proper secret key::
|
||||
|
||||
api = responder.API(secret_key="your-production-secret-key")
|
||||
|
||||
The session data is signed (not encrypted) — users can read it but
|
||||
can't tamper with it. Don't store sensitive data like passwords in
|
||||
sessions.
|
||||
@@ -0,0 +1,129 @@
|
||||
Writing Middleware
|
||||
=================
|
||||
|
||||
Middleware sits between the server and your route handlers, processing
|
||||
every request and response that flows through your application. It's the
|
||||
right tool for cross-cutting concerns — things that apply to *all*
|
||||
requests, not just specific routes.
|
||||
|
||||
Common middleware use cases:
|
||||
|
||||
- Request logging and timing
|
||||
- Authentication and authorization
|
||||
- Adding security headers
|
||||
- Request ID generation
|
||||
- Rate limiting
|
||||
- Response compression (built-in)
|
||||
|
||||
|
||||
Hooks vs. Middleware
|
||||
--------------------
|
||||
|
||||
Responder gives you two levels of request processing:
|
||||
|
||||
**Hooks** (``before_request`` / ``after_request``) run inside Responder's
|
||||
routing layer. They receive Responder's ``req`` and ``resp`` objects and
|
||||
are the simplest way to add behavior::
|
||||
|
||||
@api.route(before_request=True)
|
||||
def add_header(req, resp):
|
||||
resp.headers["X-Powered-By"] = "Responder"
|
||||
|
||||
@api.after_request()
|
||||
def log_request(req, resp):
|
||||
print(f"{req.method} {req.url.path} -> {resp.status_code}")
|
||||
|
||||
**Middleware** runs at the ASGI level, wrapping the entire application.
|
||||
It's more powerful but more complex — you work with raw ASGI scopes
|
||||
instead of Responder objects. Use middleware when you need to process
|
||||
requests *before* they reach Responder's routing, or when you need to
|
||||
integrate with Starlette middleware.
|
||||
|
||||
|
||||
Using Starlette Middleware
|
||||
--------------------------
|
||||
|
||||
Responder is built on Starlette, so any Starlette middleware works
|
||||
out of the box::
|
||||
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
|
||||
class TimingMiddleware(BaseHTTPMiddleware):
|
||||
async def dispatch(self, request, call_next):
|
||||
import time
|
||||
start = time.time()
|
||||
response = await call_next(request)
|
||||
duration = time.time() - start
|
||||
response.headers["X-Response-Time"] = f"{duration:.3f}s"
|
||||
return response
|
||||
|
||||
api.add_middleware(TimingMiddleware)
|
||||
|
||||
The ``dispatch`` method receives a Starlette ``Request`` and a
|
||||
``call_next`` function. Call ``call_next(request)`` to pass the request
|
||||
to the next middleware (or to your route handler). The return value is
|
||||
a Starlette ``Response`` that you can modify before it's sent.
|
||||
|
||||
|
||||
Built-in Middleware
|
||||
-------------------
|
||||
|
||||
Responder configures several middleware components automatically:
|
||||
|
||||
- **GZipMiddleware** — compresses responses larger than 500 bytes
|
||||
- **TrustedHostMiddleware** — validates the ``Host`` header
|
||||
- **ServerErrorMiddleware** — catches unhandled exceptions
|
||||
- **ExceptionMiddleware** — routes exceptions to your handlers
|
||||
- **SessionMiddleware** — manages signed cookie sessions
|
||||
|
||||
Optional middleware you can enable:
|
||||
|
||||
- **CORSMiddleware** — ``api = responder.API(cors=True)``
|
||||
- **HTTPSRedirectMiddleware** — ``api = responder.API(enable_hsts=True)``
|
||||
|
||||
|
||||
Adding Third-Party Middleware
|
||||
-----------------------------
|
||||
|
||||
Any ASGI middleware can be added with ``api.add_middleware()``::
|
||||
|
||||
from some_package import SomeMiddleware
|
||||
|
||||
api.add_middleware(SomeMiddleware, option1="value", option2=True)
|
||||
|
||||
Keyword arguments are passed to the middleware's constructor.
|
||||
|
||||
|
||||
Middleware Order
|
||||
----------------
|
||||
|
||||
Middleware wraps your application like layers of an onion. The *last*
|
||||
middleware added is the *outermost* layer — it sees the request first
|
||||
and the response last.
|
||||
|
||||
Responder's built-in middleware stack (from outermost to innermost):
|
||||
|
||||
1. SessionMiddleware
|
||||
2. ServerErrorMiddleware
|
||||
3. CORSMiddleware (if enabled)
|
||||
4. TrustedHostMiddleware
|
||||
5. HTTPSRedirectMiddleware (if enabled)
|
||||
6. GZipMiddleware
|
||||
7. ExceptionMiddleware
|
||||
8. Your routes
|
||||
|
||||
When you call ``api.add_middleware()``, your middleware is added *outside*
|
||||
the existing stack. Keep this in mind for ordering dependencies — if
|
||||
middleware A depends on middleware B having run first, add B before A.
|
||||
|
||||
|
||||
When to Use What
|
||||
-----------------
|
||||
|
||||
- **Simple header additions, logging, auth checks** → use hooks
|
||||
- **Response transformation, timing, third-party integrations** → use middleware
|
||||
- **Rate limiting** → use the built-in ``RateLimiter`` (it uses hooks internally)
|
||||
- **Request ID** → use ``api = responder.API(request_id=True)``
|
||||
|
||||
Start with hooks. They're simpler and cover most cases. Graduate to
|
||||
middleware when hooks aren't enough.
|
||||
@@ -0,0 +1,171 @@
|
||||
WebSocket Tutorial
|
||||
==================
|
||||
|
||||
HTTP is request-response — the client asks, the server answers, and the
|
||||
connection closes. WebSockets upgrade that into a persistent, bidirectional
|
||||
channel where both sides can send messages at any time. This is what powers
|
||||
chat apps, live dashboards, multiplayer games, and collaborative editors.
|
||||
|
||||
This tutorial builds a simple chat room to show how WebSockets work in
|
||||
Responder.
|
||||
|
||||
|
||||
How WebSockets Work
|
||||
-------------------
|
||||
|
||||
1. The client sends a normal HTTP request with an ``Upgrade: websocket``
|
||||
header.
|
||||
2. The server accepts the upgrade and the connection switches protocols.
|
||||
3. Both sides can now send messages freely — no more request/response.
|
||||
4. Either side can close the connection at any time.
|
||||
|
||||
In Responder, WebSocket routes receive a ``ws`` object instead of
|
||||
``req`` and ``resp``. The ``ws`` object has methods for accepting the
|
||||
connection, sending and receiving data, and closing.
|
||||
|
||||
|
||||
Echo Server
|
||||
-----------
|
||||
|
||||
The simplest WebSocket — echoes everything back::
|
||||
|
||||
@api.route("/ws", websocket=True)
|
||||
async def echo(ws):
|
||||
await ws.accept()
|
||||
while True:
|
||||
data = await ws.receive_text()
|
||||
await ws.send_text(f"Echo: {data}")
|
||||
|
||||
The ``await ws.accept()`` call completes the WebSocket handshake. After
|
||||
that, you're in a loop — receive a message, send a response.
|
||||
|
||||
Test it with a WebSocket client::
|
||||
|
||||
$ pip install websocket-client
|
||||
$ python -c "
|
||||
import websocket
|
||||
ws = websocket.create_connection('ws://localhost:5042/ws')
|
||||
ws.send('hello')
|
||||
print(ws.recv()) # Echo: hello
|
||||
ws.close()
|
||||
"
|
||||
|
||||
|
||||
Chat Room
|
||||
---------
|
||||
|
||||
A chat room needs to broadcast messages to all connected clients. We keep
|
||||
a set of active connections and iterate through them when someone sends
|
||||
a message::
|
||||
|
||||
connected = set()
|
||||
|
||||
@api.route("/chat", websocket=True)
|
||||
async def chat(ws):
|
||||
await ws.accept()
|
||||
connected.add(ws)
|
||||
try:
|
||||
while True:
|
||||
message = await ws.receive_text()
|
||||
# Broadcast to all connected clients
|
||||
for client in connected:
|
||||
await client.send_text(message)
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
connected.discard(ws)
|
||||
|
||||
The ``try/finally`` block ensures we remove disconnected clients from
|
||||
the set, even if the connection drops unexpectedly.
|
||||
|
||||
|
||||
Data Formats
|
||||
------------
|
||||
|
||||
WebSockets support three data formats:
|
||||
|
||||
**Text** — plain strings::
|
||||
|
||||
await ws.send_text("hello")
|
||||
message = await ws.receive_text()
|
||||
|
||||
**JSON** — auto-serialized Python objects::
|
||||
|
||||
await ws.send_json({"type": "update", "data": [1, 2, 3]})
|
||||
message = await ws.receive_json()
|
||||
|
||||
**Binary** — raw bytes, useful for images, audio, or custom protocols::
|
||||
|
||||
await ws.send_bytes(b"\x00\x01\x02")
|
||||
data = await ws.receive_bytes()
|
||||
|
||||
|
||||
HTML Client
|
||||
-----------
|
||||
|
||||
Here's a minimal HTML page that connects to the chat room. The browser's
|
||||
built-in ``WebSocket`` API handles everything — no libraries needed:
|
||||
|
||||
.. code-block:: html
|
||||
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<body>
|
||||
<div id="messages"></div>
|
||||
<input id="input" placeholder="Type a message..." />
|
||||
<script>
|
||||
const ws = new WebSocket("ws://localhost:5042/chat");
|
||||
const messages = document.getElementById("messages");
|
||||
const input = document.getElementById("input");
|
||||
|
||||
ws.onmessage = (event) => {
|
||||
const p = document.createElement("p");
|
||||
p.textContent = event.data;
|
||||
messages.appendChild(p);
|
||||
};
|
||||
|
||||
input.addEventListener("keypress", (e) => {
|
||||
if (e.key === "Enter") {
|
||||
ws.send(input.value);
|
||||
input.value = "";
|
||||
}
|
||||
});
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
|
||||
Save this as ``static/index.html`` and serve it with Responder's
|
||||
built-in static file support.
|
||||
|
||||
|
||||
Before-Request Hooks for WebSockets
|
||||
------------------------------------
|
||||
|
||||
You can run code before a WebSocket connection is established, just like
|
||||
HTTP before-request hooks. This is useful for authentication::
|
||||
|
||||
@api.before_request(websocket=True)
|
||||
async def ws_auth(ws):
|
||||
# Check for a token in the query string
|
||||
# (WebSocket headers are limited in browsers)
|
||||
await ws.accept()
|
||||
|
||||
WebSocket before-request hooks receive the ``ws`` object and must call
|
||||
``await ws.accept()`` if they want the connection to proceed.
|
||||
|
||||
|
||||
Testing WebSockets
|
||||
------------------
|
||||
|
||||
Use Starlette's ``TestClient`` for WebSocket tests::
|
||||
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
def test_echo():
|
||||
client = TestClient(api)
|
||||
with client.websocket_connect("/ws") as ws:
|
||||
ws.send_text("hello")
|
||||
assert ws.receive_text() == "Echo: hello"
|
||||
|
||||
The ``websocket_connect`` context manager handles the connection
|
||||
lifecycle — it connects on enter and disconnects on exit.
|
||||
@@ -0,0 +1,70 @@
|
||||
# Complete REST API example with Pydantic validation.
|
||||
# https://responder.kennethreitz.org/tutorial-rest.html
|
||||
import responder
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class BookIn(BaseModel):
|
||||
title: str
|
||||
author: str
|
||||
year: int
|
||||
isbn: str | None = None
|
||||
|
||||
|
||||
class BookOut(BaseModel):
|
||||
id: int
|
||||
title: str
|
||||
author: str
|
||||
year: int
|
||||
isbn: str | None = None
|
||||
|
||||
|
||||
api = responder.API(
|
||||
title="Book Catalog",
|
||||
version="1.0",
|
||||
openapi="3.0.2",
|
||||
docs_route="/docs",
|
||||
)
|
||||
|
||||
books_db: dict[int, dict] = {}
|
||||
next_id = 1
|
||||
|
||||
|
||||
@api.route("/books", methods=["GET"])
|
||||
def list_books(req, resp):
|
||||
resp.media = list(books_db.values())
|
||||
|
||||
|
||||
@api.route("/books", methods=["POST"], check_existing=False,
|
||||
request_model=BookIn, response_model=BookOut)
|
||||
async def create_book(req, resp):
|
||||
global next_id
|
||||
data = await req.media()
|
||||
book = {"id": next_id, **data}
|
||||
books_db[next_id] = book
|
||||
next_id += 1
|
||||
resp.media = book
|
||||
resp.status_code = 201
|
||||
|
||||
|
||||
@api.route("/books/{book_id:int}", methods=["GET"])
|
||||
def get_book(req, resp, *, book_id):
|
||||
if book_id not in books_db:
|
||||
resp.status_code = 404
|
||||
resp.media = {"error": f"Book {book_id} not found"}
|
||||
return
|
||||
resp.media = books_db[book_id]
|
||||
|
||||
|
||||
@api.route("/books/{book_id:int}", methods=["DELETE"], check_existing=False)
|
||||
def delete_book(req, resp, *, book_id):
|
||||
if book_id not in books_db:
|
||||
resp.status_code = 404
|
||||
resp.media = {"error": f"Book {book_id} not found"}
|
||||
return
|
||||
del books_db[book_id]
|
||||
resp.status_code = 204
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
api.run()
|
||||
@@ -0,0 +1,42 @@
|
||||
# Server-Sent Events streaming example.
|
||||
# https://responder.kennethreitz.org/tour.html#server-sent-events-sse
|
||||
import asyncio
|
||||
|
||||
import responder
|
||||
|
||||
api = responder.API()
|
||||
|
||||
|
||||
@api.route("/")
|
||||
def index(req, resp):
|
||||
resp.html = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>SSE Stream</h1>
|
||||
<div id="events"></div>
|
||||
<script>
|
||||
const source = new EventSource("/stream");
|
||||
const events = document.getElementById("events");
|
||||
source.onmessage = (e) => {
|
||||
const p = document.createElement("p");
|
||||
p.textContent = e.data;
|
||||
events.appendChild(p);
|
||||
};
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
|
||||
@api.route("/stream")
|
||||
async def stream(req, resp):
|
||||
@resp.sse
|
||||
async def events():
|
||||
for i in range(20):
|
||||
yield {"data": f"Event #{i}"}
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
api.run()
|
||||
@@ -0,0 +1,57 @@
|
||||
# WebSocket chat room example.
|
||||
# https://responder.kennethreitz.org/tutorial-websockets.html
|
||||
import responder
|
||||
|
||||
api = responder.API()
|
||||
|
||||
connected = set()
|
||||
|
||||
|
||||
@api.route("/")
|
||||
def index(req, resp):
|
||||
resp.html = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<body>
|
||||
<h1>Chat Room</h1>
|
||||
<div id="messages" style="height:300px;overflow-y:scroll;border:1px solid #ccc;padding:10px;"></div>
|
||||
<input id="input" placeholder="Type a message..." style="width:300px;" />
|
||||
<script>
|
||||
const ws = new WebSocket(`ws://${location.host}/chat`);
|
||||
const messages = document.getElementById("messages");
|
||||
const input = document.getElementById("input");
|
||||
ws.onmessage = (e) => {
|
||||
const p = document.createElement("p");
|
||||
p.textContent = e.data;
|
||||
messages.appendChild(p);
|
||||
messages.scrollTop = messages.scrollHeight;
|
||||
};
|
||||
input.addEventListener("keypress", (e) => {
|
||||
if (e.key === "Enter" && input.value) {
|
||||
ws.send(input.value);
|
||||
input.value = "";
|
||||
}
|
||||
});
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
|
||||
@api.route("/chat", websocket=True)
|
||||
async def chat(ws):
|
||||
await ws.accept()
|
||||
connected.add(ws)
|
||||
try:
|
||||
while True:
|
||||
message = await ws.receive_text()
|
||||
for client in connected:
|
||||
await client.send_text(message)
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
connected.discard(ws)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
api.run()
|
||||
Reference in New Issue
Block a user