Python
Today

FastAPI и WebSocket: когда HTTP уже не хватает

Обычный HTTP похож на официанта в кафе: ты позвал его, сделал заказ, получил ответ — и всё, разговор закончен. Потом снова позвал, снова спросил, снова получил ответ. Для REST API, админок, форм, личных кабинетов и обычных CRUD-операций такая модель прекрасна. Она простая, понятная и не требует от сервера помнить о тебе больше, чем нужно.

Но иногда хочется другого поведения. Хочется, чтобы сервер сам мог сказать: “у тебя новое сообщение”, “задача завершилась”, “деплой упал”, “кот нажал лапой на кнопку и теперь надо срочно смотреть логи” 😸

Вот тут и появляются WebSocket.

WebSocket — это не “HTTP, только моднее”. Это постоянное соединение между клиентом и сервером, где обе стороны могут отправлять сообщения друг другу в любой момент.

Представь не официанта, а телефонный звонок. Клиент подключился, сервер принял соединение, и дальше они могут разговаривать, пока кто-нибудь не положит трубку. Клиент может писать серверу. Сервер может писать клиенту. И ему не нужно ждать очередного HTTP-запроса.

В этом вся суть.

Когда WebSocket реально нужен

WebSocket не надо пихать в проект просто потому, что он звучит красиво. Я видел такие истории: команда добавляет WebSocket “на будущее”, а потом внезапно получает переподключения, heartbeat, хранение соединений, проблемы с несколькими воркерами, прокси, таймауты и грустные лица на созвоне. Вроде хотели “чуть-чуть realtime”, а получили маленький зоопарк.

Обычный REST часто лучше. Особенно если данные можно спокойно получить по запросу.

WebSocket нужен там, где данные должны прилетать почти сразу и без постоянного опроса сервера. Например, в чатах, уведомлениях, live-статусах фоновых задач, онлайн-играх, совместном редактировании документов, мониторинге, live-логах или панелях управления, где состояние меняется без действий пользователя.

Допустим, у тебя есть сервис, который запускает долгую задачу: сборку проекта, импорт данных, генерацию отчёта или деплой приложения. Пользователь нажал кнопку “Запустить”, и дальше операция может идти 30 секунд, 5 минут или вообще уйти в загадочный режим “я думаю”.

Можно сделать polling:

GET /tasks/123/status
GET /tasks/123/status
GET /tasks/123/status
GET /tasks/123/status

Работает? Да. Но выглядит как человек, который каждые две секунды спрашивает: “Ну что? А теперь? А сейчас? Уже?” Прям кот у закрытой двери.

С WebSocket можно сделать иначе: клиент подключился к каналу задачи, а сервер сам присылает события.

{"status": "started"}
{"status": "pulling_image"}
{"status": "running_migrations"}
{"status": "done"}

Фронтенд просто слушает. Пользователь видит движение. Сервер не получает пачку одинаковых запросов. Всем немного спокойнее.

Первый WebSocket в FastAPI

FastAPI поддерживает WebSocket из коробки. Для простого старта не нужен отдельный фреймворк, не нужно городить свой транспорт и не нужно приносить в жертву документацию. Минимальный пример выглядит довольно дружелюбно:

from fastapi import FastAPI, WebSocket

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    while True:
        message = await websocket.receive_text()
        await websocket.send_text(f"Ты написал: {message}")

Клиент подключается к /ws, сервер принимает соединение через accept(), потом в цикле читает сообщения и отправляет ответ. На первый взгляд — красота. Почти слишком просто.

Но этот пример хорош только для знакомства.

В реальном проекте пользователь может закрыть вкладку, сеть может отвалиться, клиент может отправить кривой JSON, сервер может перезапуститься, а браузер может решить, что вкладка давно спит и пора бы её придушить. WebSocket — это живое соединение. А всё живое иногда ломается, устает и исчезает без предупреждения.

Отключение клиента — это не ошибка, а обычная жизнь

Когда клиент закрывает соединение, receive_text() выбросит WebSocketDisconnect. Это не катастрофа. Это нормальный сценарий, который будет происходить постоянно: пользователь обновил страницу, закрыл ноутбук, ушёл в метро, переключился на мобильный интернет или просто передумал.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"Получил: {message}")
    except WebSocketDisconnect:
        print("Клиент отключился")

Этот код всё ещё простой, но уже не совсем игрушечный. Он хотя бы понимает, что клиент может уйти.

