Linux Embedded

Le blog des technologies libres et embarquées

Une introduction à ZeroMQ

ZeroMQ est une bibliothèque légère et rapide qui permet de faire communiquer des programmes entre eux facilement via des sockets. C'est du moins cet aspect de ZeroMQ que cet article explique.

Introduction

Un des avantages de ZeroMQ est sa portabilité tant au niveau Système d'exploitation qu'au niveau langage. Si vous en voulez la preuve regardez la liste des bindings existant, disponible ici.

ZeroMQ propose différents types de socket pour différentes utilisation. Ces différents type de socket sont articulés autour de quatre modèles de communication :

  •  requête/réponse, où un échange commence par l'émission d'une requête et se fini par la réception de la réponse. Ceci peut-être fait avec une socket REQ, connectée exclusivement à une socket REP, qui envoie des requêtes et attend les réponses. Les sockets DEALER et ROUTER sont utiles pour étendre ce modèle à des systèmes plus complexes (i.e avec plus qu'une simple paire de socket).
  • publication/souscription: une socket PUB émet des messages typé par sujet (ou "topic"). Les sockets SUB connecté s'abonnent à certains sujet pour recevoir les messages correspondant. Une socket PUB peut alimenter plusieurs SUB tandis qu'un socket SUB peut se connecter à plusieurs PUB.
  • pipeline: le type PUSH est assez simple puisqu'il permet uniquement d'envoyer des données, tandis que la type PULL permet seulement d'en recevoir. Ces types de socket permettent de mettre en place très facilement un flux de données unidirectionnel. Cela peut par exemple permettre facilement à différentes applications d'envoyer leur logs à une application unique pour les centraliser, les archiver, ou les envoyer ailleurs.
  • communication interthread : le type de sockets PAIR permet d'échanger facilement des données entre 2 threads d'un même process, avec une faible consommation de ressource.

Pour en savoir plus, voir cette page en anglais.

Exemple de communication asynchrone et bidirectionnelle

L'exemple est codé en c++11, utilise ZeroMQ 3 (disponible en rpm sur fedora, et certainement sur d'autres distribution) et le binding c++ léger "cppzmq" qui consiste en un simple header téléchargeable sur github et à inclure dans le projet. Le code a été découpé en plusieurs partie pour pouvoir les présenter séparément. Le fichier source entier est disponible à la fin de cet article.

Tout d'abord voici une fonction qui sera exécutée par un thread pour envoyer du texte en boucle sur une socket ZMQ :

 void sendSomeData(zmq::socket_t& socket, std::string textToSend, std::mutex& socketMutex)
 {
   while (not stopThread)
   {
     socketMutex.lock();
     socket.send(textToSend.c_str(), textToSend.size(), ZMQ_DONTWAIT);
     socketMutex.unlock();
     std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 1000));
   }
 }

Le flag ZMQ_DONTWAIT permet de ne pas bloquer sur la fonction send s'il n'est pas possible d'envoyer les données (aucune socket prête à les recevoir par exemple). Il faut protéger l'appel à send par un mutex car les socket ZeroMQ ne sont pas thread safe. Or dans notre exemple, la réception et l'envoi sont fait dans deux thread distincts du même programme.

Pour continuer, voici une fonction qui sera exécutée par un thread pour recevoir et afficher le texte reçu par une socket :

void receiveSomeData(zmq::socket_t& socket, std::string id, std::mutex& socketMutex)
 {
    while (not stopThread)
    {
       zmq::message_t receivedMessage;
       socketMutex.lock();
       bool isMessageReceived = socket.recv(&receivedMessage, ZMQ_DONTWAIT);
       socketMutex.unlock();
       if (isMessageReceived)
       {
          std::string receivedString(static_cast<char*>(receivedMessage.data()), receivedMessage.size());
          std::cout << id << " received " << receivedString << std::endl;
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
       }
    }
 }

L'objet zmq::message_t rempli par la méthode recv peut aussi être utilisé lors de l'envoi de données.

L'objectif du code qui va suivre est de créer deux sockets, les connecter entre elles pour établir une communication asynchrone et symétrique, c'est à dire que chaque sockets peut prendre l'initiative d'envoyer un message sans devoir attendre une requête, et sans que cela ne l'empêche de recevoir les messages entrants.

D'après le guide ZeroMQ, un couple de socket DEALER est idéal pour ce cas d'utilisation.
Pour créer une socket ZeroMQ, il faut avoir un context ZeroMQ qu'il est préférable de garder unique et de ne pas détruire prématurément donc

