Очереди на Beanstalk
По своей сути Beanstalkd — упрощенная и легкая система очередей, которая разрабатывалась под нужны Causes. Представляется как менеджер заданий распределенного приложения, который собирает отложенные задачи (отправка почты, различные запросы рода).
Определенные процессы размещают задачи в очередь, а воркеры получают и выполняют задачи из очереди.
Среди основных возможностей beanstalkd:
- приоритизация;
- распределенность — несколько серверов beanstalkd работают по принципу Memcached (можно масштабировать);
- отложенное выполнение (т.з. bury) для последующего выполнения;
- внешние библиотеки (PHP, Python и многие другие), веб-консоли;
- таймаут для автоматического помещения в очередь.
Термины и команды beanstalkd
Терминология инструмента отличается от привычных серверов сообщений, того же Gearman, к примеру:
- jobs — то же, что и сообщения;
- tubes — очереди, в которые помещаются сообщений для передачи работникам (workers);
- producers — приложения, которые создают задания (или сообщения);
- consumers — приложения, которые получают задания из очереди для обработки.
Основных команд у beanstalkd тоже немного:
- put — добавить новое задание в очередь (в том числе отложенно);
- reserve — получить задание из очереди;
- delete — удалить задание после выполнения;
- bury — отложить задание после завершения;
- kick — вытащить задание из статуса bury и поместить в очередь;
- release —- поставить статус “готово” для задачи после выполнения, сохраняется id и приоритет.
Больше команд и опций на официальной странице системы.
Установка и настройка
Beanstalkd входит в систему пакетов aptitude
:
sudo aptitude install beanstalkd
# Также собирается из исходников с GitHub
А для запуска достаточно выполнить:
beanstalkd
# По умолчанию выполняется локально используя порт 11300
После установки демоном beanstalkd также можно управлять как сервисом ОС. Он выполняется в RAM, а для обеспечения fault tolerance запускается с дополнительной опцией:
beanstalkd -b ~/beanstore &
# Пишет данные очереди в каталог beanstore, выполнение в фоне
Если перезапустить beanstalkd с такими же параметрами, то он в первую очередь проверит лог и продолжит выполнение с места остановки.
Демона beanstalkd можно запускать с опциями:
-b DIR директория для валидации -f MS fsync каждое значение миллисекунд ( -f0 для "always fsync") -F без fsync (по умолчанию) -l ADDR слушать адрес (0.0.0.0 по умолчанию) -p PORT слушать порт (11300 по умолчанию) -u USER стать пользователем и группой -z BYTES максимальный размер задания в байтах (65535 по умолчанию) -s BYTES размер каждого файла валидации (10485760 по умолчанию) (будут округлены до значения, кратного 512 байт) -c уменьшить binlog (по умолчанию) -n не уменьшать binlog -v показать версию -V улучшенный вывод -h показать справку
# Опции можно комбинировать
Ко всему прочему beanstalkd поддерживает клиенты для всех популярных языков и обладает API для создания проприетарных клиентов.
Пример использования Ruby
Для первого примера используем клиент Beaneater для Ruby. Размещение задачи (сообщения) в очередь будет выглядеть так:
require 'beaneater' require 'json' beanstalk = Beaneater::Pool.new(['localhost:11300']) tube = beanstalkd.tubes['my-tube'] job = {some: 'key', value: 'object'}.to_json tube.put job
# Задает адрес и порт, имя очереди и само сообщение, в формате JSON
После этого нужно создать скрипт для вылавливания заданий из очереди:
beanstalkd.tubes.watch!('my-tube') loop do job = beanstalk.tubes.reserve begin # обработка задачи job.delete rescue Exception => e job.bury end end
# Дожидается готовности задачи, обрабатывает ее и повторяет весь процесс
Для управления приложениями-потребителями логично использовать контроллеры процессов, как supervisor.
Если обрабатывается большая, тяжелая задача, то верхний пример можно представить как:
beanstalkd.jobs.register('my-tube') do |job| # ... обработка задачи end beanstalkd.jobs.process!
# “Оборачивает” скрипт, резервируя, обрабатывая, а затем удаляя или откладывая задачу, основываясь на результате
Пример использования PHP
Во втором примере реализована отправка почты при помощи Mandrill. Для удобства установки и использования клиента Pheanstalk для PHP рекомендуется использовать Composer
:
composer require mandrill/mandrill pda/pheanstalk
# Установка зависимостей
Потребуется всего два скрипта: поставщика задачи в очередь и потребителя, который будет обрабатывать задания.
Код поставщика будет иметь вид:
<?php require_once __DIR__ . '/vendor/autoload.php'; $email = array( 'to' => '[email protected]', 'from' => '[email protected]', 'subject' => 'Subject', 'body' => 'Some text' ); $pheanstalk = new \Pheanstalk\Pheanstalk('127.0.0.1'); # Добавляет JSON для задачи "email_queue" $pheanstalk ->useTube('email_queue') ->put(json_encode($email));
# Создает локальную задачу email_queue
— строку JSON массива с данными
А воркер будет выглядеть так:
<?php require_once __DIR__ . '/vendor/autoload.php'; $pheanstalk = new \Pheanstalk\Pheanstalk('127.0.0.1'); # Чтение очереди beanstalkd while (true) { # Проверка соединения if (!$pheanstalk->getConnection()->isServiceListening()) { echo "Ошибка соединения, подождите... \n"; # Ждет 5 с sleep(5); # Запуск следующей итерации continue; } # Получить задачу из очереди, если она готова $job = $pheanstalk ->watch('email_queue') ->ignore('default') ->reserve(); $email = json_decode($job->getData(), true); try { # Отправка почты с использованием Mandrill API $mandrill = new Mandrill('[MANDRILL-API-KEY'); $message = array( 'html' => $email['body'], 'subject' => $email['subject'], 'from_email' => $email['from'], 'to' => array( array( 'email' => $email['to'], ) ) ); # Непосредственная отправка письма $result = $mandrill->messages->send($message); # Удалить задачу из очереди $pheanstalk->delete($job); echo "Письмо отправлено \n"; } catch (Exception $e) { echo "Ошибка отправки - {$e->getMessage()} \n"; } }
# Не забудьте использовать свой уникальный API
Затем можно запускать команду:
nohup php email_worker.php > path/to/logfile.txt &
# Игнорирует сигнал SIGHUP, записывает данные в текстовый файл, выполняется в фоне
Самое главное
Beanstalkd подходит для реализации системы очередей любой сложности на любом популярном языке программирования. А распределенное выполнение позволит использовать инструмент в высоко-нагруженных приложениях.
Сообщить об опечатке
Текст, который будет отправлен нашим редакторам: