Celery: запуск асинхронных задач в Python
Введение
Celery — это распределенная система для обработки асинхронных задач и управления очередями задач в Python. Она широко используется для выполнения фоновых задач, таких как отправка электронных писем, обработка изображений, выполнение долгих вычислений и интеграция с внешними сервисами. Celery поддерживает множество брокеров сообщений, таких как RabbitMQ, Redis и другие, что делает его гибким и мощным инструментом для создания масштабируемых приложений.
Основные возможности Celery
- Асинхронные задачи
- Celery позволяет выполнять задачи асинхронно, что освобождает основной поток приложения и улучшает производительность. Например, можно отправлять электронные письма в фоновом режиме, не задерживая обработку веб-запросов.
- Периодические задачи
- С помощью Celery можно легко настраивать периодические задачи, которые будут выполняться через определенные интервалы времени. Это удобно для задач, таких как регулярное обновление данных или очистка временных файлов.
- Масштабируемость
- Celery поддерживает распределенные системы, что позволяет легко масштабировать обработку задач на нескольких серверах. Это делает его идеальным выбором для приложений с высокими требованиями к производительности.
- Поддержка различных брокеров сообщений
Установка и настройка Celery
Для начала работы с Celery необходимо установить библиотеку и настроить брокер сообщений. В этом примере мы будем использовать Redis в качестве брокера.
Установите Celery и Redis с помощью pip:
pip install celery[redis]
Создайте файл celery_app.py
и настройте Celery:
from celery import Celery app = Celery('tasks', broker='redis://localhost:6379/0') @app.task def add(x, y): return x + y
В этом примере мы создали простое приложение Celery с задачей add
, которая складывает два числа.
Если хотите использовать RabbitMQ, то нужно поменять url брокера:
app = Celery('tasks', broker='amqp://myuser:mypassword@localhost:5672/myvhost')
Для использования Amazon SQS потребуются aws_access_key_id и aws_secret_access_key. Для этого также потребуется поставить зависимости:
pip install celery[sqs]
А код для формирования URL брокера будет выглядеть примерно вот так:
from kombu.utils.url import safequote # необходим для экранирования специальных символов aws_access_key = safequote("ABCDEFGHIJKLMNOPQRST") aws_secret_key = safequote("ZYXK7NiynG/TogH8Nj+P9nlE73sq3") broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format( aws_access_key=aws_access_key, aws_secret_key=aws_secret_key, )
Запустите Celery worker для обработки задач:
celery -A celery_app worker --loglevel=info
Теперь мы можем вызывать задачу add
из нашего кода:
from celery_app import add result = add.delay(4, 6) print('Task result:', result.get())
Примеры использования Celery
from celery import Celery import smtplib app = Celery('email_tasks', broker='redis://localhost:6379/0') @app.task def send_email(to_address, subject, message): # Настройка SMTP сервера и отправка письма server = smtplib.SMTP('smtp.example.com') server.sendmail('from@example.com', to_address, f"Subject: {subject}\n\n{message}") server.quit()
Вызывайте задачу для отправки письма в фоновом режиме:
from email_tasks import send_email send_email.delay('to@example.com', 'Hello', 'This is a test email')
Настройте периодическую задачу, которая будет выполняться каждый час:
from celery import Celery from celery.schedules import crontab app = Celery('periodic_tasks', broker='redis://localhost:6379/0') @app.task def periodic_task(): print('This task runs every hour') app.conf.beat_schedule = { 'run-every-hour': { 'task': 'periodic_tasks.periodic_task', 'schedule': crontab(minute=0, hour='*'), }, }
Запустите Celery beat для планирования периодических задач:
celery -A periodic_tasks beat --loglevel=info
Интеграция Celery и Django
Celery отлично интегрируется с Django, позволяя выполнять асинхронные задачи и управлять очередями задач в вашем Django-проекте. В этом разделе мы рассмотрим, как настроить и использовать Celery в приложении Django.
Шаг 1: Установка зависимостей
Для начала установите Celery и брокер сообщений (в данном примере используем Redis):
pip install celery[redis]
Шаг 2: Настройка Celery в Django
- Создание файла конфигурации CeleryВ корневой директории вашего Django-проекта (рядом с файлом
settings.py
) создайте файлcelery.py
:
# myproject/celery.py from __future__ import absolute_import, unicode_literals import os from celery import Celery # Устанавливаем переменную окружения для настроек Django os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings') # Создаем экземпляр приложения Celery app = Celery('myproject') # Загружаем конфигурацию из настроек Django app.config_from_object('django.conf:settings', namespace='CELERY') # Автоматически обнаруживаем задачи в файлах tasks.py app.autodiscover_tasks()
В файле settings.py
добавьте конфигурацию Celery:
# myproject/settings.py # Настройки брокера сообщений (Redis) CELERY_BROKER_URL = 'redis://localhost:6379/0' # Настройки хранилища результатов (опционально) CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Настройки для автоматического обнаружения задач CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'UTC'
Обновите файл mysite/__init__.py
(mysite - там где лежит settings.py) вашего проекта, чтобы инициализировать Celery при запуске Django:
# myproject/__init__.py from __future__ import absolute_import, unicode_literals # Инициализация Celery from .celery import app as celery_app __all__ = ('celery_app',)
Шаг 3: Создание задач
Создайте файл tasks.py
в одном из ваших Django-приложений и добавьте задачи, которые будут выполняться Celery:
# myapp/tasks.py from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def send_email(to_address, subject, message): # Пример отправки письма from django.core.mail import send_mail send_mail(subject, message, 'from@example.com', [to_address])
Шаг 4: Запуск Celery Worker
Запустите Celery worker для обработки задач:
celery -A myproject worker --loglevel=info
Шаг 5: Запуск Celery Beat (для периодических задач)
Если у вас есть периодические задачи, запустите Celery beat:
celery -A myproject beat --loglevel=info
Пример использования задач в Django
Теперь вы можете вызывать задачи Celery из любого места в вашем Django-проекте. Например, вызов задачи из представления:
# myapp/views.py from django.shortcuts import render from .tasks import add, send_email def index(request): # Вызов задачи add result = add.delay(4, 6) # Вызов задачи send_email send_email.delay('to@example.com', 'Hello', 'This is a test email') return render(request, 'index.html', {'result': result.get()})
Интеграция Celery с Django позволяет вам эффективно управлять асинхронными задачами и очередями задач, улучшая производительность и масштабируемость вашего приложения. Следуя шагам, описанным в этом руководстве, вы сможете настроить Celery для работы с вашим Django-проектом, создать и выполнять задачи, а также использовать мощные возможности Celery для обработки периодических задач и управления очередями.
Возможные ошибки
1. Ошибки при загрузке задач
Received unregistered task of type 'myapp.tasks.add'.
Причина: Эта ошибка возникает, если Celery не может найти или загрузить задачу, указанную в коде.
- Убедитесь, что файл
tasks.py
существует и правильно настроен. - Проверьте, что Celery настроен на автоматическое обнаружение задач.
2. Django и загрузка celery.py
При использовании Django может возникнуть такая ошибка:
kombu.exceptions.OperationalError: [Errno 111] Connection refused
Решение этой проблемы: добавьте в файл mysite/__init__.py (mysite - директория где лежат settings.py и, собственно, celery.py) следующий код:
from .celery import app as celery_app __all__ = ['celery_app']
Заключение
Celery является мощным и гибким инструментом для управления асинхронными задачами и очередями задач в Python. Его возможности позволяют легко масштабировать приложения и выполнять различные фоновые задачи, что делает его незаменимым инструментом для разработки высокопроизводительных приложений. С поддержкой различных брокеров сообщений и возможности настройки периодических задач, Celery предоставляет все необходимое для эффективной обработки задач в распределенных системах.