static zmq::context_t ZMQ_CONTEXT(1);

Voici comment utiliser ce zmq::context_t pour créer une socket serveur ainsi que 2 thread pour envoyer et recevoir du texte :

//Création d'une socket de type DEALER
 zmq::socket_t server(ZMQ_CONTEXT, ZMQ_DEALER);
//Qui devient serveur et attend une connexion
server.bind("tcp://*:5353");
//Démarrage du thread qui envoie "ici server" au client via la fonction préalablement décrite « sendSomeData »
std::thread serverSendThread(std::bind(sendSomeData, std::ref(server), "ici server", std::ref(serverMutex)));

//Démarrage du thread de réception des données du client via la fonction préalablement décrite « receiveSomeData »
std::thread serverReceiveThread(std::bind(receiveSomeData, std::ref(server), "Serveur", std::ref(serverMutex)));

ZMQ_DEALER est un simple #define qui permet à ZeroMQ de créer une socket du bon type et de la renvoyer sous la forme générique zmq::socket_t.
Lorsqu'on bind une socket ZeroMQ, il faut préciser le protocol à utiliser, ici « tcp ». Il existe aussi « inproc » pour des échanges au sein d'un process, et « ipc » pour utiliser des sockets ipc.
De manière similaire, voici comment utiliser un contexte pour créer une socket client ainsi que 2 thread pour envoyer et recevoir du texte :

 //Création d'une seconde socket de type DEALER
 zmq::socket_t client(ZMQ_CONTEXT, ZMQ_DEALER);
 //Qui devient cliente de la précédente
 client.connect("tcp://127.0.0.1:5353");
//Démarrage du thread qui envoie "ici client" au server via la fonction préalablement décrite « sendSomeData »
 std::thread clientSendThread(std::bind(sendSomeData, std::ref(client), "ici client", std::ref(clientMutex)));

//Démarrage du thread de réception des données du server via la fonction préalablement décrite « receiveSomeData »
 std::thread clientReceiveThread(std::bind(receiveSomeData, std::ref(client), "Client", std::ref(clientMutex)));

Il n'y a pas beaucoup de différence si ce n'est que bind est remplacé par connect et qu'il faut préciser une adresse IP pour le protocol tcp.

Compilation

Pour compiler cet exemple, il faut installer ZeroMQ 3 ou supérieur, télécharger zmq.hpp depuis le github de cppzmq, le mettre dans le même dossier que le fichier source, et linker avec -lzmq. Sur Fedora, la ligne de compilation est la suivante :
g++ zmqExample.cpp -I/usr/include -I. -std=c++11 -lzmq -o zmqExemple.exe

Surveillance de l'état de la connexion

Le code précédent met en place la communication asynchrone et bidirectionnelle de notre cas d'utilisation. Pour aller un peu plus loin, voici comment utiliser ZeroMQ pour surveiller l'état de la connexion. En effet ZeroMQ premet d'enregistrer des callback sur certains évènements se produisant pour une socket. Pour ce faire, l'API cppzmq propose la classe zmq::monitor_t dont on peut hériter pour faciliter la création des callback. L'exemple suivant définit des callback affichant le nom de l'événement pour chacun des événements possibles

class SocketMonitor: public zmq::monitor_t
{
   public:
   SocketMonitor(std::string identifier) :
   Identifier(identifier)
   {
   }

   std::string Identifier;
   private:
   void on_event_disconnected(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received DISCONNECTED event" << std::endl;
   }
   void on_event_accepted(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received ACCEPTED event" << std::endl;
   }
   void on_event_connected(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received CONNECTED event" << std::endl;
   }
   void on_event_connect_delayed(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received CONNECT DELAYED event" << std::endl;
   }
   void on_event_connect_retried(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received CONNECT RETRIED event"<< std::endl;
   }
   void on_event_listening(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received LISTENING event" << std::endl;
   }
   void on_event_bind_failed(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received BIND FAILED event" << std::endl;
   }
   void on_event_accept_failed(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received ACCEPT FAILED event" << std::endl;
   }
   void on_event_closed(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received CLOSED event" << std::endl;
   }
   void on_event_close_failed(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received CLOSE FAILED event" << std::endl;
   }
   void on_event_unknown(const zmq_event_t &event_, const char* addr_) override
   {
      std::cout << Identifier << " received UNKNOWN event" << std::endl;
   }
};

La surveillance d'une socket se fait dans un nouveau thread, qui exécutera la fonction suivante :

