Python
June 5

Celery: запуск асинхронных задач в Python

Введение

Celery — это распределенная система для обработки асинхронных задач и управления очередями задач в Python. Она широко используется для выполнения фоновых задач, таких как отправка электронных писем, обработка изображений, выполнение долгих вычислений и интеграция с внешними сервисами. Celery поддерживает множество брокеров сообщений, таких как RabbitMQ, Redis и другие, что делает его гибким и мощным инструментом для создания масштабируемых приложений.

Основные возможности Celery

  1. Асинхронные задачи
    • Celery позволяет выполнять задачи асинхронно, что освобождает основной поток приложения и улучшает производительность. Например, можно отправлять электронные письма в фоновом режиме, не задерживая обработку веб-запросов.
  2. Периодические задачи
    • С помощью Celery можно легко настраивать периодические задачи, которые будут выполняться через определенные интервалы времени. Это удобно для задач, таких как регулярное обновление данных или очистка временных файлов.
  3. Масштабируемость
    • Celery поддерживает распределенные системы, что позволяет легко масштабировать обработку задач на нескольких серверах. Это делает его идеальным выбором для приложений с высокими требованиями к производительности.
  4. Поддержка различных брокеров сообщений
    • Celery может работать с различными брокерами сообщений, такими как RabbitMQ, Redis, Amazon SQS и другими, что делает его гибким и легко интегрируемым в существующую инфраструктуру.

Установка и настройка Celery

Для начала работы с Celery необходимо установить библиотеку и настроить брокер сообщений. В этом примере мы будем использовать Redis в качестве брокера.

Установка Celery и Redis

Установите Celery и Redis с помощью pip:

pip install celery[redis]

Настройка Celery

Создайте файл celery_app.py и настройте Celery:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0') 

@app.task
def add(x, y):
    return x + y

В этом примере мы создали простое приложение Celery с задачей add, которая складывает два числа.

Если хотите использовать RabbitMQ, то нужно поменять url брокера:

app = Celery('tasks', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

Для использования Amazon SQS потребуются aws_access_key_id и aws_secret_access_key. Для этого также потребуется поставить зависимости:

pip install celery[sqs]

А код для формирования URL брокера будет выглядеть примерно вот так:

from kombu.utils.url import safequote  # необходим для экранирования специальных символов

aws_access_key = safequote("ABCDEFGHIJKLMNOPQRST")
aws_secret_key = safequote("ZYXK7NiynG/TogH8Nj+P9nlE73sq3")

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)

Запуск Celery Worker

Запустите Celery worker для обработки задач:

celery -A celery_app worker --loglevel=info

Выполнение задач

Теперь мы можем вызывать задачу add из нашего кода:

from celery_app import add 
result = add.delay(4, 6) 
print('Task result:', result.get()) 

Примеры использования Celery

  1. Отправка электронных писем Создайте задачу для отправки электронных писем:
from celery import Celery
import smtplib

app = Celery('email_tasks', broker='redis://localhost:6379/0')

@app.task
def send_email(to_address, subject, message):
    # Настройка SMTP сервера и отправка письма
    server = smtplib.SMTP('smtp.example.com')
    server.sendmail('from@example.com', to_address, f"Subject: {subject}\n\n{message}")
    server.quit()

Вызывайте задачу для отправки письма в фоновом режиме:

from email_tasks import send_email

send_email.delay('to@example.com', 'Hello', 'This is a test email')

Периодические задачи

Настройте периодическую задачу, которая будет выполняться каждый час:

from celery import Celery
from celery.schedules import crontab

app = Celery('periodic_tasks', broker='redis://localhost:6379/0')

@app.task
def periodic_task():
    print('This task runs every hour')

app.conf.beat_schedule = {
    'run-every-hour': {
        'task': 'periodic_tasks.periodic_task',
        'schedule': crontab(minute=0, hour='*'),
    },
}

Запустите Celery beat для планирования периодических задач:

celery -A periodic_tasks beat --loglevel=info 

Интеграция Celery и Django

