WebSockets
Most HTTP traffic is a single pounce: the client asks, the server answers, the connection goes quiet. WebSockets keep the channel open - a warm stream the raptor can listen to and speak into, frame by frame. VelociPy treats WebSocket endpoints as first-class routes, and the same handler runs unchanged on ASGI or RSGI terrain.
See examples/websocket.py for a runnable demo.
Why keep the line open?
WebSockets shine when the server must push data without waiting for a fresh request: live dashboards, collaborative cursors, game state, chat rooms. VelociPy's WebSocket wrapper hides the protocol surface and gives you a small, predictable API:
- accept(): shake hands and start the hunt.
- receive_text(), receive_bytes(), receive_json(): read the next frame.
- send_text(), send_bytes(), send_json(): write back.
- close(): end the connection cleanly.
A simple echo den
A WebSocket endpoint is declared with @app.websocket(path). The handler receives a single WebSocket argument.
from velocipy import VelociPy, WebSocket

app = VelociPy()


@app.websocket("/ws")
async def echo(ws: WebSocket) -> None:
    await ws.accept()
    try:
        while True:
            message = await ws.receive_text()
            await ws.send_text(f"echo: {message}")
    except Exception:
        # Any failure, including the client hanging up, ends the loop.
        await ws.close()
Run it under Uvicorn or Granian, then probe the socket with any WebSocket client.
Tip
WebSocket handlers are responsible for their own loop. Accept the connection, read until the client hangs up or an exception breaks the loop, then close. VelociPy will close the underlying connection in finally if you forget, but explicit cleanup is good discipline.
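The lifecycle described in this tip can be tried without a server. The sketch below is only an illustration: FakeSocket and ClientGone are hypothetical stand-ins written for this example, not VelociPy classes, and the exception VelociPy actually raises when a client hangs up may differ. It shows one way to arrange the loop so that close() runs on every exit path.

```python
import asyncio


class ClientGone(Exception):
    """Hypothetical stand-in for whatever the framework raises on disconnect."""


class FakeSocket:
    """In-memory stand-in exposing only the methods named in this guide."""

    def __init__(self, incoming: list) -> None:
        self._incoming = list(incoming)
        self.sent = []
        self.accepted = False
        self.closed = False

    async def accept(self) -> None:
        self.accepted = True

    async def receive_text(self) -> str:
        if not self._incoming:
            raise ClientGone()  # no frames left: simulate the client hanging up
        return self._incoming.pop(0)

    async def send_text(self, text: str) -> None:
        self.sent.append(text)

    async def close(self) -> None:
        self.closed = True


async def echo(ws: FakeSocket) -> None:
    await ws.accept()
    try:
        while True:
            message = await ws.receive_text()
            await ws.send_text(f"echo: {message}")
    except ClientGone:
        pass  # the client left; nothing to report
    finally:
        if not ws.closed:
            await ws.close()  # runs whether the loop ended cleanly or not


def run_demo() -> FakeSocket:
    ws = FakeSocket(["hello", "raptor"])
    asyncio.run(echo(ws))
    return ws
```

Calling run_demo() returns the fake socket after the handler has finished, so its sent list and closed flag can be inspected. Putting the cleanup in finally rather than in except means an unexpected error still leaves the connection closed.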
Reading the terrain: path params and headers
The WebSocket URL can carry path parameters, and the upgrade request carries headers. Both are exposed on the WebSocket object.
@app.websocket("/rooms/{room_id}")
async def room(ws: WebSocket) -> None:
    await ws.accept()
    room_id = ws.path_params["room_id"]
    client_name = ws.header("x-client-name", default="stranger")
    await ws.send_text(f"joined room {room_id} as {client_name}")
    while True:
        text = await ws.receive_text()
        await ws.send_text(f"[{room_id}] {text}")
| What you need | How to read it |
|---|---|
| Path parameters | ws.path_params["name"] |
| Headers | ws.headers dict, or ws.header("name") case-insensitively |
| Raw query string | ws.query_string |
| Connection state | ws.accepted, ws.closed |
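
Because ws.query_string is raw, the handler has to parse it. A minimal sketch using only the standard library follows. The helper name query_params is made up for this example, and since this guide does not say whether VelociPy hands over the raw string as str or bytes, the sketch accepts both.

```python
from typing import Dict, Union
from urllib.parse import parse_qs


def query_params(raw: Union[str, bytes]) -> Dict[str, str]:
    """Parse a raw query string, keeping the last value seen for each key."""
    if isinstance(raw, bytes):
        # Query strings are percent-encoded ASCII; latin-1 never fails to decode.
        raw = raw.decode("latin-1")
    parsed = parse_qs(raw.lstrip("?"), keep_blank_values=True)
    return {key: values[-1] for key, values in parsed.items()}
```

Inside a handler this might be called as query_params(ws.query_string). Keeping only the last value is a simplification; use parse_qs directly if repeated keys matter.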
Binary and JSON frames
Not every message is plain text. Use the typed receive and send helpers to move bytes or structured JSON.
import msgspec


class Move(msgspec.Struct):
    x: int
    y: int


@app.websocket("/game")
async def game(ws: WebSocket) -> None:
    await ws.accept()
    try:
        while True:
            payload = await ws.receive_json()  # decoded with the app's JSON adapter
            # Validate the decoded payload against Move; raises on a bad shape.
            move = msgspec.convert(payload, type=Move)
            await ws.send_json({"ack": msgspec.to_builtins(move), "tick": 42})
    except Exception:
        await ws.close()
For raw frames, swap in receive_bytes() and send_bytes().
Note
receive_json() uses the same JSON adapter configured on the app, so msgspec, orjson, and stdlib JSON all behave consistently with your HTTP endpoints.
Subprotocol negotiation
Pass a subprotocol to accept() when the client requests one.
@app.websocket("/chat")
async def chat(ws: WebSocket) -> None:
    await ws.accept(subprotocol="chat.v1")
    await ws.send_text("protocol accepted")
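The handler above always answers with chat.v1. When a client may offer several protocols, the server should pick one from the offered list. Clients send that list in the Sec-WebSocket-Protocol upgrade header as comma-separated tokens in preference order. The sketch below is a plain helper, not part of VelociPy; the name choose_subprotocol is hypothetical.

```python
from typing import Optional, Sequence


def choose_subprotocol(
    offered_header: Optional[str], supported: Sequence[str]
) -> Optional[str]:
    """Return the first client-offered subprotocol the server supports, if any."""
    if not offered_header:
        return None
    # The header is a comma-separated list, most preferred first.
    offered = [token.strip() for token in offered_header.split(",") if token.strip()]
    for name in offered:
        if name in supported:
            return name
    return None
```

A handler could pass the result straight to accept(), for example by reading ws.header("sec-websocket-protocol") first. That assumes the header lookup returns None when the header is absent and that accept(subprotocol=None) means no subprotocol, which matches the default shown in the summary table but is not spelled out in this guide.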
Closing with intent
Call close(code, reason) to end a connection gracefully. If an unhandled exception escapes the handler, VelociPy closes with code 1011 (internal error) in production, or re-raises it in debug mode so you can see the traceback.
from velocipy.exceptions import WebSocketException


@app.websocket("/guard")
async def guard(ws: WebSocket) -> None:
    await ws.accept()
    try:
        while True:
            text = await ws.receive_text()
            if text == "quit":
                await ws.close(code=1000, reason="client requested exit")
                return
            await ws.send_text(f"ok: {text}")
    except WebSocketException as exc:
        await ws.close(code=exc.code, reason=exc.reason)
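The numbers 1000 and 1011 come from the close codes defined in RFC 6455. Naming them keeps handlers readable, and because a close frame carries at most 123 bytes of UTF-8 reason text, it helps to trim long reasons before sending them. The sketch below is independent of VelociPy. CloseCode and safe_reason are names invented for this example, and whether VelociPy already trims or rejects long reasons is not covered in this guide.

```python
from enum import IntEnum


class CloseCode(IntEnum):
    """A subset of the close codes defined in RFC 6455, section 7.4.1."""

    NORMAL = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003
    INVALID_PAYLOAD = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009
    INTERNAL_ERROR = 1011


# Control frames carry at most 125 payload bytes; 2 of them hold the code.
MAX_REASON_BYTES = 123


def safe_reason(reason: str) -> str:
    """Trim a close reason so its UTF-8 encoding fits in a close frame."""
    encoded = reason.encode("utf-8")
    if len(encoded) <= MAX_REASON_BYTES:
        return reason
    # errors="ignore" drops a multi-byte character cut in half by the slice.
    return encoded[:MAX_REASON_BYTES].decode("utf-8", errors="ignore")
```

Since CloseCode is an IntEnum, its members compare equal to plain integers, so CloseCode.NORMAL can stand in wherever the guide passes 1000.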
A small broadcast pack
VelociPy does not ship with a built-in broadcast manager - that would be heavier gear than most hunts need. In practice you keep a global set of open connections and fan out to it.
from velocipy import VelociPy, WebSocket

app = VelociPy()
connections: set[WebSocket] = set()


@app.websocket("/broadcast")
async def broadcast(ws: WebSocket) -> None:
    await ws.accept()
    connections.add(ws)
    try:
        while True:
            message = await ws.receive_text()
            for conn in list(connections):
                if conn is not ws and not conn.closed:
                    await conn.send_text(message)
    finally:
        connections.discard(ws)
        if not ws.closed:
            await ws.close()
Warning
This pattern works for a single process. Once you scale across multiple workers, move the roster to Redis, a message bus, or another shared surface.
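Even within a single process, a sequential fan-out has a weak spot: if sending to one peer raises, the exception lands in the sender's handler and stops the broadcast. One possible refinement, sketched below, sends to all peers concurrently and reports which sends failed so the caller can drop those peers from the roster. FakeConn is a stand-in written for this example and fan_out is a hypothetical helper; neither is part of VelociPy.

```python
import asyncio
from typing import Iterable, List


class FakeConn:
    """Stand-in connection that records sends and can be told to fail."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail
        self.sent: List[str] = []

    async def send_text(self, text: str) -> None:
        if self.fail:
            raise ConnectionError(f"{self.name} is gone")
        self.sent.append(text)


async def fan_out(message: str, targets: Iterable[FakeConn]) -> List[FakeConn]:
    """Send to every target concurrently and return the ones whose send failed."""
    peers = list(targets)
    # return_exceptions=True turns failures into results instead of raising,
    # so one broken peer cannot interrupt delivery to the others.
    results = await asyncio.gather(
        *(peer.send_text(message) for peer in peers),
        return_exceptions=True,
    )
    return [
        peer for peer, result in zip(peers, results)
        if isinstance(result, BaseException)
    ]
```

In a real handler the returned list could be removed from the shared set with connections.discard(). The helper only calls send_text(), so it should work with any object that offers that coroutine method.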
WebSocket endpoints at a glance
| Tool | Purpose |
|---|---|
| @app.websocket("/path") | Register a WebSocket route |
| ws.accept(subprotocol=None) | Accept the handshake |
| ws.receive_text() | Read a text frame |
| ws.receive_bytes() | Read a binary frame |
| ws.receive_json() | Read text and decode as JSON |
| ws.send_text(...) | Send a text frame |
| ws.send_bytes(...) | Send a binary frame |
| ws.send_json(...) | Encode and send JSON |
| ws.close(code, reason) | Close gracefully |
| ws.path_params | Route path parameters |
| ws.header(name) | Case-insensitive header lookup |
VelociPy keeps WebSockets feather-light: one handler, one loop, and the same app on ASGI or RSGI. For a full working den, see examples/websocket.py.