WebSocket-соединение нельзя воспринимать как вечный туннель. Это скорее разговор на плохом Wi-Fi: вроде всё хорошо, но лучше быть готовым к внезапной тишине.

Без обработки отключений логи быстро начинают выглядеть как крик чайки над мусорным баком. И чем больше пользователей, тем веселее этот хор.

Менеджер соединений

Обычно одного клиента мало. Если у тебя чат, уведомления или live-статусы, нужно где-то хранить активные соединения. Для этого часто делают небольшой ConnectionManager.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)


manager = ConnectionManager()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)

    try:
        while True:
            message = await websocket.receive_text()
            await manager.broadcast(f"Новое сообщение: {message}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast("Кто-то вышел из чата")

Теперь сервер может рассылать сообщение всем подключённым клиентам. Получился почти мини-чат. Но я бы не тащил этот код в продакшен как есть: тут ещё нет авторизации, комнат, обработки ошибок при отправке, лимитов, защиты от спама, heartbeat и нормального разделения по слоям.

Идея важнее конкретной реализации: мы храним активные соединения и можем отправлять в них сообщения в нужный момент.

Комнаты: чтобы не кричать всем сразу

Часто тебе не нужно отправлять сообщение всем клиентам. Если пользователь смотрит статус конкретной задачи, он должен получать события только по этой задаче. Не по всем задачам системы, не по соседнему деплою, не по чужому импорту CSV с котиками.

Для этого удобно использовать комнаты.

from collections import defaultdict
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


class RoomManager:
    def __init__(self):
        self.rooms: dict[str, list[WebSocket]] = defaultdict(list)

    async def connect(self, room_id: str, websocket: WebSocket):
        await websocket.accept()
        self.rooms[room_id].append(websocket)

    def disconnect(self, room_id: str, websocket: WebSocket):
        if websocket in self.rooms[room_id]:
            self.rooms[room_id].remove(websocket)

        if not self.rooms[room_id]:
            del self.rooms[room_id]

    async def send_to_room(self, room_id: str, message: str):
        for connection in self.rooms.get(room_id, []):
            await connection.send_text(message)


manager = RoomManager()


@app.websocket("/ws/tasks/{task_id}")
async def task_status_ws(websocket: WebSocket, task_id: str):
    await manager.connect(task_id, websocket)

    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(task_id, websocket)

Клиент подключается к /ws/tasks/123, а сервер добавляет его в комнату 123. Если где-то в приложении задача 123 меняет статус, сервер может отправить событие только тем клиентам, которые подписаны на эту задачу.

Иногда WebSocket работает почти в одну сторону: клиент подключился и слушает, а сервер присылает события. Это нормально. receive_text() в таком случае нужен хотя бы для того, чтобы соединение жило и сервер мог заметить отключение клиента.

Как отправлять события из другого места приложения

WebSocket endpoint — это только точка подключения. Но статус задачи обычно меняется не там. Он меняется в сервисе, воркере, обработчике очереди, фоновой задаче или где-то ещё, куда обычный пользовательский запрос уже давно не дотягивается.

Например, есть функция деплоя:

async def run_deploy(task_id: str):
    await manager.send_to_room(task_id, "Начинаю деплой")
    await manager.send_to_room(task_id, "Скачиваю образ")
    await manager.send_to_room(task_id, "Запускаю миграции")
    await manager.send_to_room(task_id, "Готово")

Для маленького приложения такой подход может сработать. Особенно если у тебя один процесс, один сервер и всё живёт в одной памяти. Для MVP — почему бы и нет? Иногда лучше сделать простую рабочую штуку, чем неделю проектировать идеальный realtime-шлюз, который никто не просил.

Но тут есть подвох.

Если приложение запущено в несколько процессов, каждый процесс хранит свои WebSocket-соединения отдельно. Клиент может быть подключён к процессу A, а событие может произойти в процессе B. Процесс B ничего не знает про соединения процесса A.

И сообщение не дойдёт.

Вот так.

В разработке всё работало, а на сервере стало “иногда странно”. Любимая категория багов.

Несколько воркеров и брокер сообщений

Если ты запускаешь FastAPI так:

uvicorn app.main:app --workers 4

то у тебя четыре отдельных процесса. У каждого своя память, свой manager, свой список соединений и своя маленькая вселенная. Python-список не становится общим просто потому, что нам очень хочется.

Для продакшена обычно добавляют брокер сообщений:

  • Redis Pub/Sub — часто самый простой вариант;
  • RabbitMQ — хорошо подходит, если он уже есть в инфраструктуре;
  • NATS — приятный вариант для лёгких событий;
  • Kafka — если у тебя уже есть Kafka и команда не боится её кормить;
  • PostgreSQL LISTEN/NOTIFY — иногда хватает для простых внутренних событий.

Схема получается такая: WebSocket-сервер держит соединения с клиентами, сервисы и воркеры публикуют события в брокер, а каждый процесс FastAPI слушает брокер и отправляет событие тем клиентам, которые подключены именно к нему.

Для одного процесса хватит in-memory менеджера. Для нескольких процессов нужен общий канал событий.

Это не делает архитектуру ужасной. Просто появляется ещё один слой. Зато система перестаёт зависеть от того, в какой именно процесс попал клиент.

Авторизация WebSocket

С обычным HTTP всё привычно: заголовок Authorization, cookies, middleware, dependency injection. С WebSocket есть нюанс: браузерный WebSocket API не даёт удобно передавать произвольные заголовки, как в fetch.

Поэтому часто используют один из вариантов:

  • cookie-сессию;
  • токен в query string;
  • первое сообщение после подключения;
  • короткоживущий одноразовый ticket, полученный через HTTP.

Самый простой вариант — токен в query string:

from fastapi import FastAPI, WebSocket, status

app = FastAPI()


async def get_user_from_token(token: str | None):
    if token != "secret":
        return None

    return {"id": 1, "name": "Semyon"}


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    token = websocket.query_params.get("token")
    user = await get_user_from_token(token)

    if user is None:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await websocket.accept()
    await websocket.send_text(f"Привет, {user['name']}")

Для демо и внутренних MVP это может быть нормально. Но у query string есть неприятная особенность: URL может попасть в логи, историю браузера, reverse proxy и прочие места, где токенам жить не надо.

Более аккуратная схема — одноразовый ticket. Клиент сначала делает обычный HTTP-запрос, сервер проверяет авторизацию и выдаёт ticket на короткое время. Потом клиент подключается к WebSocket с этим ticket, а сервер проверяет его и сразу помечает использованным.

Чуть больше кода. Зато меньше тревоги.

Формат сообщений

Не отправляй просто строки, если протокол может вырасти. Сегодня тебе кажется, что хватит "done", а завтра появятся типы событий, ошибки, прогресс, версия формата, request_id и ещё пара полей, потому что фронтенд попросил “буквально одну маленькую штуку”.

Лучше сразу договориться о JSON-формате:

{
  "type": "task.updated",
  "payload": {
    "task_id": "123",
    "status": "running",
    "progress": 42
  }
}

Поле type сильно упрощает жизнь. Клиент видит тип события и понимает, как его обработать. А payload хранит данные конкретного события.

Можно добавить version, request_id, error, timestamp. Не обязательно сразу всё. Но базовая структура должна быть.

Хороший WebSocket-протокол — это маленький договор между фронтендом и бэкендом. Если договор мутный, потом каждый читает сообщения как хочет, и начинается весёлый археологический квест.

Pydantic для сообщений

FastAPI любят за Pydantic, и с WebSocket его тоже можно использовать. Просто тут всё не так автоматически, как в HTTP endpoint. Сообщение нужно прочитать, разобрать и руками провалидировать.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, ValidationError

app = FastAPI()


class ClientMessage(BaseModel):
    type: str
    payload: dict


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_json()

            try:
                message = ClientMessage.model_validate(data)
            except ValidationError:
                await websocket.send_json({
                    "type": "error",
                    "payload": {
                        "message": "Некорректный формат сообщения"
                    }
                })
                continue

            await websocket.send_json({
                "type": "message.accepted",
                "payload": {
                    "received_type": message.type
                }
            })
    except WebSocketDisconnect:
        pass

Такой код уже похож на нормальный протокол. Клиент может ошибиться, отправить старый формат, сломать JSON или прийти из версии фронтенда, которую кто-то забыл обновить. Сервер не должен падать от одного кривого сообщения.

Это как с котом на столе: ты можешь надеяться, что он ничего не уронит, но лучше всё-таки убрать кружку подальше.

Heartbeat и мёртвые соединения

Есть неприятная штука: соединение может умереть не сразу. Пользователь закрыл ноутбук, Wi-Fi моргнул, телефон ушёл в сон, а сервер ещё какое-то время думает, что всё хорошо. В списке соединений висит клиент, которого уже нет.

Для этого используют heartbeat: периодические ping/pong-сообщения. Иногда часть этой работы берёт на себя сервер или инфраструктура, иногда удобнее сделать прикладной heartbeat.

{"type": "ping"}

Ответ:

{"type": "pong"}

Если клиент долго не отвечает, соединение закрывается и удаляется из менеджера. Звучит грубовато, но иначе список активных клиентов постепенно превращается в кладбище призраков.

А призраки в памяти — плохая архитектура. Даже если они милые и шуршат пакетиком корма.

Ошибки при broadcast

Ещё одна ловушка — рассылка сообщений. Ты проходишься по всем соединениям и отправляешь сообщение каждому. Но одно соединение уже умерло, второе зависло, третье решило устроить драму. Если не обработать ошибку, один мёртвый клиент может сломать рассылку всем остальным.

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def broadcast(self, message: str):
        disconnected = []

        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:
                disconnected.append(connection)

        for connection in disconnected:
            if connection in self.active_connections:
                self.active_connections.remove(connection)

Это всё ещё упрощённый пример, но он уже показывает правильное направление: не доверяй соединениям слишком сильно. Они временные. Они ломаются. Они уходят молча. Код должен относиться к этому спокойно.

WebSocket на фронтенде

На стороне браузера всё начинается довольно просто:

const socket = new WebSocket("ws://localhost:8000/ws");

socket.onopen = () => {
  console.log("Соединение открыто");
  socket.send(JSON.stringify({
    type: "hello",
    payload: {}
  }));
};

socket.onmessage = (event) => {
  const message = JSON.parse(event.data);
  console.log("Сообщение от сервера:", message);
};

socket.onclose = () => {
  console.log("Соединение закрыто");
};

socket.onerror = (error) => {
  console.error("Ошибка WebSocket:", error);
};

А потом появляется переподключение. Потому что соединение будет отваливаться. Не “может быть”, а именно будет: вкладка уснула, сеть пропала, сервер перезапустился, ноутбук вышел из сна, пользователь уехал в лифте, кот лёг на роутер.

Минимальный reconnect может выглядеть так:

let socket = null;

function connect() {
  socket = new WebSocket("ws://localhost:8000/ws");

  socket.onopen = () => {
    console.log("WebSocket подключён");
  };

  socket.onmessage = (event) => {
    console.log("Сообщение:", event.data);
  };

  socket.onclose = () => {
    console.log("WebSocket закрыт, пробую переподключиться");

    setTimeout(() => {
      connect();
    }, 2000);
  };
}

connect();

Позже сюда добавляют exponential backoff, лимит попыток, повторную авторизацию, повторную подписку на комнаты и обработку случая, когда пользователь уже вышел из аккаунта.

WebSocket начинается с трёх строк. А потом приносит чемодан нюансов и садится на диван.

WebSocket за nginx

Если приложение стоит за nginx, нужно правильно прокинуть Upgrade-заголовки. Без них соединение может не перейти в WebSocket-режим, и ты будешь долго смотреть на код FastAPI, хотя проблема вообще не там.

location /ws/ {
    proxy_pass http://127.0.0.1:8000;

    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";

    proxy_set_header Host $host;
    proxy_read_timeout 600s;
}

Ещё проверь таймауты. Если proxy считает, что соединение слишком долго молчит, он может его закрыть. Снаружи это выглядит как загадочное “через минуту всё отваливается”.

Иногда WebSocket ломается не в приложении, а на уровне nginx, ingress, load balancer или корпоративного прокси. И это нормально бесит.

Origin — не авторизация, но полезная проверка

Для WebSocket нет CORS в том же виде, как для обычных HTTP-запросов через браузер. Но есть заголовок Origin, и его можно проверять, если endpoint не должен принимать подключения с любых страниц.

ALLOWED_ORIGINS = {"https://example.com"}


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    origin = websocket.headers.get("origin")

    if origin not in ALLOWED_ORIGINS:
        await websocket.close(code=1008)
        return

    await websocket.accept()

Это не заменяет авторизацию. Пользователя всё равно нужно проверять отдельно. Но как дополнительная защита от странных подключений с чужих страниц — вполне полезно.

Типичная архитектура

Для небольшого проекта можно начать очень просто:

frontend
   |
   | WebSocket
   v
FastAPI app
   |
   | in-memory manager
   v
active connections

Такой вариант хорош для локальной разработки, MVP, внутренней админки или маленького сервиса, где один процесс и понятная нагрузка. Не надо стыдиться простых решений. Иногда они как раз самые здоровые.

Для системы посерьёзнее схема обычно меняется:

frontend
   |
   | WebSocket
   v
FastAPI websocket gateway
   |
   | subscribes
   v
Redis / RabbitMQ / NATS
   ^
   | publishes events
   |
workers / services

Мне нравится второй подход для проектов, где уже есть фоновые задачи, очереди и несколько инстансов приложения. FastAPI тогда не пытается делать всё подряд. Он держит WebSocket-соединения и пересылает события клиентам, а бизнес-логика живёт в сервисах и воркерах.

Так получается чище. И меньше желания закопать весь проект в один огромный main.py, который потом никто не хочет открывать без защитных очков.

История из жизни

Однажды мы делали панель, где пользователь запускал долгую операцию на сервере. Сначала всё было максимально просто: кнопка, REST endpoint, потом polling статуса раз в две секунды. На тестах работало. На демо тоже. Все кивали, интерфейс показывал прогресс, жизнь казалась приятной.

А потом появились реальные пользователи. Кто-то открывал пять вкладок, кто-то запускал несколько операций подряд, кто-то оставлял страницу висеть на фоне до вечера. Сервер начал получать кучу однотипных запросов статуса. Не смертельно, но шумно. Логи пухли, метрики грустили, а мы начали чувствовать, что решение вроде рабочее, но какое-то деревянное.

Мы заменили polling на WebSocket-события. Интерфейс стал ощущаться живее. Даже не “быстрее” в чистом техническом смысле, а именно живее: пользователь видел шаги операции сразу — “подключаемся”, “проверяем”, “перезапускаем”, “готово”.

Для UX это очень заметно. Когда система молчит, пользователь нервничает. Когда система говорит, что делает, пользователь уже почти доволен. Даже если ждёт.

Как кот у миски: если ты хотя бы шуршишь пакетом, надежда жива 😸

Где WebSocket лишний

Не надо использовать WebSocket для обычного CRUD. Список пользователей, форма настроек, создание записи, получение профиля, редактирование описания проекта — всё это отлично живёт на обычном HTTP.

WebSocket не делает API автоматически лучше. Он делает его постоянным и двусторонним, а это не всегда плюс. За это приходится платить: хранением соединений, обработкой отключений, переподключением на фронтенде, более сложной авторизацией, проблемами с несколькими воркерами, настройкой прокси и большим количеством состояния в системе.

Если данные можно спокойно обновлять по запросу, не усложняй.

Иногда кнопка “обновить” честнее, чем архитектура на брокерах, комнатах и heartbeat.

Практичный минимум для продакшена

Перед тем как выкатывать WebSocket в нормальную среду, я бы проверил несколько вещей. Пользователь должен проходить авторизацию, сервер должен проверять права на комнату или ресурс, сообщения должны иметь понятный JSON-формат, а ошибки в сообщениях не должны ронять соединение.

Ещё нужно обрабатывать отключения, чистить мёртвые соединения, не ломать broadcast из-за одного клиента, настроить heartbeat или другую стратегию очистки, научить фронтенд переподключаться и не забыть про nginx, ingress или load balancer.

Отдельный пункт — несколько процессов. Если приложение запускается в несколько воркеров, нужен брокер или другой общий канал событий. Иначе часть сообщений будет пропадать не потому, что WebSocket плохой, а потому что процессы не умеют читать мысли друг друга.

Самый неприятный баг в WebSocket-системах звучит так: “иногда сообщение не приходит”. Вот этого “иногда” лучше бояться заранее.

Итог

FastAPI хорошо подходит для работы с WebSocket. Он даёт понятный API: принять соединение, читать сообщения, отправлять ответы, закрывать подключение, хранить активных клиентов и строить поверх этого свой realtime-слой.

Но WebSocket не стоит романтизировать. Это не REST “на максималках”, а отдельный инструмент со своими правилами. Когда тебе нужно живое двустороннее общение — бери WebSocket. Когда нужен обычный запрос-ответ — оставь HTTP в покое, он ещё отлично бегает.

Если сомневаешься, начни с polling. Иногда его достаточно. Когда интерфейс начнёт просить больше жизни, добавишь WebSocket без драмы и архитектурного цирка.

И желательно без кота на продакшен-клавиатуре.

Хотя тут уж как повезёт 😸