0% ont trouvé ce document utile (0 vote)
12 vues59 pages

M2bdia Bded Kafka

Transféré par

mamagym89
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PDF, TXT ou lisez en ligne sur Scribd
0% ont trouvé ce document utile (0 vote)
12 vues59 pages

M2bdia Bded Kafka

Transféré par

mamagym89
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PDF, TXT ou lisez en ligne sur Scribd

Généralités Fonctionnement Stream processing Ecosystème Ressources

Bases de Données et Environnements Distribués


Bus de messages à hautes performances : Apache Kafka

Annabelle Gillet

IEM/LIB

Révision : novembre 2021


Email : [email protected]

1 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Plan du cours

1 Généralités

2 Fonctionnement

3 Stream processing

4 Ecosystème

5 Ressources

2 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnalités

Kafka est un MOM pensé pour le traitement des Big Data


Souvent utilisé pour collecter des données et réaliser des
traitements en temps réel, grâce au stream processing
Il offre des performances et des opportunités de scaling
horizontal que le fonctionnement de JMS ne permet pas
d’obtenir
Utilisé par de nombreuses grosses entreprises comme LinkedIn,
Twitter, Netflix, etc. pour traiter plusieurs millions de
messages par seconde

3 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Un flux de données
Voir les données comme un flux d’information de mises à jour, qui
permet d’obtenir un état lorsqu’il est traité dans son ensemble.

+5

-2
10
-1

+8

Opposé à la vision BD : on ne conserve pas juste l’état final, mais


toutes les informations qui permettent de l’obtenir.
4 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Un système d’échange de messages


Simplifie les échanges entre applications (producteurs et
consommateurs) pour abstraire le système de communication et se
concentrer sur les traitements.

5 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Traitements à faible latence en temps réel

Le map reduce permet de réaliser des traitements sur des données


massives en répartissant les données de manière distribuée
(opération map), puis en leur appliquant des traitements
d’agrégation (opération reduce).

Kafka suit ce principe, mais en continu plutôt qu’en traitements


batch imposants.

6 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Comparaison avec JMS

Critère Kafka JMS


Philosophie Centré consommateur Centré broker
Mode de consommation Pull Push
des messages
Mode d’échange des mes- Topic Queue ou topic
sages
Nombre de consomma- Multiple (groupes de 1 (queue) ou nombre
teurs consommateurs) d’abonnés (topic)
Rétention des messages Paramétrable avec une Suppression après la
limite en temps et/ou en consommation du
volume message

7 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Architecture

Zookeeper
Groupe de
consommateurs

Consommateur
Producteur
Topic
Consommateur

Producteur Kafka serveur


Groupe de
consommateurs
Topic
Producteur Consommateur

8 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Messages

Les messages ont un format <clé - valeur>


Le producteur définit la clé et la valeur, et choisit le topic
dans lequel envoyer le message
Lorsque le serveur Kafka reçoit un message d’un producteur, il
lui attribut un offset et un timestamp

9 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Organisation d’un topic

Les topics sont fractionnés en partitions


Par défaut, les messages sont assignés à une partition en
fonction de la valeur de leur clé : deux messages avec la
même clé se retrouveront dans le même topic
En raison de ce partitionnement, l’ordre des messages n’est
pas garanti pour l’ensemble du topic, mais il l’est pour
chaque partition (offset)
10 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Segments d’une partition

Lorsque les messages d’une partition sont enregistrés sur


disque, ils sont stockés dans des segments, des fichiers qui
peuvent avoir une limite en temps et/ou en taille (si les deux
sont fixés, un nouveau segment est créé lorsqu’une des deux
limites est atteinte)
La suppression des messages se fait par segment entier,
lorsque l’intégralité du segment dépasse la durée ou la taille
de rétention
Le segment actif (celui en court d’écriture) n’est jamais
supprimé

11 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Topic compaction

Il es possible de rendre un topic ”compact”, c’est-à-dire que le


topic conserve uniquement le dernier message pour
chaque clé
Cela peut être utile dans les cas où seule la dernière
information d’une clé est importante (par exemple Kafka
stocke les commits des consommateurs dans un topic
compacté consumer offsets)

Nettoyage des topics compacts


La suppression des messages ne se fait pas immédiatement à la
réception d’un nouveau message, mais périodiquement et jamais
sur le segment actif.

12 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Serveur et brokers

Un serveur Kafka est composé d’un ou plusieurs brokers


Chaque broker est une instance de Kafka
Les brokers se partagent la répartition et la réplication des topics. Pour chaque
partition, un broker est leader et d’autres sont des followers : le leader reçoit
les messages, puis les envoie à ses followers qui lui confirment la réception.
Lorsque tous les followers ont confirmé, le message est commité
Les producteurs et consommateurs interagissent uniquement avec les leaders de
chaque partition. Les followers servent à augmenter la tolérance aux pannes, en
prenant le relais du leader en cas de défaillance de celui-ci
13 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Configuration d’un broker


Fichier de configuration : config/server.properties
Chaque broker doit avoir son propre fichier de configuration
Paramètres importants de la configuration :
# L ’ adresse de Zookeeper
zookeeper . connect = [ ip : port ]
# L ’ id du broker
broker . id = [ unique integer ]
# Le dossier d ’ enr egistrem ent des messages
log . dirs = [ dossier ]
# L ’ ip et le port d ’ ecoute du broker
listeners = [ PLAINTEXT :// ip : port ]

Autres paramètres intéressants :


# Nombre de replicas d ’ une partition par defaut
default . replication . factor = [ integer ]
# Nombre de partitions d ’ un topic par defaut
num . partitions = [ integer ]

La liste entière des paramètres disponibles :


https://kafka.apache.org/documentation/#configuration
14 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Lancement d’un broker


Démarrage de Zookeeper :
$ bin / zookeeper - server - start . sh config / zookeeper . properties

Démarrage d’un broker Kafka :


$ bin / kafka - server - start . sh config / server . properties

Pour démarrer un serveur Kafka composé de plusieurs brokers :


Faire une copie du fichier config/server.properties pour chaque
broker
Référencer le même serveur Zookeeper, attribuer un broker id
unique, et si les brokers sont lancés sur la même machine vérifier les
paramètres log.dirs et listeners pour que les brokers n’utilisent pas
les mêmes
Démarrer les brokers individuellement grâce à la ligne de
commande, en référençant le fichier de configuration
correspondant à chaque broker
15 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Création d’un topic


Pour créer un topic :
$ bin / kafka - topics . sh -- create -- topic topic1 \
-- bootstrap - server localhost :9092

Plusieurs options peuvent être utilisées :


# Pour specifier le nombre de partitions du topic
-- partitions 1
# Pour specifier le nombre de replicas des partitions du
topic
-- replication - factor 1

Pour obtenir la description d’un topic :


$ bin / kafka - topics . sh -- describe -- topic topic1 \
-- bootstrap - server localhost :9092

Pour supprimer un topic :


$ bin / kafka - topics . sh -- delete -- topic topic1 \
-- bootstrap - server localhost :9092

16 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Fonctionnement interne

Gestion des messages

Les messages de chaque partition sont enregistrés dans des


fichiers : l’écriture et la lecture de ces fichiers sont faites de
manière séquentielle (en append-only, dans le style des
logs), ce qui permet d’obtenir de bonnes performances
Les messages restent dans le topic même après avoir été
consommés, ils ne sont supprimés que lorsque le délai de
rétention ou que le volume de stockage par topic est
dépassé
Exemple de configuration de la rétention, dans le fichier
config/server.properties :
log . retention . hours = [ integer ]
log . retention . bytes = [ integer ]

17 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Producteurs

Les producteurs

Les producteurs envoient des messages à un topic


Ils sont indépendants les uns des autres et sont thread-safe
Ils peuvent être paramétrés pour attendre différents niveaux
de confirmation de réception d’un message :
1 acks=0 : le producteur n’attend pas la confirmation de
bonne réception du message par le serveur Kafka
2 acks=1 : le producteur attend uniquement la confirmation de
bonne réception du message par le broker leader de la
partition. La perte de message est possible si le leader
rencontre un problème avant que les brokers ayant un réplicas
de la partition ait pu récupérer le message
3 acks=all (ou acks=-1) : le producteur attend la confirmation
de la bonne réception et réplication du message par tous les
brokers hébergeant la partition

18 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Producteurs

Producteurs console

$ bin / kafka - console - producer . sh -- topic topic1 \


-- bootstrap - server localhost :9092

Chaque ligne entrée à la suite de cette commande constituera un


message envoyé au serveur Kafka.
Les messages sont envoyés sans clé, dans ce cas ils sont répartis en
suivant une stratégie round robin.

19 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Producteurs

Producteurs Java

Importer l’API via maven :


1 < dependency >
2 < groupId > org . apache . kafka </ groupId >
3 < artifactId > kafka - clients </ artifactId >
4 < version >2.6.0 </ version >
5 </ dependency >

Code du producteur :
1 Properties props = new Properties () ;
2 props . put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , " localhost :9092 " ) ;
3 props . put ( ProducerConfig . ACKS_CONFIG , " all " ) ;
4 props . put ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG ,
" org . apache . kafka . common . serialization . StringSerializer " ) ;
5 props . put ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG ,
" org . apache . kafka . common . serialization . StringSerializer " ) ;
6
7 Producer < String , String > producer = new KafkaProducer < >( props ) ;
8 for ( int i = 0; i < 100; i ++)
9 producer . send ( new ProducerRecord < String , String >( " topic1 " ,
Integer . toString ( i ) , Integer . toString ( i ) ) ) ;
10
11 producer . close () ;

