Skip to content

Очереди сообщений

Что такое Celery и для каких задач он используется?

Celery — это распределенная очередь задач (Distributed Task Queue) для выполнения долгих и фоновых операций асинхронно, вне основного потока приложения. Он используется для таких задач, как отправка электронных писем, обработка изображений, генерация отчетов и автоматизация повторяющихся заданий, что позволяет ускорить отклик веб-приложений и повысить удобство для пользователей.

🔄Основные задачи Celery

  • Асинхронная обработка: Выполняет задачи в фоновом режиме, не блокируя основной поток приложения. Это повышает производительность, так как пользователь сразу получает ответ, а задача выполняется параллельно.
  • Выполнение долгих вычислений: Отлично подходит для ресурсоемких операций, таких как анализ данных или сложные запросы, которые могут занять много времени.
  • Автоматизация и планирование: Позволяет планировать выполнение задач на определенное время или с определенной периодичностью, например, для генерации ежедневных отчетов или очистки старых данных.
  • Отправка уведомлений: Используется для пакетной отправки уведомлений, таких как email или push-уведомления, чтобы избежать задержек в основном потоке.
  • Обработка медиафайлов: Применяется для задач, связанных с обработкой изображений, таких как создание миниатюр (thumbnails), что освобождает основной веб-процесс

Celery работает по принципу производитель-потребитель (producer-consumer):

  • Производитель (ваше приложение) помещает задачи в очередь
  • Потребитель (Celery worker) забирает задачи и выполняет их
  • Брокер (Redis, RabbitMQ) служит посредником для передачи задач
    #Производитель (Django/FastAPI приложение)
    task_result = send_email.delay(user_id, "Welcome!")
    
    #Потребитель (Celery worker)
    @task
    def send_email(user_id, message):
        # Выполнение в фоне
        user = User.objects.get(id=user_id)
        user.email_user(subject="Welcome", message=message)
    

Преимущества использования

  • Производительность и отзывчивость Основное преимущество — освобождение основного приложения от блокирующих операций. Веб-сервер может быстро обрабатывать HTTP-запросы, в то время как тяжелые задачи выполняются в фоне отдельными процессами.

  • Масштабируемость Архитектура Celery позволяет легко масштабировать систему. При увеличении нагрузки можно просто добавить дополнительные воркеры, не изменяя код основного приложения. Очереди задач могут распределяться между множеством серверов.

  • Надежность и отказоустойчивость Задачи не теряются при перезапуске системы или сбоях в работе. Брокер сообщений гарантирует доставку, а механизм повторных попыток обеспечивает выполнение даже в условиях временных проблем.

  • Гибкость управления Поддержка приоритетов задач, отложенного запуска, цепочек и групп задач позволяет создавать сложные рабочие процессы. Можно настроить разные очереди для различных типов задач с индивидуальными настройками параллелизма.

🛠 Технические особенности

  • Поддерживаемые брокеры: Наиболее популярные решения — Redis (быстрый, с поддержкой персистентности) и RabbitMQ (надежный, с богатыми возможностями маршрутизации). Выбор зависит от конкретных требований проекта.

  • Мониторинг и администрирование: Для наблюдения за выполнением задач существуют инструменты вроде Flower, которые предоставляют веб-интерфейс для мониторинга состояния воркеров, просмотра очередей и управления задачами. Интеграция с веб-фреймворками

Celery легко интегрируется с популярными фреймворками Django, Flask, FastAPI и другими. Существуют готовые решения и лучшие практики для каждой из этих платформ.

🚀 Типичный рабочий процесс

Приложение получает запрос, требующий длительной обработки. Вместо выполнения операции в рамках HTTP-запроса, оно создает асинхронную задачу и помещает ее в очередь через брокер. Celery воркер, работающий в фоне, забирает задачу и начинает ее выполнение. Пользователь немедленно получает ответ, что задача принята в обработку, и может отслеживать ее статус через отдельный интерфейс.

Статьи на хабре Celery для новичков

Celery: проясняем неочевидные моменты

Как подружить Celery и SqlAlchemy 2.0 с асинхронным Python


Как Celery взаимодействует с брокерами сообщений (RabbitMQ, Redis)?

