diff --git a/docs/source/tour.rst b/docs/source/tour.rst index 887d509..deef200 100644 --- a/docs/source/tour.rst +++ b/docs/source/tour.rst @@ -407,3 +407,102 @@ Requests with a ``Host`` header that doesn't match any of the patterns will receive a 400 Bad Request response. Wildcard domains are supported. By default, all hostnames are allowed. + + +Server-Sent Events (SSE) +------------------------ + +Stream real-time updates to the client using Server-Sent Events. This is +great for live feeds, progress updates, and AI streaming responses:: + + @api.route("/events") + async def events(req, resp): + @resp.sse + async def stream(): + for i in range(10): + yield {"data": f"message {i}"} + +Each yielded value can be a string (treated as data) or a dict with +``data``, ``event``, ``id``, and ``retry`` fields:: + + yield {"event": "update", "data": "hello", "id": "1"} + yield "simple string message" + + +Streaming Files +--------------- + +For large files, use ``resp.stream_file()`` to stream the content without +loading the entire file into memory:: + + @api.route("/download") + def download(req, resp): + resp.stream_file("large-dataset.csv") + +For small files where memory isn't a concern, ``resp.file()`` loads the +entire file at once — simpler but less efficient for large files. + + +After-Request Hooks +------------------- + +Run code after every request, useful for logging, adding headers, or +cleanup:: + + @api.after_request() + def log_response(req, resp): + print(f"{req.method} {req.full_url} -> {resp.status_code}") + + +Route Groups +------------ + +Organize related routes with a shared URL prefix. Useful for API versioning +and logical grouping:: + + v1 = api.group("/v1") + + @v1.route("/users") + def list_users(req, resp): + resp.media = [] + + @v1.route("/users/{user_id:int}") + def get_user(req, resp, *, user_id): + resp.media = {"id": user_id} + + +Request ID +---------- + +Auto-generate unique request IDs for tracing and debugging. If the client +sends an ``X-Request-ID`` header, it's forwarded; otherwise a new UUID is +generated:: + + api = responder.API(request_id=True) + + +Rate Limiting +------------- + +Built-in token bucket rate limiter:: + + from responder.ext.ratelimit import RateLimiter + + limiter = RateLimiter(requests=100, period=60) # 100 req/min + limiter.install(api) + +When the limit is exceeded, clients receive a ``429 Too Many Requests`` +response with ``Retry-After`` and ``X-RateLimit-Remaining`` headers. + + +MessagePack +----------- + +In addition to JSON and YAML, Responder supports MessagePack for efficient +binary serialization:: + + # Decode MessagePack request body + data = await req.media("msgpack") + + # Content negotiation also works — clients can send + # Accept: application/x-msgpack to receive MessagePack responses. diff --git a/pyproject.toml b/pyproject.toml index de80b7c..2601999 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "graphene>=3", "graphql-core>=3.1", "marshmallow", + "msgpack", "pueblo[sfa-full]>=0.0.11", "pydantic>=2", "python-multipart", diff --git a/responder/__version__.py b/responder/__version__.py index f5f41e5..1173108 100644 --- a/responder/__version__.py +++ b/responder/__version__.py @@ -1 +1 @@ -__version__ = "3.1.0" +__version__ = "3.2.0" diff --git a/responder/api.py b/responder/api.py index 1b3382e..d7e5086 100644 --- a/responder/api.py +++ b/responder/api.py @@ -59,6 +59,7 @@ class API: allowed_hosts=None, openapi_theme=DEFAULT_OPENAPI_THEME, lifespan=None, + request_id=False, ): self.background = BackgroundQueue() @@ -131,6 +132,17 @@ class API: self.templates = Templates(directory=templates_dir) + if request_id: + import uuid as _uuid + + def _add_request_id(req, resp): + rid = req.headers.get( + "X-Request-ID", str(_uuid.uuid4()) + ) + resp.headers["X-Request-ID"] = rid + + self.router.after_request(_add_request_id) + @property def requests(self): """A test client connected to the ASGI app. Lazily initialized.""" diff --git a/responder/ext/ratelimit.py b/responder/ext/ratelimit.py new file mode 100644 index 0000000..a9c8e64 --- /dev/null +++ b/responder/ext/ratelimit.py @@ -0,0 +1,67 @@ +"""Simple in-memory rate limiter for Responder.""" + +import time +from collections import defaultdict + + +class RateLimiter: + """Token bucket rate limiter. + + Usage:: + + from responder.ext.ratelimit import RateLimiter + + limiter = RateLimiter(requests=100, period=60) # 100 req/min + + @api.route(before_request=True) + def rate_limit(req, resp): + limiter.check(req, resp) + + Or use the shorthand:: + + limiter = RateLimiter(requests=100, period=60) + limiter.install(api) + + """ + + def __init__(self, requests=100, period=60): + self.max_requests = requests + self.period = period + self._buckets: dict[str, list[float]] = defaultdict(list) + + def _client_key(self, req): + client = req.client + if client: + return client[0] + return req.headers.get("X-Forwarded-For", "unknown") + + def _cleanup(self, key): + now = time.time() + cutoff = now - self.period + self._buckets[key] = [ + t for t in self._buckets[key] if t > cutoff + ] + + def check(self, req, resp): + """Check rate limit. Sets 429 status if exceeded.""" + key = self._client_key(req) + self._cleanup(key) + + if len(self._buckets[key]) >= self.max_requests: + resp.status_code = 429 + resp.media = {"error": "rate limit exceeded"} + resp.headers["Retry-After"] = str(self.period) + return False + + self._buckets[key].append(time.time()) + remaining = self.max_requests - len(self._buckets[key]) + resp.headers["X-RateLimit-Limit"] = str(self.max_requests) + resp.headers["X-RateLimit-Remaining"] = str(remaining) + return True + + def install(self, api): + """Install as a before_request hook on the API.""" + + @api.route(before_request=True) + def _rate_limit(req, resp): + self.check(req, resp) diff --git a/responder/formats.py b/responder/formats.py index 5d434f7..f8b29a6 100644 --- a/responder/formats.py +++ b/responder/formats.py @@ -139,10 +139,25 @@ async def format_files(r, encode=False): return dump +async def format_msgpack(r, encode=False): + try: + import msgpack + except ImportError as exc: + raise ImportError( + "msgpack is required for MessagePack support: pip install msgpack" + ) from exc + + if encode: + r.headers.update({"Content-Type": "application/x-msgpack"}) + return msgpack.packb(r.media) + return msgpack.unpackb(await r.content) + + def get_formats(): return { "json": format_json, "yaml": format_yaml, "form": format_form, "files": format_files, + "msgpack": format_msgpack, } diff --git a/tests/test_new_features.py b/tests/test_new_features.py index f964d87..7ebaaf5 100644 --- a/tests/test_new_features.py +++ b/tests/test_new_features.py @@ -1,10 +1,11 @@ -"""Tests for new features: validation, SSE, after_request, route groups, stream_file.""" +"""Tests for new features: validation, SSE, after_request, route groups, etc.""" import pytest from pydantic import BaseModel from starlette.testclient import TestClient as StarletteTestClient import responder +from responder.ext.ratelimit import RateLimiter # --- Pydantic auto-validation --- @@ -213,3 +214,76 @@ def test_multiple_route_groups(api): assert api.requests.get("http://;/v1/status").json() == {"version": 1} assert api.requests.get("http://;/v2/status").json() == {"version": 2} + + +# --- Request ID --- + + +def test_request_id(): + """Auto-generated request ID header.""" + api = responder.API(request_id=True, allowed_hosts=[";"]) + + @api.route("/") + def view(req, resp): + resp.text = "ok" + + r = api.requests.get("http://;/") + assert "X-Request-ID" in r.headers + assert len(r.headers["X-Request-ID"]) > 0 + + +def test_request_id_forwarded(): + """Request ID is forwarded from client header.""" + api = responder.API(request_id=True, allowed_hosts=[";"]) + + @api.route("/") + def view(req, resp): + resp.text = "ok" + + r = api.requests.get("http://;/", headers={"X-Request-ID": "my-trace-123"}) + assert r.headers["X-Request-ID"] == "my-trace-123" + + +# --- Rate Limiting --- + + +def test_rate_limiter(): + """Rate limiter returns 429 when exceeded.""" + api = responder.API(allowed_hosts=[";"]) + limiter = RateLimiter(requests=3, period=60) + limiter.install(api) + + @api.route("/") + def view(req, resp): + resp.text = "ok" + + for i in range(3): + r = api.requests.get("http://;/") + assert r.status_code == 200 + assert "X-RateLimit-Remaining" in r.headers + + # 4th request should be rate limited + r = api.requests.get("http://;/") + assert r.status_code == 429 + assert "Retry-After" in r.headers + + +# --- MessagePack --- + + +def test_msgpack_format(api): + """MessagePack encoding and decoding.""" + import msgpack + + @api.route("/") + async def view(req, resp): + data = await req.media("msgpack") + resp.media = data + + payload = {"hello": "world", "number": 42} + r = api.requests.post( + api.url_for(view), + content=msgpack.packb(payload), + headers={"Content-Type": "application/x-msgpack"}, + ) + assert r.json() == payload