Подключение RabbitMQ - очереди и работа с ними

Опубликовано 2021.09.11

В данной статье мы рассмотрим как развернуть и использовать очередь заданий в PHP с помощью сервера RabbitMQ.

Сам рэббит должен быть установлен через пакеты или можно поднять Docker контейнер.

Зачем использовать очереди заданий

Очереди заданий позволяют вашей инфраструктуре изящно справляться с резкими скачками нагрузки без необходимости быстрого увеличения ресурсов. То есть при увеличении нагрузки вам не придётся докупать железо. Достаточно будет разместить задания в очереди и они выполнятся последовательно, сэкономив тем самым ресурсы процессора и оперативной памяти.

Например, если вы создали свой собственный аналог Youtube, вы можете создать очередь для преобразования видео, загружаемых пользователями. Без очереди ваши серверы конвертации видео были бы перегружены в часы пик (середина дня), так как им пришлось бы одновременно обрабатывать много видео. Но с очередью видео будут обрабатываться одно за одним, и если есть большое отставание(в очереди скопилось много видео на конвертацию), у вас есть время для развертывания ещё одного сервера с очередью. Так вы можете масштабировать сервера, или просто дождитесь завершения очереди за ночь, пока люди больше не загружают видео.

Подключение

Во-первых, нам нужно использовать composer для установки пакета PHP для взаимодействия с RabbitMQ. Вам нужно будет установить composer, если у вас его еще нет.

  1. composer require php-amqplib/php-amqplib

Publisher

Теперь создадим нашего паблишера. Он создаст задания, которые будут выполняться в очереди. В этом руководстве я назвал его publisher.php.

  1. require_once(__DIR__ . '/vendor/autoload.php');
  2.  
  3. define("RABBITMQ_HOST", "localhost");
  4. define("RABBITMQ_PORT", 5672);
  5. define("RABBITMQ_USERNAME", "guest");
  6. define("RABBITMQ_PASSWORD", "guest");
  7. define("RABBITMQ_QUEUE_NAME", "my_tasks");
  8.  
  9. $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
  10. RABBITMQ_HOST,
  11. RABBITMQ_PORT,
  12. RABBITMQ_USERNAME,
  13. RABBITMQ_PASSWORD
  14. );
  15.  
  16. $channel = $connection->channel();
  17.  
  18. # Создание очереди, если она еще не существует.
  19. $channel->queue_declare(
  20. $queue = RABBITMQ_QUEUE_NAME,
  21. $passive = false,
  22. $durable = true,
  23. $exclusive = false,
  24. $auto_delete = false,
  25. $nowait = false,
  26. $arguments = null,
  27. $ticket = null
  28. );
  29.  
  30. $job_id=0;
  31. while ($job_id <= 100)
  32. {
  33. $jobArray = array(
  34. 'id' => $job_id,
  35. 'task' => 'sleep',
  36. 'sleep_time' => rand(3, 15)
  37. );
  38.  
  39. $msg = new \PhpAmqpLib\Message\AMQPMessage(
  40. json_encode($jobArray, JSON_UNESCAPED_SLASHES),
  41. array('delivery_mode' => 2)
  42. );
  43.  
  44. $channel->basic_publish($msg, '', RABBITMQ_QUEUE_NAME);
  45. print 'Создано задание' . PHP_EOL;
  46. $job_id++;
  47. }
  48. </=>

В этом примере я использовал JSON для хранения тела сообщения, чтобы передать задание воркерам. Сообщения просто должны быть строкой, поэтому вы можете использовать любой формат, который вам нравится. Если вам достаточно просто строки - можете не паковать её в JSON. Или, например сериализуйте, или создайте XML-документ.

Вы также можете заметить, что я указал в задании id. Это сделано для того, чтобы позже мы увидели, что ни одна задача не выполняется более чем одним воркером.

Worker

  1. require_once __DIR__ . '/vendor/autoload.php';
  2.  
  3. define("RABBITMQ_HOST", "localhost");
  4. define("RABBITMQ_PORT", 5672);
  5. define("RABBITMQ_USERNAME", "guest");
  6. define("RABBITMQ_PASSWORD", "guest");
  7. define("RABBITMQ_QUEUE_NAME", "my_tasks");
  8.  
  9. $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
  10. RABBITMQ_HOST,
  11. RABBITMQ_PORT,
  12. RABBITMQ_USERNAME,
  13. RABBITMQ_PASSWORD
  14. );
  15.  
  16.  
  17. $channel = $connection->channel();
  18.  
  19. # Создание очереди, если она еще не существует.
  20. $channel->queue_declare(
  21. $queue = RABBITMQ_QUEUE_NAME,
  22. $passive = false,
  23. $durable = true,
  24. $exclusive = false,
  25. $auto_delete = false,
  26. $nowait = false,
  27. $arguments = null,
  28. $ticket = null
  29. );
  30.  
  31.  
  32. echo ' [*] Обработка сообщений. Для выхода нажмите CTRL+C', "\n";
  33.  
  34. $callback = function($msg){
  35. echo " [x] Сообщение ", $msg->body, "\n";
  36. $job = json_decode($msg->body, $assocForm=true);
  37. sleep($job['sleep_time']);
  38. echo " [x] Готово", "\n";
  39. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  40. };
  41.  
  42. $channel->basic_qos(null, 1, null);
  43.  
  44. $channel->basic_consume(
  45. $queue = RABBITMQ_QUEUE_NAME,
  46. $consumer_tag = '',
  47. $no_local = false,
  48. $no_ack = false,
  49. $exclusive = false,
  50. $nowait = false,
  51. $callback
  52. );
  53.  
  54. while (count($channel->callbacks))
  55. {
  56. $channel->wait();
  57. }
  58.  
  59. $channel->close();
  60. $connection->close();

Обратите внимание на переменную $callback - в ней хранится функция, которая прочитает сообщение и просто вызовет функцию sleep на то количество секунд, которое хранится в элементе sleep_time сообщений очереди. Так мы эмулируем работу тяжелых задач.

Данный воркер будет работать пока вы его не остановите. Если он обработает все сообщения очереди - он просто будет ждать новых. И ка только вы их добавите - он мгновенно проступит к их обработке.

Тестирование

Теперь, когда у нас есть как создатель задач (publisher.php) и обработчик задач (worker.php), вы можете начать видеть очередь в действии. Просто запустите процесс publisher.php, а затем worker.php потоки для обработки очереди.

Вы можете запустить publisher.php и увидеть, что в очереди добавилось 100 сообщений. Через некоторое время они все обработаются.

Вот так, без увеличения мощности сервера можно значительно снизить нагрузку, не отказываясь от работы скриптов.