Retour

Créer un RPC via RabbitMQ

6 janv. 2014
8mn

RabbitMQ est un gestionnaire de queue, il permet donc de conserver des messages et de les lire via une autre tâche. Une présentation plus approfondie sera faite dans un autre article. Dans cet article, nous allons nous intéresser à un concept important dans RabbitMQ : le RPC. Un RPC (remote procedure call) permet d'envoyer un message à une queue et d'en attendre la réponse, pour mieux comprendre ce concept, partons d'un exemple simple : la génération d'une url de contenu externalisée. Il y a donc un client qui envoie un contenu dans une queue RabbitMQ afin de connaitre l'url générée. Le client n'a alors besoin que d'une méthode "call".

<?php class generateUrlClient { public function call($value) { // TODO return $response; } } $generateUrlClient= new generateUrlClient(); $response = $generateUrlClient->call('vive le RPC'); echo "Url généré ".$response;

Toujours dans le client, l'initialisation de la queue de "callback" permet au message mis dans la queue de savoir où déposer le message de réponse.

<?php list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $msg = new AMQPMessage( $payload, array('reply_to' => $queue_name)); $channel->basic_publish($msg, '', 'rpc_queue'); // Ici le code de lecture le réponse

Vous pouvez trouver toutes les options disponibles pour le protocole AMQP dans la library suivante https://github.com/videlalvaro/php-amqplib

Le code ci-dessus fait que tous les messages publiés dans la queue auront une réponse dans la queue de callback. Un problème demeure : comment reconnaître chaque message dans la queue de callback? L'idée est de mettre sur chaque message une clé unique qui permet de le reconnaitre ensuite.

La clé unique est ce que l'on appelle la 'correlation_id', elle permet d'identifier chaque réponse par rapport à son message. Elle est envoyée sur chaque message envoyé sur le serveur, et renvoyée dans la réponse qui permet alors de reconnaître la demande initiale. Avant de faire l'exemple de code, voici un petit résumé:

RPC description

Comme on peut le voir sur le schéma ci-dessus, le client envoie un message dans la queue 'rpc_queue' avec l'option reply_to qui permet d'envoyer la réponse dans une queue de callblack et la clé de 'correlation_id' qui est l'index unique de chaque demande. Commençons l'exemple du serveur de génération d'url via un titre. Nous commencerons par le serveur qui s'occupera de créer une url à partir d'un titre. Pour l'exemple, nous prenons simplement un titre et remplaçons les espaces par des underscores.

<?php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class generateUrlClient { private $connection; private $channel; private $callback_queue; private $response; private $correlation_id; // On créer la connexion comme sur le serveur public function __construct() { // Création de la connexion RabbitMQ $this->connection = new AMQPConnection( 'localhost', 5672, 'guest', 'guest'); // On récupère ensuite le channel qui nous permet de communiquer avec RabbitMQ $this->channel = $this->connection->channel(); // Création de la queue list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); // Consommation de la queue en mode réponse $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array($this, 'on_response')); } // Varification de la correlation_id public function on_response($rep) { if($rep->get('correlation_id') == $this->correlation_id) { $this->response = $rep->body; } } // Publication dans la queue de notre message public function call($n) { $this->response = null; $this->correlation_id = uniqid(); // Préparation du message avec demande de callback $msg = new AMQPMessage( (string) $n, array('correlation_id' => $this->correlation_id, 'reply_to' => $this->callback_queue) ); // Publication du message dans la queue rpc $this->channel->basic_publish($msg, '', 'rpc_queue'); // On attend la réponse while(!$this->response) { $this->channel->wait(); } // On retourne la reponse return $this->response; } }; // On creer un message et on attend la reponse $generateUrlClient = new generateUrlClient(); $titles = array('test de ouf', 'numero 1', 'numero 5'); foreach ($titles as $title) { $response = $generateUrlClient->call($title); echo " Pour le titre ".$title.' url recu '.$response, "\n"; }

Lancer le serveur directement avec:

php generateUrlServer.php

Maintenant créons le client qui devra envoyer le message au serveur de génération et l'utiliser ensuite.

<?php require_once __DIR__.'/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; class generateUrlClient { private $connection; private $channel; private $callback_queue; private $response; private $correlation_id; // On créer la connexion comme sur le serveur public function __construct() { // Création de la connexion RabbitMQ $this->connection = new AMQPConnection( 'localhost', 5672, 'guest', 'guest'); // On récupère ensuite le channel qui nous permet de communiquer avec RabbitMQ $this->channel = $this->connection->channel(); // Création de la queue list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); // Consommation de la queue en mode réponse $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array($this, 'on_response')); } // Varification de la correlation_id public function on_response($rep) { if($rep->get('correlation_id') == $this->correlation_id) { $this->response = $rep->body; } } // Publication dans la queue de notre message public function call($n) { $this->response = null; $this->correlation_id = uniqid(); // Préparation du message avec demande de callback $msg = new AMQPMessage( (string) $n, array('correlation_id' => $this->correlation_id, 'reply_to' => $this->callback_queue) ); // Publication du message dans la queue rpc $this->channel->basic_publish($msg, '', 'rpc_queue'); // On attend la réponse while(!$this->response) { $this->channel->wait(); } // On retourne la reponse return $this->response; } }; // On creer un message et on attend la reponse $generateUrlClient = new generateUrlClient(); $titles = array('test de ouf', 'numero 1', 'numero 5'); foreach ($titles as $title) { $response = $generateUrlClient->call($title); echo " Pour le titre ".$title.' url recu '.$response, "\n"; }

Maintenant lançons le client :

php generateUrlClient.php

Si tout se passe bien, le serveur renvoie les bonnes valeurs.

Laissez un commentaire si vous avez des questions.


Articles sur le même thème

Découvrez Eleven Labs

L'ESN qui fait décoller vos projets web, mobile & data !

Voir le site web

Contact

Eleven Labs - Paris

102, rue du Faubourg Saint Honoré

75008 Paris

Eleven Labs - Nantes

40, rue la Tour d'Auvergne

44200 Nantes

Eleven Labs - Montréal

1155, Metcalfe St Suite 1500

Montréal, QC H3B 2V6, Canada