20 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Producteurs

Configuration du partitionnement
1 import java . util . Map ;
2 import org . apache . kafka . clients . producer . Partitioner ;
3 import org . apache . kafka . common . Cluster ;
4
5 public class Cus tomP art iti one r implements Partitioner {
6
7 private static final int PARTITION_COUNT = 50;
8
9 @Override
10 public void configure ( Map < String , ? > configs ) {}
11
12 @Override
13 public int partition ( String topic , Object key , byte [] keyBytes ,
14 Object value , byte [] valueBytes , Cluster cluster ) {
15 Integer keyInt = Integer . parseInt ( key . toString () ) ;
16 return keyInt % PARTITION_COUNT ;
17 }
18
19 @Override
20 public void close () {}
21 }

À utiliser avec l’option :


1 props . put ( ProducerConfig . PARTITIONER_CLASS_CONFIG ,
C u s t om Par tit ion er . class . getName () ) ;

21 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Consommateurs

Les consommateurs

Le fonctionnement des consommateurs Kafka


permet de paralléliser les traitements, et
Groupe 1
donc d’obtenir de bonnes performances, et Consommateur 1

de pouvoir passer à l’échelle


Consommateur 2
Topic 1

Les consommateurs s’organisent en groupes, Partition 1 Consommateur 3

qui s’abonnent à un ou plusieurs topics Partition 2 Consommateur 4

La degré de parallélisation des consommateurs Partition 3 Groupe 2

dépend du nombre de partitions d’un Partition 4 Consommateur 1

topic : dans chaque groupe, un Consommateur 2

consommateur peut recevoir les messages de


plusieurs partitions, mais une partition ne
sera assignée qu’à un seul consommateur

22 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Consommateurs

Consommateurs console

$ bin / kafka - console - consumer . sh -- topic topic1 \


-- from - beginning -- bootstrap - server localhost :9092

L’option from-beginning permet de consommer tous les


messages présents dans le topic, par défaut le consommateur
console se place à la fin du topic et n’affichera que les messages
envoyés après sa connexion.

23 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Consommateurs

Consommateurs Java

1 Properties props = new Properties () ;


2 props . setProperty ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , " localhost :9092 " ) ;
3 props . setProperty ( ConsumerConfig . GROUP_ID_CONFIG , " test " ) ;
4 props . setProperty ( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG , " false " ) ;
5 props . setProperty ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG ,
" org . apache . kafka . common . serialization . S tr in g De s er ia l iz er " ) ;
6 props . setProperty ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG ,
" org . apache . kafka . common . serialization . S tr in g De s er ia l iz er " ) ;
7 KafkaConsumer < String , String > consumer = new KafkaConsumer < >( props ) ;
8 consumer . subscribe ( Arrays . asList ( " topic1 " , " topic2 " ) ) ;
9 final int minBatchSize = 200;
10 List < ConsumerRecord < String , String > > buffer = new ArrayList < >() ;
11 while ( true ) {
12 ConsumerRecords < String , String > records =
consumer . poll ( Duration . ofMillis (100) ) ;
13 for ( ConsumerRecord < String , String > record : records ) {
14 buffer . add ( record ) ;
15 }
16 if ( buffer . size () >= minBatchSize ) {
17 insertIntoDb ( buffer ) ;
18 consumer . commitSync () ;
19 buffer . clear () ;
20 }
21 }

24 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Consommateurs

Techniques de commit des consommateurs


Les consommateurs peuvent se servir de plusieurs types de commit :
Automatique : l’offset du dernier message consommé sera
commité à interval de temps régulier (configurable)
Manuel : l’appel à la fonction commitSync() permet de commité
l’offset du dernier message consommé

Messages dupliqués ou non traités


Si les commits ne sont pas gérés correctement, cela peut entraı̂ner une
duplication ou une absence de traitement des messages.
En effet, si le consommateur commit juste après avoir reçu des messages,
et qu’il subit une défaillance à ce moment, le consommateur qui prendre
le relais considérera les derniers messages consommés comme traités et
continuera de lire les messages suivants.
À l’inverse, si des messages sont consommés après un commit et que le
consommateur rencontre un problème, les messages seront considérés
comme nouveaux et seront traités une nouvelle fois.
25 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Plan du cours

1 Généralités
2 Fonctionnement
Fonctionnement interne
Producteurs
Consommateurs
3 Stream processing
Definition
Types de traitements
Concepts
4 Ecosystème
5 Ressources

26 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Definition

Stream processing

Le stream processing est un paradigme de programmation


consistant à traiter un flux infini de données, ayant une
forte composante temporelle
Il est souvent utilisé pour obtenir des résultats en continu et
en temps réel
Ses domaines d’application sont assez variés : détection de
fraudes, détection d’évènements, monitoring, etc.
C’est un paradigme particulièrement adapté pour traiter les
variations des séries temporelles ou pour trouver des
patterns dans les données
Les exemples donnés sont réalisés avec Kafka Streams

27 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Definition

Particularités

L’utilisation du stream processing diffère des traitements


traditionnels selon plusieurs points :
Les données peuvent arriver avec du retard
Les données peuvent arriver dans le désordre
Comme le flux de données est infini, il n’y a pas de repère de
fin de traitement
C’est souvent un enchaı̂nement d’opérations simples, puisant
ses données d’une source (ex. : un topic Kafka), pour envoyer le
résultat vers une destination (ex. : un autre topic Kafka).

28 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Definition

Architecture

29 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Definition

Temps
Plusieurs notions de temps sont présentes avec le stream
processing :
Le temps de l’évènement : le moment auquel l’évènement a
été émis à la source
Le temps du traitement : le moment auquel l’évènement est
traité
Différence de temps
Dans certains cas, la différence entre le temps de l’évènement et de
traitement peut être importante.
Un délai peut être défini avant de considérer un évènement comme
retardataire.
Plusieurs stratégies peuvent alors être adoptées : ignorer
l’évènement, ou l’intégrer tardivement.

30 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Definition

Programmation d’applications de stream processing

Le développement d’applications en stream processing fait appels à


quelques principes :
L’appel d’une méthode retourne un nouvel objet du même type que
celui sur lequel l’appel est effectué, ce qui permet de chaı̂ner les
appels de méthodes
L’utilisation des lambda expressions (à partir de Java 8) permet
de simplifier l’écriture du code
1 // Sans lambda expression
2 monBouton . ad dAct ion Lis ten er ( new ActionListener () {
3 public void actionPerformed ( ActionEvent event ) {
4 System . out . println ( " clic " ) ;
5 }
6 }) ;
7 // Avec lambda expression
8 monBouton . ad dAct ion Lis ten er ( event -> System . out . println ( " clic " ) ) ;

31 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Definition

Exemple : Word Count

1 Properties props = new Properties () ;


2 props . put ( StreamsConfig . APPLICATION_ID_CONFIG , " streams - wordcount " ) ;
3 props . put ( StreamsConfig . BOOTSTRAP_SERVERS_CONFIG , " localhost :9092 " ) ;
4 props . put ( StreamsConfig . DEFAULT_KEY_SERDE_CLASS_CONFIG ,
Serdes . String () . getClass () ) ;
5 props . put ( StreamsConfig . DEFAULT_VALUE_SERDE_CLASS_CONFIG ,
Serdes . String () . getClass () ) ;
6
7 final Str eamsBuilder builder = new StreamsBuilder () ;
8
9 KStream < String , String > source = builder . stream ( " streams - plaintext - input " ) ;
10 source . flatMapValues ( value ->
Arrays . asList ( value . toLowerCase ( Locale . getDefault () ) . split ( " \\ W + " ) ) )
11 . groupBy (( key , value ) -> value )
12 . count ( Materialized . < String , Long , KeyValueStore < Bytes ,
byte [] > > as ( " counts - store " ) )
13 . toStream ()
14 . to ( " streams - wordcount - output " , Produced . with ( Serdes . String () ,
Serdes . Long () ) ) ;
15
16 final Topology topology = builder . build () ;
17 final KafkaStreams streams = new KafkaStreams ( topology , props ) ;
18 streams . start () ;

32 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Transformation sans état

Appliquer une transformation à un message (clé et/ou valeur),


filtrer les messages, produire plusieurs messages à partir d’un
seul message, etc.
Peuvent aussi être utilisés pour regrouper des messages
(opérations de group by, similaire à celles des BD)

Répartition des messages


Dans certains systèmes de streaming, la répartition des messages
ne se fait pas automatiquement lors d’une modification de la clé et
peuvent nécessiter des opérations spéciales pour le faire.
Par exemple Kafka Streams marque le flux comme nécessitant une
répartition après une opération susceptible de modifier la clé, mais
ne le fait effectivement qu’avec des opérations group by.

33 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Exemple de transformation sans état

1 Properties props = new Properties () ;


2 [...]
3
4 final Serde < String > stringSerde = Serdes . String () ;
5 final Serde < byte [] > byteArraySerde = Serdes . ByteArray () ;
6
7 final Str eamsBuilder builder = new StreamsBuilder () ;
8
9 final KStream < byte [] , String > textLines = builder . stream ( " TextLinesTopic " ) ,
Consumed . with ( byteArraySerde , stringSerde ) ) ;
10
11 // Tr ansformation des valeurs uniquement
12 final KStream < byte [] , String > u p p e r c a s e d W i t h M a p V a l u e s = textLines . mapValues ( v
-> v . toUpperCase () ) ;
13 u p p e r c a s e d W i t h M a p V a l u e s . to ( " U p p e r c a s e d T e x t L i n e s T o p i c " ) ;
14
15 // Tr ansformation des cles et des valeurs
16 final KStream < String , String > o r i g i n a l A n d U p p e r c a s e d = textLines . map (( key ,
value ) -> KeyValue . pair ( value , value . toUpperCase () ) ) ;
17 o r i g i n a l A n d U p p e r c a s e d . to ( " O r i g i n a l A n d U p p e r c a s e d T o p i c " ,
Produced . with ( stringSerde , stringSerde ) ) ;
18
19 final KafkaStreams streams = new KafkaStreams ( builder . build () , props ) ;
20 streams . start () ;

34 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Gestion des états


Pour les opérations nécessitant un état, le système de streaming
doit pouvoir le conserver
Kafka Streams utilise RocksDB, une base de données clé-valeur, et
assure la tolérance aux pannes en utilisant un topic pour stocker
les modifications d’état de la base de données
Plusieurs opérations nécessitent d’avoir un état : les agrégations, le
fenêtrage, les jointures
Pour distinguer les opérations, Kafka Streams utilise :
Les KStream : pour les flux de messages sans état, une
opération appliquée à l’ensemble des messages avec une même
clé représente l’état courant
Les KTable : pour les flux avec état, le dernier message pour
une clé donnée représente l’état courant

On parle de dualité stream/table.


35 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Agrégation

Nécessite de travailler sur un flux de données groupé (avec


un opérateur de type group by)
Permet d’appliquer une opération commune sur l’ensemble
des éléments groupés selon le même critère (ex. : compter
ou faire une somme des éléments ayant une même clé)
Les agrégations peuvent être fenêtrées ou non

36 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Exemple d’agrégation

1 Properties props = new Properties () ;


2 [...]
3
4 final Str eamsBuilder builder = new StreamsBuilder () ;
5
6 final KStream < Integer , Integer > input = builder . stream ( " NumbersTopic " ) ) ;
7
8 // On attribue la meme cle a tous les messages pour faire la somme globale
9 final KTable < Integer , Integer > sumOfNumbers = input . selectKey (( k , v ) -> 1)
10 . groupByKey ()
11 . reduce (( v1 , v2 ) -> v1 + v2 ) ;
12
13 sumOfNumbers . toStream () . to ( " SumTopic " ) ;
14
15 final KafkaStreams streams = new KafkaStreams ( builder . build () , props ) ;
16 streams . start () ;

37 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Tumbling time window


Les fenêtres sont de taille fixe, et une nouvelle fenêtre est créée lorsque la fenêtre
précédente se termine. Un message n’appartient qu’à une seule fenêtre.

38 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Exemple de tumbling time window

1 Properties props = new Properties () ;


2 [...]
3
4 final Str eamsBuilder builder = new StreamsBuilder () ;
5
6 final KStream < Integer , Integer > input = builder . stream ( " SellsTopic " ) ) ;
7
8 // Tumbling window de 10 minutes
9 final KTable < Integer , Integer > wi nd o wS u mO fN u mb e rs = input . groupByKey ()
10 . windowedBy ( TimeWindows . of ( Duration . ofMinutes (10) ) )
11 . count () ;
12
13 w i n d o w S u m Of Nu m be r s . toStream () . to ( " WindowCountTopic " ) ;
14
15 final KafkaStreams streams = new KafkaStreams ( builder . build () , props ) ;
16 streams . start () ;

Possibilité de définir une période de grâce :


1 TimeWindows . of ( Duration . ofMinutes (10) ) . grace ( Duration . ofMinutes (1) )

39 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Timestamp extractor

1 public class C u s t o m T i m e s t a m p E x t r a c t o r implements T im e st a mp Ex t ra c to r {


2 @Override
3 public long extract ( ConsumerRecord < Object , Object > record , long
pr evi ousT ime sta mp ) {
4 final SimpleDateFormat sdf = new
SimpleDateFormat ( " yyyy - MM - dd ’T ’ HH : mm : ssZ " ) ;
5
6 String eventTime = (( Rating ) record . value () ) . getTimestamp () ;
7
8 try {
9 return sdf . parse ( eventTime ) . getTime () ;
10 } catch ( ParseException e ) {
11 return 0;
12 }
13 }
14 }

Pour l’utiliser, lors de la définition des paramètres de l’application de


streaming :
1 props . put ( StreamsConfig . D E F A U L T _ T I M E S T A M P _ E X T R A C T O R _ C L A S S _ C O N F I G ,
C u s t o m T i m e s t a m p E x t r a c t o r . class . getName () )

40 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Hopping time window


Les fenêtres sont de taille fixe, et une nouvelle fenêtre est créée à intervalle de temps
réguliers, sans forcément que la fenêtre précédente soit terminée. Un message peut
appartenir à plusieurs fenêtres.

41 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Exemple de hoping time window

1 Properties props = new Properties () ;


2 [...]
3
4 final Str eamsBuilder builder = new StreamsBuilder () ;
5
6 final KStream < Integer , Integer > input = builder . stream ( " SellsTopic " ) ) ;
7
8 // Hoping window de 10 minutes , avec " hop " d ’1 minute
9 final KTable < Integer , Integer > wi nd o wS u mO fN u mb e rs = input . groupByKey ()
10 . windowedBy ( TimeWindows . of ( Duration . ofMinutes (10) )
11 . advanceBy ( Duration . ofMinutes (1) ) )
12 . count () ;
13
14 w i n d o w S u m Of Nu m be r s . toStream () . to ( " WindowCountTopic " ) ;
15
16 final KafkaStreams streams = new KafkaStreams ( builder . build () , props ) ;
17 streams . start () ;

42 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Session window
Les fenêtres sont de taille variable, et les messages sont regroupés si la différence
entre leur timestamp est inférieure à un seuil défini.

43 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Session window - Messages en retard


Une fenêtre peut être agrandie si un message arrive en retard et qu’il réduit
suffisamment la différence de timestamp entre deux messages pour respecter le seuil
défini.

44 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Exemple de session window

1 Properties props = new Properties () ;


2 [...]
3
4 final Str eamsBuilder builder = new StreamsBuilder () ;
5
6 final KStream < Integer , Integer > input = builder . stream ( " SellsTopic " ) ) ;
7
8 // Session window avec un gap de 5 minutes
9 final KTable < Integer , Integer > wi nd o wS u mO fN u mb e rs = input . groupByKey ()
10 . windowedBy ( SessionWindows . with ( Duration . ofMinutes (5) ) ;)
11 . count () ;
12
13 w i n d o w S u m Of Nu m be r s . toStream () . to ( " WindowCountTopic " ) ;
14
15 final KafkaStreams streams = new KafkaStreams ( builder . build () , props ) ;
16 streams . start () ;

45 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Jointure

Les jointures peuvent se faire sur des flux fenêtrés


De la même manière que pour les BD, ils servent à joindre
deux messages sur une clé commune, puis de traiter le résultat
de la jointure comme un seul message

46 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Types de traitements

Exemple de jointure

1 Properties props = new Properties () ;


2 [...]
3
4 final Str eamsBuilder builder = new StreamsBuilder () ;
5
6 final KStream < String , String > userRegions = builder . stream ( " userTopic " ) ;
7 final KStream < String , Long > regionMetrics = builder . stream ( " regionTopic " ) ;
8
9 regionMetrics . join ( userRegions ,
10 ( regionValue , metricValue ) -> regionValue + " / " + metricValue ,
11 JoinWindows . of ( Duration . ofMinutes (5) ) ,
12 Joined . with (
13 Serdes . String () , /* key */
14 Serdes . Long () , /* left value */
15 Serdes . String () ) /* right value */
16 )
17 ) . to ( " outputTopic " ) ;
18
19 final KafkaStreams streams = new KafkaStreams ( builder . build () , props ) ;
20 streams . start () ;

47 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Concepts

Plan du cours

1 Généralités
2 Fonctionnement
Fonctionnement interne
Producteurs
Consommateurs
3 Stream processing
Definition
Types de traitements
Concepts
4 Ecosystème
5 Ressources

48 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Concepts

Garantie de traitement

At-most-once : les messages peuvent ne pas être traités


At-least-once : les messages peuvent être traités plusieurs
fois
Effectively-once (souvent appelé exactly-once) : les
messages ne sont traités qu’une seule fois, ni plus, ni moins.
Effectively-once
Cette garantie est toutefois à nuancer : elle n’est effective que d’un
point de vue du stream processing, ce qui veut dire que si un
traitement possède des effets de bord (ex. : écriture dans un
fichier), ils ne sont pas couverts par la garantie et peuvent être
effectués plusieurs fois.

49 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Concepts

Débit vs latence (Throughput vs latency )

Latence : le temps nécessaire pour traiter un élément


individuellement. Une bonne latence possède une valeur faible
Débit : le nombre d’éléments traitables en un temps donné.
Un bon débit possède une valeur élevée

Complémentarité
Ces deux notions sont complémentaires. Les traitements batch ont
un débit et une latence élevés. Les systèmes de streaming doivent
minimiser la latence et maximiser le débit pour obtenir les
meilleures performances.

50 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Concepts

Limites du stream processing

Certains algorithmes s’adaptent mal au stream processing :

Optimisation globale (par exemple algorithme du sac à dos)


Calculs sur les graphes

Conservation du partitionnement
Si un traitement regroupe tous les messages sous une même clé,
on peut avoir une perte de performances.

51 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Concepts

Exemples de cas d’utilisation du stream processing

Monitoring

Détection de fraudes

Suivi des activités utilisateurs

Détections d’anomalies

Système d’alerte (valeurs trop basses ou trop élevées)

52 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Concepts

Lambda Architecture
Avant que les systèmes de streaming ne proposent la garantie effectively-once,
ils étaient réputés peu fiables et produisant des résultats approximatifs. La
Lambda Architecture a beaucoup été utilisée dans ce contexte : sa couche
Speed réalise les traitements en continue en stream processing, et sa couche
Batch réalise les traitements périodiquement (ex. : 1 fois par jour) pour
remplacer les résultats de la couche Speed par des résultats exacts.
Batch layer Serving layer

Batch view
Traitements
Master Batch
dataset Batch view

Nouvelles
données Speed layer

Speed view

Traitements
incrémentaux
Speed view

Cependant, c’est une architecture complexe, rendue obsolète par l’apparition de


la garantie effectively-once.
53 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Concepts

La Kappa Architecture

Batch layer Serving layer

Batch view
Batch
Master processing
dataset Batch view

New data
Speed layer

Speed view

Incremental
processing
Speed view

Simplification de la Lambda Architecture, en s’appuyant sur


l’évolution des systèmes de stream processing
Perte de la propriété de résistance aux pannes
Cas d’utilisation limités
54 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Concepts

Exemple d’architecture utilisant Kafka et le stream


processing
Polystore

Enrichissement des
Collecte des données données Analytics
PostgreSQL
Machine de insertion
Analyses de données
collecte 1
STREAM

Anrango
SEARCH insertion

Consommateur
...

Kafka
Hadoop
insertion Timescale
STREAM insertion

Kafka Streams
Stream API SEARCH
tweets de la
...

machine 1
...

Search API
Machine de
collecte n tweets de la
STREAM machine n

SEARCH Traitement des


hashtags Visualisation
...

Traitement des
mentions
STREAM Extraction des
opérateurs
Batch layer Serving
Traitement des layer / polystore
URL
SEARCH
Traitement des
retweets

55 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Kafka Connect

Kafka Connect est utilisé pour relier Kafka à d’autres


systèmes (en tant que source ou destination)
Par exemple, il peut servir à insérer les messages dans des
bases de données, ou à capturer leurs modifications pour les
rendre disponibles en streaming
Il propose un framework pour développer de nouveaux
connecteurs vers d’autres systèmes

56 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

ksql

ksql abstrait le développement d’applications en utilisant Kafka


Streams, et propose une interface semblable aux BD.

Création d’un flux :


1 CREATE STREAM riderLocations ( profileId VARCHAR , latitude DOUBLE ,
longitude DOUBLE )
2 WITH ( kafka_topic = ’ locations ’ , value_format = ’ json ’) ;

Définition d’une requête continue (affiche les nouveaux


résultats lorsqu’ils sont disponibles) :
1 SELECT * FROM riderLocations
2 WHERE GEO_DISTANCE ( latitude , longitude , 37.4133 , -122.1162) <= 5
3 EMIT CHANGES ;

57 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Autres systèmes de streaming

Storm est un des premiers système de streaming à prendre de


l’ampleur et à être utilisé plus massivement
Spark propose un système de streaming en micro-batch, qui
lui permet de garder une interface commune avec son système
en batch, très populaire
Flink est un système de stream processing performant, qui
propose également des traitements en stream et en batch

58 / 59
Généralités Fonctionnement Stream processing Ecosystème Ressources

Ressources

https://kafka.apache.org
Exemples de code Kafka et Kafka Streams :
https://github.com/gwenshap/kafka-examples et
https://kafka-tutorials.confluent.io/
Streaming Systems: the what, where, when and how of
larger-data processing, Tyler Akidau, Slava Chernyak et
Reuven Lax, O’Reilly
Kafka The Definitive Guide: real-time data and stream
processing at scale, Neha Narkhede, Gwen Shapira et Todd
Palino, O’Reilly
Certaines illustrations du cours sont issues du livre ”Kafka:
The Definitive Guide” et du site Kafka

59 / 59

Vous aimerez peut-être aussi