Créer un RPC via RabbitMQ

Créer un RPC via RabbitMQ


Sommaire

RabbitMQ est un gestionnaire de queue, il permet donc de conserver des messages et de les lire via une autre tâche. Une présentation plus approfondie sera faite dans un autre article. Dans cet article, nous allons nous intéresser à un concept important dans RabbitMQ : le RPC. Un RPC (remote procedure call) permet d'envoyer un message à une queue et d'en attendre la réponse, pour mieux comprendre ce concept, partons d'un exemple simple : la génération d'une url de contenu externalisée. Il y a donc un client qui envoie un contenu dans une queue RabbitMQ afin de connaitre l'url générée. Le client n'a alors besoin que d'une méthode "call".

<?php class generateUrlClient { public function call($value) { // TODO return $response; } } $generateUrlClient= new generateUrlClient(); $response = $generateUrlClient->call('vive le RPC'); echo "Url généré ".$response;

Toujours dans le client, l'initialisation de la queue de "callback" permet au message mis dans la queue de savoir où déposer le message de réponse.

<?php list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $msg = new AMQPMessage( $payload, array('reply_to' => $queue_name)); $channel->basic_publish($msg, '', 'rpc_queue'); // Ici le code de lecture le réponse

Vous pouvez trouver toutes les options disponibles pour le protocole AMQP dans la library suivante https://github.com/videlalvaro/php-amqplib

Le code ci-dessus fait que tous les messages publiés dans la queue auront une réponse dans la queue de callback. Un problème demeure : comment reconnaître chaque message dans la queue de callback? L'idée est de mettre sur chaque message une clé unique qui permet de le reconnaitre ensuite.

La clé unique est ce que l'on appelle la 'correlation_id', elle permet d'identifier chaque réponse par rapport à son message. Elle est envoyée sur chaque message envoyé sur le serveur, et renvoyée dans la réponse qui permet alors de reconnaître la demande initiale. Avant de faire l'exemple de code, voici un petit résumé:

RPC description

Comme on peut le voir sur le schéma ci-dessus, le client envoie un message dans la queue 'rpc_queue' avec l'option reply_to qui permet d'envoyer la réponse dans une queue de callblack et la clé de 'correlation_id' qui est l'index unique de chaque demande. Commençons l'exemple du serveur de génération d'url via un titre. Nous commencerons par le serveur qui s'occupera de créer une url à partir d'un titre. Pour l'exemple, nous prenons simplement un titre et remplaçons les espaces par des underscores.

<?php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class generateUrlClient { private $connection; private $channel; private $callback_queue; private $response; private $correlation_id; // On créer la connexion comme sur le serveur public function __construct() { // Création de la connexion RabbitMQ $this->connection = new AMQPConnection( 'localhost', 5672, 'guest', 'guest'); // On récupère ensuite le channel qui nous permet de communiquer avec RabbitMQ $this->channel = $this->connection->channel(); // Création de la queue list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); // Consommation de la queue en mode réponse $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array($this, 'on_response')); } // Varification de la correlation_id public function on_response($rep) { if($rep->get('correlation_id') == $this->correlation_id) { $this->response = $rep->body; } } // Publication dans la queue de notre message public function call($n) { $this->response = null; $this->correlation_id = uniqid(); // Préparation du message avec demande de callback $msg = new AMQPMessage( (string) $n, array('correlation_id' => $this->correlation_id, 'reply_to' => $this->callback_queue) ); // Publication du message dans la queue rpc $this->channel->basic_publish($msg, '', 'rpc_queue'); // On attend la réponse while(!$this->response) { $this->channel->wait(); } // On retourne la reponse return $this->response; } }; // On creer un message et on attend la reponse $generateUrlClient = new generateUrlClient(); $titles = array('test de ouf', 'numero 1', 'numero 5'); foreach ($titles as $title) { $response = $generateUrlClient->call($title); echo " Pour le titre ".$title.' url recu '.$response, "\n"; }

Lancer le serveur directement avec:

php generateUrlServer.php

Maintenant créons le client qui devra envoyer le message au serveur de génération et l'utiliser ensuite.

<?php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class generateUrlClient { private $connection; private $channel; private $callback_queue; private $response; private $correlation_id; // On créer la connexion comme sur le serveur public function __construct() { // Création de la connexion RabbitMQ $this->connection = new AMQPConnection( 'localhost', 5672, 'guest', 'guest'); // On récupère ensuite le channel qui nous permet de communiquer avec RabbitMQ $this->channel = $this->connection->channel(); // Création de la queue list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); // Consommation de la queue en mode réponse $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array($this, 'on_response')); } // Varification de la correlation_id public function on_response($rep) { if($rep->get('correlation_id') == $this->correlation_id) { $this->response = $rep->body; } } // Publication dans la queue de notre message public function call($n) { $this->response = null; $this->correlation_id = uniqid(); // Préparation du message avec demande de callback $msg = new AMQPMessage( (string) $n, array('correlation_id' => $this->correlation_id, 'reply_to' => $this->callback_queue) ); // Publication du message dans la queue rpc $this->channel->basic_publish($msg, '', 'rpc_queue'); // On attend la réponse while(!$this->response) { $this->channel->wait(); } // On retourne la reponse return $this->response; } }; // On creer un message et on attend la reponse $generateUrlClient = new generateUrlClient(); $titles = array('test de ouf', 'numero 1', 'numero 5'); foreach ($titles as $title) { $response = $generateUrlClient->call($title); echo " Pour le titre ".$title.' url recu '.$response, "\n"; }

Maintenant lançons le client :

php generateUrlClient.php

Si tout se passe bien, le serveur renvoie les bonnes valeurs.

Laissez un commentaire si vous avez des questions.

Auteur(s)

Jonathan Jalouzot

Jonathan Jalouzot

Lead développeur au @lemondefr, mes technologies sont le symfony depuis 2009, le nodejs, l'angularjs, rabbitMq etc ... J'adore les médias et aimerai continuer dans ce secteur plein de surprise. Vous pouvez me retrouver sur les réseaux sociaux: Twitter: @captainjojo42 Instagram: @captainjojo42 Linkedin: https://fr.linkedin.com/in/jonathanjalouzot Github: https://github.com/captainjojo

Voir le profil

Vous souhaitez en savoir plus sur le sujet ?
Organisons un échange !

Notre équipe d'experts répond à toutes vos questions.

Nous contacter

Découvrez nos autres contenus dans le même thème