void startMonitor(zmq::monitor_t& monitorToStart,zmq::socket_t& socketToMonitor, std::string identifier)
{
   std::string internalAddress("inproc://" + identifier + ".monitor");
   monitorToStart.monitor(socketToMonitor, internalAddress.c_str(),ZMQ_EVENT_ALL);
}

On note donc qu'un zmq::monitor_t crée une socket interne zmq fonctionnant avec le protocol « inproc ». Il lui faut donc une adresse unique. Précisons que la fonction monitor est bloquante, il faut donc lui donner son propre thread. Ce qui peut être fait de la manière suivante :

//Création des monitors
SocketMonitor clientMonitor("Client");
SocketMonitor serverMonitor("Server");

//Démarrage de la surveillance du server
std::thread serverMonitorThread(std::bind(startMonitor, std::ref(serverMonitor), std::ref(server), "Server"));

//Démarrage de la surveillance du client
std::thread clientMonitorThread(std::bind(startMonitor, std::ref(clientMonitor), std::ref(client), "Client"));

Une fois tous ces éléments disponibles, encore faut-il rajouter quelques ligne de codes pour provoquer des évènements et voir le tout fonctionner :

// Cinq secondes d'attente après avoir démarrer les différents thread pour voir les messages échangés, 
// puis 1 secondes d'attente entre chaque changement d'état de la connexion pour voir les événements surveillés se produire
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "Disconnecting server" << std::endl;
server.unbind("tcp://*:5353");
std::this_thread::sleep_for(std::chrono::seconds(1));

std::cout << "Reconnecting server" << std::endl;
server.bind("tcp://*:5353");
std::this_thread::sleep_for(std::chrono::seconds(1));

std::cout << "Disconnecting client" << std::endl;
client.disconnect("tcp://127.0.0.1:5353");
std::this_thread::sleep_for(std::chrono::seconds(1));

std::cout << "Reconnecting client" << std::endl;
client.connect("tcp://127.0.0.1:5353");
std::this_thread::sleep_for(std::chrono::seconds(1));

Ce code permet de voir que la reconnexion est entièrement gérée par ZeroMQ, sans qu'aucune exception ne soit levée, ni de thread bloqué, tout en gardant la possibilité d'être notifié de tout changement de l'état de la connexion afin de pouvoir adopter le comportement adéquat dans le programme.
Dans la sortie standard, voici un aperçu de ce que le programme d'exemple peut afficher :

Server received LISTENING event
Client received CONNECTED event
Server received ACCEPTED event
Serveur received ici client
Client received ici server
(...)
Serveur received ici client
Client received ici server
Client received ici server
Disconnecting server
Server received CLOSED event
Client received DISCONNECTED event
Client received CONNECT RETRIED event
Client received CLOSED event
(...)
Client received CLOSED event
Client received CONNECT RETRIED event
Reconnecting server
Server received LISTENING event
Client received CONNECTED event
Server received ACCEPTED event
Serveur received ici client
Serveur received ici client
Serveur received ici client
Serveur received ici client
Client received ici server
Serveur received ici client
Client received ici server
Serveur received ici client
Disconnecting client
Server received DISCONNECTED event
Reconnecting client
Client received CONNECTED event
Server received ACCEPTED event
Client received ici server
Serveur received ici client
Client received ici server
Serveur received ici client
Client received ici server

Le code complet de l'exemple peut être obtenu ici zmqExample

Conclusion

Ce code permet de voir que ZeroMQ facilite vraiment l'utilisation des sockets, particulièrement dans un environnement multithread. Nul besoin de tester l'état des sockets avant de faire un envoi ou une réception de message. On échappe aussi à la gestion des différents codes de retour des fonctions systèmes classiques (socket, connect, read, write, bind, listen, accept, etc.).
De plus la deuxième partie de l'article montre comment développer facilement des comportements particuliers à une application utilisant ZeroMQ lorsque l'état de la connexion évolue tout en laissant à ZeroMQ la charge de rétablir la connexion. ZeroMQ offre même la possibilité de garder en file d'attente les messages à envoyer pour ne pas bloquer l'application émettrice, ni perdre les messages. La reconnexion automatique offre le luxe, dans le cas de deux programmes distincts, de pouvoir les lancer dans n'importe quel ordre et de pouvoir en redémarrer un sans toucher à l'autre.

Pour en savoir plus

Le site zeromq.org possède tous les liens utiles pour bien commencer avec ZeroMQ, dont voici une sélection des plus utiles :

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée.