Tout le monde a déjà entendu parler de Kafka et lu maints articles à ce sujet. Tout le monde a testé les “consumers” et “producers”, tout le monde a développé un “stream” pour compter des mots… Mais lorsque l’on sort des sentiers battus, il est difficile de trouver des articles sur comment utiliser la Stream Processor API dans des cas spécifiques. Cette dernière permet d’écrire des Streams Kafka lorsque Streams DSL n’est pas suffisant. Ces API permettent de définir la Topology d’un stream représentant la logique métier à appliquer sur la donnée traitée..
Bien entendu, je pense qu’il faut bien réfléchir avant d’utiliser un outil d’une façon différente de ce pour quoi il a été conçu. La question que l’on doit se poser est “Est-ce que ma façon d’implémenter mon besoin est justifiée, est-ce que j’utilise le bon outil ?” (Il est toujours possible d’ouvrir des huîtres avec un tournevis…)
Dans le cadre d’un projet client, des boîtiers installés sur des véhicules envoient des données captées sur ces derniers et sont transmises au format protocol buffer via le protocole MQTT sur des brokers en mode streaming. C’est un cas d’usage standard qui est facile à mettre en place dans la majeure partie des cas. Cela jusqu’au jour où de nouveaux boîtiers ont leur propre logique : stocker la donnée pour l’agréger au format trajet avant de l’envoyer.
Cela induit des messages volumineux qui doivent être découpés en sous messages par les boitiers lorsque leur taille dépasse la taille maximum autorisée par le broker pour un message ou que la qualité du réseau ne permet pas d’envoyer des messages trop volumineux. De plus, ces messages peuvent être envoyés dans le désordre, sur une période plus ou moins longue et sans assurance de tous les recevoir (1). Enfin, les données d’un trajet doivent être renvoyées à un tiers une fois le trajet reçu en un seul message reconstruit.
Le processus décrit dans le schéma suivant permet de valider l’envoi d’un trajet consolidé une fois qu’il est entièrement reçu.
Il doit répondre aux deux cas suivants :
Pour qu’il soit possible de savoir si un message est de type trajet, une information facultative doit être rajoutée dans le message (cf .proto (2)). Le champs “Trip” est présent si le boîtier émet des données en mode “trajet”, et contient le nombre de paquets représentant le trajet. Cette information permettra de savoir si tous les paquets d’un trajet sont reçus.
message Trip
{
required int64 start = 1;
required int64 end = 2;
required uint32 chunk = 3;
required uint32 chunks = 4;
}
message Header
{
required int64 ts = 1;
required int64 device = 2;
optional Trip trip = 3;
}
Comme évoqué précédemment, l’API Streams DSL de Kafka ne permet pas de gérer ce cas. De prime abord nous pourrions penser qu’il est possible de le gérer avec les Sessions Windows, ces dernières sont représentées par des périodes d’activité séparées par des périodes d’inactivité. Si les boitiers envoient les données en mode streaming il est conseillé d’utiliser cette fonctionnalité car elle correspond à la définition d’un trajet. Mais dans notre cas, le trajet est déjà construit par le boîtier et il faut le reconstruire côté serveur.
Dans notre cas, nous considérons qu’un trajet peut être reconstruit côté serveur lorsque tous les paquets d’un même trajet sont reçus ; si le TTL d’attente de tous les paquets est atteint nous considérons que nous ne pouvons pas envoyer le trajet incomplet au tiers.
De plus, le stream utilisé pour reconstruire les trajets ne doit pas stocker toute la donnée, seulement les index des paquets reçus afin de ne pas avoir de messages de taille trop importante et d’informations inutiles. L’idée est de réduire l’impact mémoire des messages qui transitent par Kafka, dans notre cas seules les métadonnées sont nécessaires pour définir un trajet.
Pour répondre à cette problématique il faut donc utiliser l’ API Processor et plus particulièrement son intégration avec l’API Stream DSL . Voici une liste des composants importants:
Dans la majeure partie des cas l’API Stream DSL fournie par Kafka permet de répondre au besoin d’implémentation. Dans cet exemple, le fait d’utiliser un Transformer custom a permis de répondre à la problématique, mais dans certains cas il faut utiliser l’API bas niveau pour construire entièrement le stream en utilisant des Source, Processors et Sink.
La définition de la Topology établie uniquement avec l’API processor permet une grande liberté d’implémentation mais s’avère verbeuse et contraignante. La couche d’abstraction fournie par l’API Stream DSL simplifie l’écriture des streams et la possibilité de mixer les deux API Processor & Stream DSL évite d’avoir à tout écrire avec uniquement des processors dans certains cas.
Dans cet article nous nous sommes focalisés sur un cas précis, mais lorsqu’est abordé un sujet avec Kafka, il faut commencer par travailler sur la modélisation des données, sur les clés et valeurs qui doivent transiter dans le broker. La clé d’un message est très importante pour le partitioning et un choix inadapté peut avoir des conséquences sur l’écriture des streams et des consumers notamment.
N’hésitez pas à nous contacter si vous avez des questions sur ce sujet ou sur Kafka en général, nous nous ferons un plaisir de vous répondre et d’échanger avec vous !
1 Le protocole MQTT avec un QoS 2 assure qu’un message est envoyé et est reçu une seule fois. Mais dans le cadre du projet les données sont envoyées avec une connexion GSM et certains véhicules roulant à l’étranger peuvent rester de longues périodes dans des zones blanches.
2 Le .proto est le fichier qui définit le format des messages Protocol Buffer.
3 Dans notre cas nous utilisons un Schema Registry avec des schémas Avro.
Merci de nous avoir contactés.
Nous reviendrons vers vous dès que possible.
Oups ! Une erreur s'est produite lors de l'envoi de votre message.
Veuillez réessayer plus tard.