Celery отлично интегрируется с Django, позволяя выполнять асинхронные задачи и управлять очередями задач в вашем Django-проекте. В этом разделе мы рассмотрим, как настроить и использовать Celery в приложении Django.

Шаг 1: Установка зависимостей

Для начала установите Celery и брокер сообщений (в данном примере используем Redis):

pip install celery[redis]

Шаг 2: Настройка Celery в Django

  1. Создание файла конфигурации CeleryВ корневой директории вашего Django-проекта (рядом с файлом settings.py) создайте файл celery.py:
# myproject/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Устанавливаем переменную окружения для настроек Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Создаем экземпляр приложения Celery
app = Celery('myproject')

# Загружаем конфигурацию из настроек Django
app.config_from_object('django.conf:settings', namespace='CELERY')

# Автоматически обнаруживаем задачи в файлах tasks.py
app.autodiscover_tasks()

Обновление настроек Django

В файле settings.py добавьте конфигурацию Celery:

# myproject/settings.py

# Настройки брокера сообщений (Redis)
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# Настройки хранилища результатов (опционально)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Настройки для автоматического обнаружения задач
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

Инициализация Celery в Django

Обновите файл mysite/__init__.py (mysite - там где лежит settings.py) вашего проекта, чтобы инициализировать Celery при запуске Django:

# myproject/__init__.py
from __future__ import absolute_import, unicode_literals

# Инициализация Celery
from .celery import app as celery_app

__all__ = ('celery_app',)

Шаг 3: Создание задач

Создайте файл tasks.py в одном из ваших Django-приложений и добавьте задачи, которые будут выполняться Celery:

# myapp/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def send_email(to_address, subject, message):
    # Пример отправки письма
    from django.core.mail import send_mail
    send_mail(subject, message, 'from@example.com', [to_address])

Шаг 4: Запуск Celery Worker

Запустите Celery worker для обработки задач:

celery -A myproject worker --loglevel=info

Шаг 5: Запуск Celery Beat (для периодических задач)

Если у вас есть периодические задачи, запустите Celery beat:

celery -A myproject beat --loglevel=info

Пример использования задач в Django

Теперь вы можете вызывать задачи Celery из любого места в вашем Django-проекте. Например, вызов задачи из представления:

# myapp/views.py
from django.shortcuts import render
from .tasks import add, send_email

def index(request):
    # Вызов задачи add
    result = add.delay(4, 6)
    
    # Вызов задачи send_email
    send_email.delay('to@example.com', 'Hello', 'This is a test email')

    return render(request, 'index.html', {'result': result.get()})

Интеграция Celery с Django позволяет вам эффективно управлять асинхронными задачами и очередями задач, улучшая производительность и масштабируемость вашего приложения. Следуя шагам, описанным в этом руководстве, вы сможете настроить Celery для работы с вашим Django-проектом, создать и выполнять задачи, а также использовать мощные возможности Celery для обработки периодических задач и управления очередями.

Возможные ошибки

1. Ошибки при загрузке задач

Ошибка:

Received unregistered task of type 'myapp.tasks.add'.

Причина: Эта ошибка возникает, если Celery не может найти или загрузить задачу, указанную в коде.

Решение:

  • Убедитесь, что файл tasks.py существует и правильно настроен.
  • Проверьте, что Celery настроен на автоматическое обнаружение задач.

2. Django и загрузка celery.py

При использовании Django может возникнуть такая ошибка:

kombu.exceptions.OperationalError: [Errno 111] Connection refused

Решение этой проблемы: добавьте в файл mysite/__init__.py (mysite - директория где лежат settings.py и, собственно, celery.py) следующий код:

from .celery import app as celery_app

__all__ = ['celery_app']

Заключение

Celery является мощным и гибким инструментом для управления асинхронными задачами и очередями задач в Python. Его возможности позволяют легко масштабировать приложения и выполнять различные фоновые задачи, что делает его незаменимым инструментом для разработки высокопроизводительных приложений. С поддержкой различных брокеров сообщений и возможности настройки периодических задач, Celery предоставляет все необходимое для эффективной обработки задач в распределенных системах.