В данной статье мы рассмотрим как развернуть и использовать очередь заданий в PHP с помощью сервера RabbitMQ.
Сам рэббит должен быть установлен через пакеты или можно поднять Docker контейнер.
Зачем использовать очереди заданий
Очереди заданий позволяют вашей инфраструктуре изящно справляться с резкими скачками нагрузки без необходимости быстрого увеличения ресурсов. То есть при увеличении нагрузки вам не придётся докупать железо. Достаточно будет разместить задания в очереди и они выполнятся последовательно, сэкономив тем самым ресурсы процессора и оперативной памяти.
Например, если вы создали свой собственный аналог Youtube, вы можете создать очередь для преобразования видео, загружаемых пользователями. Без очереди ваши серверы конвертации видео были бы перегружены в часы пик (середина дня), так как им пришлось бы одновременно обрабатывать много видео. Но с очередью видео будут обрабатываться одно за одним, и если есть большое отставание(в очереди скопилось много видео на конвертацию), у вас есть время для развертывания ещё одного сервера с очередью. Так вы можете масштабировать сервера, или просто дождитесь завершения очереди за ночь, пока люди больше не загружают видео.
Подключение
Во-первых, нам нужно использовать composer для установки пакета PHP для взаимодействия с RabbitMQ. Вам нужно будет установить composer, если у вас его еще нет.
composer require php-amqplib/php-amqplib
Publisher
Теперь создадим нашего паблишера. Он создаст задания, которые будут выполняться в очереди. В этом руководстве я назвал его publisher.php.
require_once(__DIR__ . '/vendor/autoload.php'); $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection( RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD ); $channel = $connection->channel(); # Создание очереди, если она еще не существует. $channel->queue_declare( $queue = RABBITMQ_QUEUE_NAME, $passive = false, $durable = true, $exclusive = false, $auto_delete = false, $nowait = false, $arguments = null, $ticket = null ); $job_id=0; while ($job_id <= 100) { 'id' => $job_id, 'task' => 'sleep', ); $msg = new \PhpAmqpLib\Message\AMQPMessage( ); $channel->basic_publish($msg, '', RABBITMQ_QUEUE_NAME); print 'Создано задание' . PHP_EOL; $job_id++; } </=>
В этом примере я использовал JSON для хранения тела сообщения, чтобы передать задание воркерам. Сообщения просто должны быть строкой, поэтому вы можете использовать любой формат, который вам нравится. Если вам достаточно просто строки - можете не паковать её в JSON. Или, например сериализуйте, или создайте XML-документ.
Вы также можете заметить, что я указал в задании id. Это сделано для того, чтобы позже мы увидели, что ни одна задача не выполняется более чем одним воркером.
Worker
require_once __DIR__ . '/vendor/autoload.php'; $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection( RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD ); $channel = $connection->channel(); # Создание очереди, если она еще не существует. $channel->queue_declare( $queue = RABBITMQ_QUEUE_NAME, $passive = false, $durable = true, $exclusive = false, $auto_delete = false, $nowait = false, $arguments = null, $ticket = null ); echo ' [*] Обработка сообщений. Для выхода нажмите CTRL+C', "\n"; $callback = function($msg){ echo " [x] Сообщение ", $msg->body, "\n"; echo " [x] Готово", "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume( $queue = RABBITMQ_QUEUE_NAME, $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback ); { $channel->wait(); } $channel->close(); $connection->close();
Обратите внимание на переменную $callback - в ней хранится функция, которая прочитает сообщение и просто вызовет функцию sleep на то количество секунд, которое хранится в элементе sleep_time сообщений очереди. Так мы эмулируем работу тяжелых задач.
Данный воркер будет работать пока вы его не остановите. Если он обработает все сообщения очереди - он просто будет ждать новых. И ка только вы их добавите - он мгновенно проступит к их обработке.
Тестирование
Теперь, когда у нас есть как создатель задач (publisher.php) и обработчик задач (worker.php), вы можете начать видеть очередь в действии. Просто запустите процесс publisher.php, а затем worker.php потоки для обработки очереди.
Вы можете запустить publisher.php и увидеть, что в очереди добавилось 100 сообщений. Через некоторое время они все обработаются.
Вот так, без увеличения мощности сервера можно значительно снизить нагрузку, не отказываясь от работы скриптов.