Рубріки: Теория

Оптимизация очередей через Celery и Flask: делаем это правильно

Игорь Грегорченко

Если вы наткнулись на эту статью, скорее всего, вы знакомы с Flask и работаете над добавлением функции в свое веб-приложение, на выполнение которой уходит довольно много минут (если не больше). Возможно, вы хотите узнать, есть ли лучший или более быстрый способ сделать это. Да, именно так — такой способ есть, мы его подробно обсудим в этом справочном материале для всех любителей Python.

Если в вашем приложении есть долго выполняющаяся задача, например, обработка загруженных данных или отправка электронной почты, вы не хотите ждать ее завершения во время запроса. Вместо этого используйте очередь задач для отправки необходимых данных другому процессу, который будет выполнять задачу в фоновом режиме.

Celery — это мощная очередь задач, которую можно использовать как для простых фоновых задач, так и для сложных многоэтапных программ и расписаний. Это руководство покажет вам, как использовать Celery с помощью Flask, но предполагает, что вы уже прочитали руководство «Первые шаги с Celery» в документации по Celery.

Celery — это отдельный пакет Python. Установите его из PyPI с помощью pip:

$ pip install celery

Введение

Некоторые распространенные примеры, которые мы обсудим ниже,  включают в себя:

  • Вызов стороннего API для получения некоторых данных на основе пользовательского ввода.
  • Отправка электронного письма пользователю при регистрации.
  • Генерация отчета в формате PDF.

Все эти типы задач блокируют цикл запрос/ответ до его завершения, что означает, что пользователю придется подождать некоторое время. Чтобы разгрузить длительно выполняющиеся задачи, подобные этим, вы можете использовать Celery, который предоставляет механизм для разгрузки этих задач на отдельные рабочие потоки.

Celery взаимодействует с помощью сообщений, обычно используя брокер для посредничества между клиентами и рабочими потоками. Чтобы инициировать задачу, клиент Celery добавляет сообщение в очередь, а брокер затем доставляет это сообщение рабочему потоку.

Наиболее часто используемыми брокерами являются Redis и RabbitMQ. Мы установим сервер Redis локально, чтобы использовать этот механизм.

Необходимые стартовые условия

  • Python 3.6+
  • Virtualenv v20+
  • Кроме того, ожидается промежуточное знание Python и Flask. Все остальное будет объяснено по ходу статьи.

Настройка проекта

Скачайте стартовый проект и настройте его с помощью следующих команд:

git clone -b step_1 https://github.com/raunaqness/flask_celery_tutorial.git
cd flask_celery_tutorial

# make virtualenv
virtualenv v
source v/bin/activate

# install dependencies
pip install -r requirements.txt

# start server
export FLASK_APP=app; python -m flask run

Откройте 127.0.0.1:5000 в браузере, и, если все работает нормально, вы должны увидеть надпись «Hello, world!».

Далее добавим маршрут, который будет содержать кнопку Button, при нажатии на которую будет запускаться имитация длительной задачи, например, отправка электронного письма, создание отчета в формате PDF, вызов API стороннего разработчика и т.д.

Мы сымитируем этот API с помощью time.sleep(), который заблокирует работу приложения на 15 секунд.

Откройте файл app.py и добавьте следующий блок кода.

# route that will show will simply render an HTML template
@app.route("/tasks")
def tasks():
    return render_template("tasks.html")

# route that will execute a long-running task
@app.route("/long_running_task")
def long_running_task():
    # time in seconds 
    time_to_wait = 15

    print(f"This task will take {time_to_wait} seconds to complete...")
    time.sleep(time_to_wait)

    return f"<p>The task completed in {time_to_wait} seconds!"

Убедитесь, что импортировали модуль времени, добавив следующее, вместе с утверждениями импорта в верхней части файла:

import time

Далее создайте каталог с именем templates в корне проекта. Внутри него создайте новый файл tasks.html и добавьте в него следующее:

<!DOCTYPE html>
<html>

<head>
    <title>Tasks</title>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />

    <!-- Bootstrap CSS -->
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet"
        integrity="sha384-1BmE4kWBq78iYhFldvKuhfTAU6auU8tT94WrHftjDbrCEXSU1oBoqyl2QvZ6jIW3" crossorigin="anonymous" />
</head>

<body>
    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"
        integrity="sha384-ka7Sk0Gln4gmtz2MlQnikT1wXgYsOg+OMhuP+IlRH9sENBO0LRn5q+8nbTov4+1p"
        crossorigin="anonymous"></script>

    <div>
        <a class="btn btn-primary" href="/long_running_task" role="button">Trigger Long Running Task</a>
    </div>
</body>

</html>

Структура (дерево) вашего проекта должна выглядеть примерно так:

code
├── __pycache__
│   └── app.cpython-38.pyc
├── app.py
├── requirements.txt
└── templates
    └── tasks.html

2 directories, 4 files

Вернитесь в терминал, остановите и снова запустите сервер Flask, затем откройте 127.0.0.1:5000/tasks в браузере. Вы должны увидеть страницу tasks.html с единственной кнопкой.

Теперь, когда вы нажмете на кнопку Trigger Long-Running Task, произойдет переход к маршруту /long_running_task, который выполнит функцию def long_running_task(), определенную в файле app.py.

Обратите внимание, что страница будет находиться в состоянии «загрузки» в течение 15 секунд, так что ваше приложение застрянет в этом состоянии и не сможет выполнить никакую другую операцию, пока не завершится текущая.

Через 15 секунд вы должны увидеть выполненную задачу и ожидаемый ответ в браузере.

Также обратите внимание, что вы сможете видеть операторы печати в окне терминала во время выполнения длительного задания.

Теперь давайте посмотрим, как мы можем использовать Celery для выполнения этой задачи в фоновом режиме. Если у вас возникли проблемы, вы можете посмотреть текущее состояние вашего проекта вот здесь.

Настройка Celery и Redis

Вы уже установили пакет Celery python в начальной настройке. Чтобы подтвердить установку пакета, вы можете запустить pip freeze в окне терминала с активированным virtualenv, чтобы увидеть все установленные пакеты.

Далее необходимо установить Redis Server на локальной машине. Официальную инструкцию по установке можно найти вот здесь.

Теперь давайте настроим Celery.

Начало работы с Celery

Создайте новый файл в корне проекта под названием celery_utils.py. Он будет использоваться для инициализации экземпляра приложения Celery, аналогично тому, как мы инициализируем приложение Flask в app.py. Добавьте в файл следующий код:

from celery import Celery

# celery config
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'

