Rate limiting¶
A velociraptor does not let the herd trample its hunting ground. When too many requests arrive at once, the pack needs discipline: slow the swarm, protect the nest, and make sure one loud client cannot starve everyone else.
VelociPy's rate limiter is that discipline. It is dependency-based, not a
hidden middleware rule, so you decide exactly which endpoints wear the armor
and how tight the straps are. Attach a rate_limit(...) dependency to a route,
and every request must pass the check before the handler runs.
The limiter's storage check is async internally, but it attaches to any route
through Depends - sync or async. It supports three algorithms, two storage
backends, and arbitrary identifier logic, all resolved through the same
Depends machinery used everywhere else in the framework.
At a glance¶
| Gear | What it does |
|---|---|
| Algorithms | Token bucket, fixed window, sliding window. Pick the claw for the prey. |
| Identifiers | Any dependency that returns a string: IP, user ID, header, cookie, path, or your own mix. |
| Storage | MemoryStorage for local runs, RedisStorage for distributed packs. |
| Rules | Reusable RateLimitRule objects with limit, window, cost, and algorithm. |
| Failure mode | on_storage_error chooses fail-open (ALLOW) or fail-closed (DENY). |
| Response | Exceeded limits raise 429 Too Many Requests with a Retry-After header when available. |
The simplest fence¶
Start with a single rule and in-memory storage. This is perfect for development and tests. Each rule has a name, a limit, and a window in seconds.
from velocipy import Depends, Request, VelociPy
from velocipy.limiter import Limiter, RateLimitRule, rate_limit
from velocipy.limiter.storage import MemoryStorage

app = VelociPy()
limiter = Limiter(storage=MemoryStorage())
STANDARD = RateLimitRule(name="standard", limit=10, window=60)


async def client_ip(request: Request) -> str:
    """Mark the prey by IP address."""
    forwarded = request.header("x-forwarded-for")
    if forwarded:
        ip = forwarded.split(",")[0].strip()
    else:
        conn = request.connection.client
        ip = conn[0] if conn else "unknown"
    return f"ip:{ip}"


@app.get("/items")
async def list_items(
    _: None = Depends(rate_limit(limiter, rules=[STANDARD], identifier=client_ip)),
):
    return {"items": ["claw", "feather"]}
Why a dependency?
Because rate_limit(...) returns an ordinary dependency for Depends, it composes with every
other feature: security, other dependencies, sub-dependencies, and
generator teardown. The check runs only on the routes that declare it.
Sync routes work too
The dependency resolver awaits the limiter check whether your handler is
async def or a plain def. You can protect sync routes without any extra
syntax.
When a client makes more than ten requests in sixty seconds, VelociPy returns
429 Too Many Requests and skips the handler.
Picking your strike: algorithms¶
Different prey need different claws.
Token bucket - best for allowing short bursts while keeping the long-term average under control. The bucket refills at a steady rate, and each request costs one or more tokens.
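To make the refill behavior concrete, here is a minimal, self-contained sketch of the token bucket idea. It is an illustration only, not VelociPy's internal implementation: the `TokenBucket` class, its fields, and the explicit `now` argument are assumptions made for this example.

```python
from dataclasses import dataclass


@dataclass
class TokenBucket:
    """Illustrative token bucket (hypothetical; not VelociPy's internals)."""

    capacity: float      # maximum tokens the bucket can hold (the burst size)
    refill_rate: float   # tokens added per second
    tokens: float = 0.0
    updated_at: float = 0.0

    def __post_init__(self) -> None:
        # Start full so a fresh client can burst immediately.
        self.tokens = self.capacity

    def allow(self, now: float, cost: float = 1.0) -> bool:
        # Refill for the time elapsed since the last check, capped at capacity.
        elapsed = max(0.0, now - self.updated_at)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.updated_at = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# A burst of 4 is allowed, then one token comes back every 2 seconds.
bucket = TokenBucket(capacity=4, refill_rate=0.5)
burst = [bucket.allow(now=0.0) for _ in range(5)]
```

Here `burst` holds four allowed requests followed by one refusal. Two seconds later exactly one more request fits, which is the "short burst, steady average" trade-off described above.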
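Fixed window - simplest and most predictable. Counters reset at the start of each window. Watch out for a stampede at the window boundary: a client can spend a full limit at the end of one window and another full limit right after the reset.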
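The boundary effect is easier to see in code. The following sketch uses a hypothetical `FixedWindowCounter` (an assumption for illustration, not VelociPy's implementation) that keys counts by identifier and window index, and never prunes old windows.

```python
from collections import defaultdict


class FixedWindowCounter:
    """Illustrative fixed-window counter (hypothetical; not VelociPy's internals)."""

    def __init__(self, limit: int, window: int) -> None:
        self.limit = limit
        self.window = window
        # (identifier, window index) -> cost counted in that window
        self.counts = defaultdict(int)

    def allow(self, key: str, now: float, cost: int = 1) -> bool:
        slot = (key, int(now // self.window))  # which window `now` falls in
        if self.counts[slot] + cost > self.limit:
            return False
        self.counts[slot] += cost
        return True


counter = FixedWindowCounter(limit=10, window=60)
# Ten requests at the very end of one window, ten more right after the reset.
late = [counter.allow("ip:10.0.0.1", now=59.0) for _ in range(10)]
early = [counter.allow("ip:10.0.0.1", now=60.0) for _ in range(10)]
```

All twenty requests pass even though they arrive within about one second of each other. Each window on its own still honors the limit of ten.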
Sliding window - smoothest behavior. It tracks each request timestamp and prunes anything older than the window. No boundary stampede, at the price of slightly more storage work.
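A timestamp log can be sketched in a few lines. The `SlidingWindowLog` class below is a hypothetical illustration of the technique for a single identifier, not VelociPy's implementation.

```python
from collections import deque


class SlidingWindowLog:
    """Illustrative sliding-window log (hypothetical; not VelociPy's internals)."""

    def __init__(self, limit: int, window: float) -> None:
        self.limit = limit
        self.window = window
        self.timestamps = deque()  # request times, oldest first

    def allow(self, now: float) -> bool:
        # Prune timestamps that have aged out of the trailing window.
        while self.timestamps and self.timestamps[0] <= now - self.window:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.limit:
            return False
        self.timestamps.append(now)
        return True


log = SlidingWindowLog(limit=10, window=60)
late = [log.allow(now=59.0) for _ in range(10)]
blocked = log.allow(now=60.0)    # still inside the trailing 60 seconds
reopened = log.allow(now=119.0)  # the requests from t=59 have aged out
```

The request at `t=60` is refused because the ten requests from `t=59` are still inside the trailing window. The cost is one stored timestamp per request instead of one counter per window.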
The default algorithm is token bucket, so you only need to import the enum when you want to switch claws.
Marking the prey: identifiers¶
A rule without a target is just noise. The identifier argument tells the
limiter who to count. It can be any VelociPy dependency that returns a
string.
async def user_id(request: Request) -> str:
    """Mark the prey by an API-user header."""
    value = request.header("x-user-id") or "anonymous"
    return f"user:{value}"


@app.get("/profile")
async def profile(
    _: None = Depends(rate_limit(limiter, rules=[STANDARD], identifier=user_id)),
):
    return {"profile": "raptor"}
Common identifier recipes:
- IP address - good for public endpoints and login forms.
- User ID header - good for authenticated APIs.
- Cookie value - track a session-scoped bucket.
- Path - rate-limit per route, shared across all callers.
Because identifiers are plain dependencies, you can combine them with security helpers. For example, return the OAuth2 subject as the limit key.
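One way to build "your own mix" is to keep the key logic in a plain function that takes a header mapping, so it can be tested without a Request object. The helper name `limit_key`, its parameters, and the lowercase header names are assumptions made for this sketch.

```python
from collections.abc import Mapping
from typing import Optional


def limit_key(headers: Mapping[str, str], peer_ip: Optional[str]) -> str:
    """Prefer a user ID and fall back to the client IP (hypothetical helper)."""
    user = headers.get("x-user-id")
    if user:
        return f"user:{user}"
    forwarded = headers.get("x-forwarded-for")
    if forwarded:
        # The leftmost entry is the original client when proxies append to the header.
        return f"ip:{forwarded.split(',')[0].strip()}"
    return f"ip:{peer_ip or 'unknown'}"
```

An identifier dependency could then call this helper with the request's headers and peer address. Keep in mind that `x-forwarded-for` and custom headers are supplied by the client unless a trusted proxy overwrites them, so a key built from them is only as trustworthy as the proxy in front of the app.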
Layered rules¶
One rule is rarely enough. Pass a list, and the limiter enforces the most restrictive outcome. This lets you set both a burst limit and a sustained limit on the same route.
STRICT = [
    RateLimitRule(name="strict-burst", limit=5, window=1),
    RateLimitRule(name="strict-minute", limit=20, window=60),
]


@app.post("/expensive")
async def expensive(
    _: None = Depends(rate_limit(limiter, rules=STRICT, identifier=user_id)),
):
    return {"status": "done"}
Order does not matter
Rules are checked independently. The request is allowed only if every rule allows it. The returned headers reflect the rule with the fewest remaining requests.
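The combination logic described above can be sketched as a small pure function. `RuleResult` and `combine` are hypothetical names for this illustration; VelociPy's actual result type is not shown in this guide.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RuleResult:
    """Outcome of checking one rule (hypothetical shape, for illustration)."""

    name: str
    allowed: bool
    remaining: int
    retry_after: Optional[float] = None  # seconds until this rule allows again


def combine(results: list[RuleResult]) -> tuple[bool, RuleResult]:
    """Allow only if every rule allows; report the rule with the least headroom."""
    allowed = all(r.allowed for r in results)
    tightest = min(results, key=lambda r: r.remaining)
    return allowed, tightest


burst = RuleResult("strict-burst", allowed=True, remaining=2)
minute = RuleResult("strict-minute", allowed=False, remaining=0, retry_after=12.0)
decision = combine([burst, minute])
```

Swapping the order of the list gives the same decision, which is why rule order does not matter: the request is refused, and the reported rule is `strict-minute`.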
Costly requests¶
Not every strike weighs the same. A file upload or heavy report might cost
more than a quick lookup. Set cost to consume more than one token.
A cost of 5 means each request drains five tokens from the bucket, or five
slots from the window.
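The arithmetic is simple but worth seeing once. This sketch uses a hypothetical `WindowBudget` class (not part of VelociPy) to show how a cost of 5 against a limit of 20 leaves room for only four requests per window.

```python
class WindowBudget:
    """Illustrative per-window budget (hypothetical; not VelociPy's internals)."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.used = 0

    def spend(self, cost: int = 1) -> bool:
        # Refuse the request if its cost would overshoot the limit.
        if self.used + cost > self.limit:
            return False
        self.used += cost
        return True


budget = WindowBudget(limit=20)
heavy = [budget.spend(cost=5) for _ in range(5)]
```

The first four heavy requests use the whole budget and the fifth is refused, so a rule with a limit of 20 and a cost of 5 behaves like a limit of 4 for that endpoint.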
When storage stumbles¶
If Redis hiccups, you must decide whether to block traffic or let it through.
RateLimitRule accepts on_storage_error:
from velocipy.limiter import StorageErrorAction

SAFEGUARD = RateLimitRule(
    name="critical",
    limit=100,
    window=60,
    on_storage_error=StorageErrorAction.DENY,
)
| Action | Behavior |
|---|---|
| ALLOW (default) | Fail open. The request proceeds as if no limit existed. |
| DENY | Fail closed. Returns 503 Service Unavailable. |
Use DENY for sensitive endpoints where an unavailable counter is safer than
an unenforced one.
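The two failure modes boil down to one branch in an exception handler. The sketch below is a stand-alone illustration under assumed names (`OnError`, `StorageUnavailable`, `check`); it mirrors the behavior in the table but is not VelociPy's code.

```python
import enum
from typing import Callable


class OnError(enum.Enum):
    ALLOW = "allow"  # fail open
    DENY = "deny"    # fail closed


class StorageUnavailable(Exception):
    """Raised by the stand-in storage call when the backend is unreachable."""


def check(storage_check: Callable[[], bool], on_error: OnError) -> int:
    """Return the HTTP status a client would see for one request."""
    try:
        allowed = storage_check()
    except StorageUnavailable:
        # The counter is unreachable: let the request through, or refuse with 503.
        return 200 if on_error is OnError.ALLOW else 503
    return 200 if allowed else 429


def broken_storage() -> bool:
    raise StorageUnavailable("redis timed out")
```

With `broken_storage`, `OnError.ALLOW` yields 200 and `OnError.DENY` yields 503. A healthy storage call that reports the limit as exceeded yields 429 in both modes, because the failure policy only applies when the counter itself cannot be read.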
Scaling the pack: Redis storage¶
MemoryStorage only sees one process. When you run multiple workers, use
RedisStorage. It runs atomic Lua scripts for every algorithm, so the count
stays consistent across the whole pack.
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from velocipy import VelociPy
from velocipy.limiter import Limiter
from velocipy.limiter.storage import RedisStorage

redis_storage = RedisStorage(url="redis://localhost")
limiter = Limiter(storage=redis_storage)


@asynccontextmanager
async def lifespan(app: VelociPy) -> AsyncIterator[None]:
    yield
    # Runs at shutdown: close the limiter so its connections are released.
    await limiter.close()


app = VelociPy(lifespan=lifespan)
Close the storage
Both MemoryStorage and RedisStorage should be closed when the app shuts
down. The example above uses a lifespan context manager to do exactly that.
Without it, Redis connections may leak.
Install the Redis backend with:
Testing the fence¶
Use TestClient to verify the limiter without starting a server.
from velocipy.testing import TestClient

client = TestClient(app)

# First requests are allowed.
for _ in range(10):
    response = client.get("/items", headers={"x-forwarded-for": "10.0.0.1"})
    assert response.status_code == 200

# The eleventh is turned away.
response = client.get("/items", headers={"x-forwarded-for": "10.0.0.1"})
assert response.status_code == 429
assert "Retry-After" in response.headers
Isolate identifiers in tests
Give each test its own IP or user ID so one test does not drain the bucket
for another. MemoryStorage starts empty in each process, but tests that run in
the same process and share a limiter also share its counters.
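One way to keep identifiers apart is to hand every test a header it has never used before. The helper below is a hypothetical sketch: `fresh_client_headers` is not part of VelociPy, and it assumes the identifier reads `x-forwarded-for` as in the examples on this page.

```python
import ipaddress
import itertools

_BASE = ipaddress.IPv4Address("10.0.0.0")
_next_offset = itertools.count(1)


def fresh_client_headers() -> dict[str, str]:
    """Return an x-forwarded-for header with an address no earlier call used."""
    return {"x-forwarded-for": str(_BASE + next(_next_offset))}
```

Each test can then pass `headers=fresh_client_headers()` to its requests, reusing the same dictionary within the test when it wants to hit the limit on purpose.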
Full working example¶
See examples/rate_limit.py for a runnable app
that uses per-user limits, per-IP login limits, and a lifespan handler to close
the limiter cleanly.
Summary¶
| Concept | API |
|---|---|
| Limiter | Limiter(storage=...) |
| Rule | RateLimitRule(name, limit, window, cost=1, algorithm=..., on_storage_error=...) |
| Algorithm | RateLimitAlgorithm.TOKEN_BUCKET, FIXED_WINDOW, SLIDING_WINDOW |
| Dependency | Depends(rate_limit(limiter, rules=[...], identifier=...)) |
| Identifier | Any async def returning a string, often built from Request |
| Storage | MemoryStorage() or RedisStorage(url=...) |
| Storage failure | StorageErrorAction.ALLOW or DENY |
| Response on exceed | 429 Too Many Requests with Retry-After |
Rate limiting in VelociPy stays light: declare the rule, mark the prey, and let the dependency resolver do the rest. No middleware to configure, no global state to manage - just armor where you need it.