Celery использует брокеры сообщений в качестве посредника между приложением (producer) и воркерами (consumers). Эта архитектура обеспечивает асинхронную связь и надежную доставку задач. Приложение отправляет задачу как сообщение в очередь брокера, а воркеры, которые прослушивают эту очередь, забирают задачу и выполняют её, а затем отправляют результат в бэкенд

Как это работает

  • Отправка задачи: Когда в вашем приложении возникает необходимость выполнить фоновую задачу, она сериализует эту задачу и отправляет сообщение (запрос на выполнение) в очередь на брокере сообщений.
  • Получение задачи: Воркеры Celery постоянно слушают брокера сообщений. Когда на брокере появляется новое сообщение-задача, один из свободных воркеров извлекает его из очереди.
  • Выполнение задачи: Воркер десериализует сообщение и выполняет саму задачу, используя код вашего приложения. Это происходит в отдельном процессе, не блокируя основной поток приложения.
  • Отправка результата: После завершения работы, воркер отправляет результат выполнения (или статус задачи) в бэкенд, который может быть тем же Redis или другим хранилищем. Это позволяет отслеживать статус и результаты выполнения задач.

📡 Протоколы и транспортные механизмы

AMQP (Advanced Message Queuing Protocol)

RabbitMQ использует стандартный протокол AMQP, который предоставляет:

  • Гарантированную доставку — сообщения не теряются при сбоях
  • Гибкую маршрутизацию — обменники (exchanges) и очереди (queues)
  • Подтверждения получения — воркер подтверждает обработку задачи
  • Персистентность — сообщения сохраняются на диск

Redis Protocol

Redis работает через собственный протокол и предлагает:

  • Высокую производительность — операции в памяти
  • Публикация/подписка — механизм pub/sub для широковещательных сообщений
  • Простота настройки — минимальная конфигурация
  • Дополнительные возможности — кэширование, хранение результатов

🔄 Процесс взаимодействия

1. Отправка задачи (Producer → Broker)

# Приложение отправляет задачу в очередь
task_result = send_email.delay(user_id, "Welcome!")

Что происходит:

  • Celery сериализует задачу в JSON/бинарный формат
  • Сообщение помещается в указанную очередь брокера
  • Приложение немедленно получает контроль (не блокируется)

2. Доставка сообщения (Broker → Worker)

# Воркер постоянно опрашивает очередь
celery -A myapp worker --loglevel=info

Механизм доставки:

  • Воркеры подключаются к брокеру и объявляют интересующие очереди
  • Брокер передает сообщения доступным воркерам
  • Сообщение временно блокируется, пока воркер его обрабатывает

3. Подтверждение выполнения (Worker → Broker)

@task(bind=True)
def process_data(self, data):
    try:
        # Обработка данных
        result = heavy_computation(data)
        return result  # Автоматическое подтверждение
    except Exception:
        # Задача будет повторена
        raise self.retry(countdown=60)

Типы подтверждений:

  • ack — успешное выполнение, сообщение удаляется из очереди
  • reject — отказ с возможностью повторной очереди
  • requeue — возврат в очередь для повторной обработки

⚡ Сравнение брокеров

RabbitMQ — надежность и гибкость

# Конфигурация для RabbitMQ
app.conf.broker_url = 'amqp://user:pass@localhost:5672//'
app.conf.broker_connection_retry_on_startup = True

Преимущества:

  • Продвинутая маршрутизация — direct, fanout, topic exchanges
  • Надежность — persistence, подтверждения, транзакции
  • Мониторинг — веб-интерфейс с детальной статистикой
  • Кластеры — поддержка распределенной установки

Redis — скорость и простота

# Конфигурация для Redis
app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/0'

Преимущества:

  • Производительность — операции в оперативной памяти
  • Универсальность — один сервер для брокера и бэкенда результатов
  • Простота — минимальная настройка и обслуживание
  • Широкие возможности — pub/sub, кэширование, хранение состояний

Статьи на хабре FastAPI + Redis + Celery: Создание системы временного хранения файлов с автоудалением и удобным веб-интерфейсом

Асинхронные задачи с FastAPI и Celery