# initialize celery app
def get_celery_app_instance(app):
    celery = Celery(
        app.import_name,
        backend=CELERY_BROKER_URL,
        broker=CELERY_BROKER_URL
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

Вот краткое объяснение:

  • Импортируется пакет Celery python.
  • Определяется функция get_celery_app_instance, которая возвращает экземпляр Celery, который, в свою очередь, требует следующих параметров для инициализации:
    • name: это имя рабочего Celery.
    • backend: это URL используемого бэкенда, которым в данном случае является Redis, а URL хоста определяется переменной CELERY_BROKER_URL.
    • брокер: аналогично бэкенду, необходимо определить URL брокера, который также является сервером Redis.
  • <объяснить ContextTask>.
  • <возвратить экземпляр приложения celery>.

Далее, давайте используем Celery для определения долгоиграющей задачи. Внесите следующие изменения в app.py:

Добавьте следующее рядом с операторами импорта.

# importing function to get celery app instance
from celery_utils import get_celery_app_instance

Добавьте следующее после утверждений, инициализирующих приложение Flask:

# celery app instance
celery = get_celery_app_instance(app)

Затем добавьте следующий блок кода в нижнюю часть файла:

# celery tasks
@celery.task
def sending_email_with_celery():
    print("Executing Long running task : Sending email with celery...")
    time.sleep(15)
    print("Task complete!")

Здесь мы просто определили функцию с именем sending_email_with_celery(), которая будет имитировать отправку электронного письма, которая может занять 15 секунд.

Однако для того, чтобы эта функция выполнялась как фоновая задача, в строке чуть выше определения функции добавлен декоратор @celery.task.

Наконец, определите маршрут для запуска этой функции:

# route to trigger celery task
@app.route("/long_running_task_celery")
def long_running_task_celery():
    # function.delay() is used to trigger function as celery task
    sending_email_with_celery.delay()
    return f"Long running task triggered with Celery! Check terminal to see the logs..."

В этом блоке кода мы определяем маршрут /long_running_task_celery, который запускает функцию как задачу Celery.

Обратите внимание, что функция вызывается с помощью метода delay(). Это указывает на то, что мы хотим запустить эту функцию как задачу Celery, а не как обычную функцию Python.

Наконец, чтобы увидеть это в действии, давайте добавим еще одну кнопку в файл tasks.html для запуска этой функции.

<div>
    <a class="btn btn-primary" href="/long_running_task" role="button">Trigger Long Running Task</a>
</div>

<!-- new code -->
<div>
    <a class="btn btn-primary" href="/long_running_task_celery" role="button">Trigger Long Running Task with Celery</a>
</div>

Пришло время увидеть его в действии!

Убедитесь, что у вас запущен сервер Flask в окне терминала.

В другом окне терминала перейдите (cd) в корень проекта и выполните следующую команду для запуска Celery worker.

celery -A app.celery worker --loglevel=info

Откройте 127.0.0.1:5000/tasks в браузере, где вы должны увидеть две кнопки:

  • Запуск долго выполняющейся функции с помощью Python.
  • Запуск долго выполняющейся функции с помощью Celery.

Мы уже видели, что если мы запускаем долго выполняющуюся функцию с помощью Python, сервер замирает до завершения выполнения этой функции.

Теперь, если вы нажмете на кнопку Trigger Long-Running Task with Celery, вы увидите, что страница мгновенно перенаправляется на маршрут /long_running_task_celery, а в окне браузера появится ожидаемый результат.

В фоновом режиме выполнение функции осуществляется Celery. Чтобы посмотреть журнал выполнения функции, переключитесь в окно терминала, в котором вы запустили Celery worker. Оно должно выглядеть примерно так:

Заключение

Вот и все! Теперь вы знаете, как настраивать и запускать длительно выполняющиеся задачи с помощью Celery в вашем веб-приложении Flask.

Вот краткий обзор того, что мы обсудили выше. Чтобы запустить функцию как задачу Celery, вам необходимо:

  • Импортируйте экземпляр приложения Celery в ваш файл.
  • Добавьте декоратор @celery.task поверх определения функции.
  • Запустите функцию, используя метод function_name.delay().

Останні статті

Обучение Power BI – какие онлайн курсы аналитики выбрать

Сегодня мы поговорим о том, как выбрать лучшие курсы Power BI в Украине, особенно для…

13.01.2024

Work.ua назвал самые конкурентные вакансии в IТ за 2023 год

В 2023 году во всех крупнейших регионах конкуренция за вакансию выросла на 5–12%. Не исключением…

08.12.2023

Украинская IT-рекрутерка создала бесплатный трекер поиска работы

Unicorn Hunter/Talent Manager Лина Калиш создала бесплатный трекер поиска работы в Notion, систематизирующий все этапы…

07.12.2023

Mate academy отправит работников в 10-дневный оплачиваемый отпуск

Edtech-стартап Mate academy принял решение отправить своих работников в десятидневный отпуск – с 25 декабря…

07.12.2023

Переписки, фото, история браузера: киевский программист зарабатывал на шпионаже

Служба безопасности Украины задержала в Киеве 46-летнего программиста, который за деньги устанавливал шпионские программы и…

07.12.2023

Как вырасти до сеньйора? Девелопер создал популярную подборку на Github

IT-специалист Джордан Катлер создал и выложил на Github подборку разнообразных ресурсов, которые помогут достичь уровня…

07.12.2023