From 7854e5e2c12d64ad01d304809e3b2127c336dd23 Mon Sep 17 00:00:00 2001 From: Scoder12 <34356756+Scoder12@users.noreply.github.com> Date: Mon, 3 Aug 2020 18:01:41 -0700 Subject: [PATCH] Add authed ratelimit --- src/replit/maqpy/utils.py | 66 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/src/replit/maqpy/utils.py b/src/replit/maqpy/utils.py index 1c37566..2b52cd3 100644 --- a/src/replit/maqpy/utils.py +++ b/src/replit/maqpy/utils.py @@ -1,5 +1,6 @@ """Utitilities to make development easier.""" from functools import wraps +import time from typing import Any, Callable, Union import flask @@ -35,7 +36,7 @@ def needs_sign_in(func: Callable = None, login_res: str = sign_in_page) -> Calla Args: func (Callable): The function passed in if used as a decorator. Defaults to None. - login_html (str): The HTML to show when the user needs to sign in. Defaults to + login_res (str): The HTML to show when the user needs to sign in. Defaults to sign_in_snippet. Returns: @@ -48,7 +49,7 @@ def needs_sign_in(func: Callable = None, login_res: str = sign_in_page) -> Calla if flask.request.signed_in: return func(*args, **kwargs) else: - return login_html + return login_res return handler @@ -134,3 +135,64 @@ def local_redirect(location: str, code: int = 302) -> flask.Response: "https://" + flask.request.headers["host"] + location, code ) ) + + +def authed_ratelimit( + max_requests: int, + period: float, + login_res: str = sign_in_page, + get_ratelimited_res: Callable[[float], str] = ( + lambda left: f"Too many requests, wait {left} sec" + ), +) -> Callable[[Callable], flask.Response]: + """Require sign in and limit the amount of requests each signed in user can perform. + + This decorator also calls needs_signin for you and passes the login_res kwarg + directly to it. + + Args: + max_requests (int): The maximum amount of requests allowed in the period. + period (float): The length of the period. + login_res (str): The response to be shown if the user is not signed in, passed + to needs_sign_in. + get_ratelimited_res (Callable[[float], str]): A callable which is passed the + amount of time remaining before the user can request again and returns the + response that should be sent to the user. + + Returns: + Callable[[Callable], flask.Response]: A function which decorates the handler. + """ + + def decorator(func: Callable) -> flask.Response: + last_reset = time.time() + num_requests = {} + + # Checks for signin first, before checking ratelimit + @needs_sign_in(login_res=login_res) + @wraps(func) + def handler(*args: Any, **kwargs: Any) -> flask.Response: + nonlocal last_reset + nonlocal num_requests + + name = flask.request.auth.name + now = time.time() + + if now - last_reset >= period: + last_reset = now + num_requests = {} + + times_requested = num_requests.get(name, 0) + if times_requested >= max_requests: + res = get_ratelimited_res(period - (now - last_reset)) + # Make a reponse object so that status can be set + if not isinstance(res, flask.Response): + res = flask.make_response(res) + res.status = "429" + return res + + num_requests[name] = times_requested + 1 + return func(*args, **kwargs) + + return handler + + return decorator