Introduction à MapReduce et Big Data
Introduction à MapReduce et Big Data
• Pour exécuter un problème large de manière distribuée, il faut pouvoir découper le problème en plusieurs
problèmes de taille réduite à exécuter sur chaque machine du cluster (stratégie algorithmique dite du
divide and conquer
/ diviser pour régner).
• De multiples approches existent et ont existé pour cette division d'un problème en plusieurs « sous-tâches ».
• MapReduce est un paradigme (un modèle) visant à généraliser les approches existantes pour produire une
approche unique applicable à tous les problèmes.
u Possibilité́ :
u Pour chaque entrée, saisir la ville et le prix de vente
u Si on trouve une entrée avec une ville déjà saisie, on les regroupe en
faisant la somme des ventes
u Dans un environnement traditionnel, on fera des Hashtable sous la forme
<clé-valeur>
u Dans ce cas, la clé sera l’adresse du magasin et la valeur le total de ventes
Reducers
Map-Reduce
Mappers
u Mappers
u Pour chaque entrée, saisir la ville et le total de ventes dans
une fiche
u Rassembler les fiches d’un même magasin dans une pile
N.Y.C. Miami N.Y.C. L. A. Miami N.Y.C
u Reducers L. A.
u Chaque reducer sera responsable d’un ensemble de magasins
u Collectent les fiches associées des différents mappers
u Pour chaque ville, ils parcourent les piles en ordre N.Y.C
alphabétique (Los Angeles avant Miami) et font la somme Miami L. A.
des enregistrements N.Y.C
Miami L. A.
N.Y.C
Reducers $603.768
$300,578 $432.900
Map-Reduce
Mappers
u Un Reducer reçoit des données comme suit :
u L.A 12.34
u L.A 99.07
u NYC 3.14 N.Y.C. Miami N.Y.C. L. A. Miami N.Y.C
u NYC 99.77
L. A.
u NYC 88.99
u Pour chaque entrée, de quoi avons-nous besoin pour calculer
la totalité́ des ventes pour chaque magasin ?
N.Y.C
Coût précédent
Miami L. A.
N.Y.C
Coût en cours
Miami L. A.
Ventes totales par magasin N.Y.C
Reducers $603.768
Magasin précédent $300,578 $432.900
Magasin en cours
MapReduce : Présentation (2/4)
• La première, MAP, va transformer les données d'entrée en une série de couples clef/valeur. Elle va
regrouper les données en les associant à des clefs, choisies de telle sorte que les couples clef/valeur aient
un sens par rapport au problème à résoudre. Par ailleurs, cette opération doit être parallélisable: on doit
pouvoir découper les données d'entrée en plusieurs fragments, et faire exécuter l'opération MAP à
chaque machine du cluster sur un fragment distinct.
• La seconde, REDUCE, va appliquer un traitement à toutes les valeurs de chacune des clefs distinctes
produite par l'opération MAP. Au terme de l'opération REDUCE, on aura un résultat pour chacune des
clefs distinctes. Ici, on attribuera à chacune des machines du cluster une des clefs uniques produites par
MAP, en lui donnant la liste des valeurs associées à la clef. Chacune des machines effectuera alors
l'opération REDUCE pour cette clef.
MapReduce : Présentation (3/4)
On distingue donc 4 étapes distinctes dans un traitement MapReduce:
• Mapper chacun de ces fragments pour obtenir des couples (clef ; valeur).
• Réduire (reduce) les groupes indexés par clef en une forme finale, avec une valeur pour chacune des
clefs distinctes.
• Choisir une manière de découper les données d'entrée de telle sorte que l'opération MAP soit
parallélisable.
• Première étape: déterminer une manière de découper (split) les données d'entrée pour
que chacune des machines puisse travailler sur une partie du texte.
• Notre problème est ici très simple – on peut par exemple décider de découper les
données d'entrée ligne par ligne. Chacune des lignes du texte sera un fragment de nos
données d'entrée.
MapReduce : Exemple concret (2/8)
• Nos données d'entrée (le texte):
• Puisqu'on s'intéresse aux occurrences des mots dans le texte, et qu'à terme on aura
après l'opération REDUCE un résultat pour chacune des clefs distinctes, la clef qui
s'impose logiquement dans notre cas est: le mot-lui même.
• Quand à notre opération MAP, elle sera elle aussi très simple: on va simplement
parcourir le fragment qui nous est fourni et, pour chacun des mots, générer le couple
clef/valeur: (MOT ; 1). La valeur indique ici l’occurrence pour cette clef - puisqu'on a
croisé le mot une fois, on donne la valeur « 1 ».
MapReduce : Exemple concret (5/8)
• Le code de notre opération MAP sera donc (ici en pseudo code):
POUR MOT dans LIGNE,
FAIRE : GENERER
COUPLE (MOT; 1)
• Pour chacun de nos fragments, les couples (clef; valeur) générés seront
donc:
TOTAL=0
POUR COUPLE dans GROUPE, FAIRE:
TOTAL=TOTAL+1
RENVOYER TOTAL
MapReduce : Exemple concret (8/8)
• Une fois l'opération REDUCE effectuée, on obtiendra donc une valeur
unique pour chaque clef distincte. En l’occurrence, notre résultat sera:
qui : 4
celui : 2
croyait : 2 •On constate que le mot le plus utilisé dans notre texte est
fou : 2 « qui », avec 4 occurrences, suivi de « celui », « croyait » et
au : 1 « fou », avec 2 occurrences chacun.
ciel : 1
ny : 1
pas : 1
fait :1
[…]
MapReduce : Exemple concret - Conclusion
• Notre exemple est évidemment trivial, et son exécution aurait été instantanée même
sur une machine unique, mais il est d'ores et déjà utile: on pourrait tout à fait utiliser
les mêmes implémentations de MAP et REDUCE sur l'intégralité des textes d'une
bibliothèque Française, et obtenir ainsi un bon échantillon des mots les plus utilisés
dans la langue Française.
• L’intérêt du modèle MapReduce est qu'il nous suffit de développer les deux
opérations réellement importantes du traitement: MAP et REDUCE, et de bénéficier
automatiquement de la possibilité d'effectuer le traitement sur un nombre variable
de machines de manière distribuée.
MapReduce : Schéma général
MapReduce : Exemple – Statistiques web
• Un autre exemple: on souhaite compter le nombre de visiteurs sur chacune des pages
d'un site Internet. On dispose des fichiers de logs sous la forme suivante:
/[Link] [19/05/[Link]+0200]
/[Link] [19/05/[Link] +0200]
/[Link]?id=5 [24/05/[Link] +0200]
/[Link]?id=4 [24/05/[Link] +0200]
/[Link]?id=18 [24/05/[Link] +0200]
...etc...
• Ici, notre clef sera par exemple l'URL d’accès à la page, et nos opérations MAP et
REDUCE seront exactement les mêmes que celles qui viennent d'être présentées: on
obtiendra ainsi le nombre de vue pour chaque page distincte du site.
MapReduce : Exemple – Graphe social (1/8)
• On ne peut pas se permettre d'effectuer une série de requêtes SQL à chaque fois que la
page est accédée (trop lourd en traitement). On va donc développer des programmes
MAP et REDUCE pour cette opération et exécuter le traitement toutes les nuits sur
notre base de données, en stockant le résultat dans une nouvelle table.
MapReduce : Exemple – Graphe social (2/8)
• Ici, nos données d'entrée sous la forme Utilisateur =>Amis:
A => B, C, D
B => A, C, D, E
C=> A, B, D, E
D => A, B, C, E
E=>B, C, D
• On fait également en sorte que la clef soit toujours triée par ordre alphabétique (clef «
B-A » sera exprimée sous la forme « A-B »).
On obtiendra ainsi :
("A-B"; "A CDE") ("B-C"; "A CDE") ("B-D"; "A CDE") ("B-E"; " A CDE")
On aura :
("A-C"; "A B D E") ("B-C"; "A BD E") ("C-D"; "A B DE") ("C-E"; " A B DE")
Comme pour HDFS, la gestion des tâches de Hadoop se base sur deux
serveurs (des daemons):
• Le JobTracker, qui va directement recevoir la tâche à exécuter (un .jar Java), ainsi
que les données d'entrées (nom des fichiers stockés sur HDFS) et le répertoire où
stocker les données de sortie (toujours sur HDFS). Il y a un seul JobTracker sur une
seule machine du cluster Hadoop. Le JobTracker est en communication avec le
NameNode de HDFS et sait donc où sont les données.
• Pour effectuer un traitement Hadoop, on va donc stocker nos données d'entrée sur
HDFS, créer un répertoire où Hadoop stockera les résultats sur HDFS, et compiler
nos programmes MAP et REDUCE au sein d'un
.jar Java.
• On soumettra alors le nom des fichiers d'entrée, le nom du répertoire des résultats,
et le .jar lui-même au JobTracker: il s'occupera du reste (et notamment de
transmettre les programmes MAP et REDUCE aux serveurs TaskTracker des
machines du cluster).
Architecture Hadoop: Présentation (3/3)
Architecture Hadoop: Le JobTracker (1/3)
Le déroulement de l'exécution d'une tâche Hadoop suit les étapes suivantes du point
de vue du JobTracker :
1. Le client (un outil Hadoop console) va soumettre le travail à effectuer au JobTracker : une archive java .jar
implémentant les opérations Map et Reduce. Il va également soumettre le nom des fichiers d'entrée et
l'endroit où stocker les résultats.
2. Le JobTracker communique avec le NameNode HDFS pour savoir où se trouvent les blocs correspondant
aux noms de fichiers donnés par le client.
3. Le JobTracker, à partir de ces informations, détermine quels sont les noeuds TaskTracker les plus appropriés,
c'est à dire ceux qui contiennent les données sur lesquelles travailler sur la même machine, ou le plus proche
possible (même rack/rack proche).
4. Pour chaque « morceau » des données d'entrée, le JobTracker envoie au TaskTracker sélectionné le travail à
effectuer (MAP/REDUCE, code Java) et les blocs de données correspondants.
Architecture Hadoop: Le JobTracker (2/3)
5. Pour chaque « morceau » des données d'entrée, le JobTracker envoie au TaskTracker sélectionné le travail à
effectuer (MAP/REDUCE, code Java) et les blocs de données correspondants.
6. Le JobTracker communique avec les noeuds TaskTracker en train d'exécuter les tâches. Ils envoient
régulièrement un « heartbeat », un message signalant qu'ils travaillent toujours sur la sous-tâche reçue. Si
aucun heartbeat n'est reçu dans une période donnée, le JobTracker considère la tâche comme ayant échouée et
donne le même travail à effectuer à un autre TaskTracker.
7. Si par hasard une tâche échoue (erreur java, données incorrectes, etc.), le TaskTracker va signaler au
JobTracker que la tâche n'a pas pu être exécutée.
Le JobTracker va alors décider de la conduite à adopter :
- Demander au même TaskTracker de ré-essayer.
- Redonner la sous-tâche à un autre TaskTracker.
- Marquer les données concernées comme invalides, etc.
Il pourra même blacklister le TaskTracker concerné comme non-fiable dans certains cas.
49
Architecture Hadoop: Le JobTracker (3/3)
7. Une fois que toutes les opérations envoyées aux TaskTracker (MAP + REDUCE) ont été
effectuées et confirmées comme effectuées par tous les noeuds, le JobTracker marque la
tâche comme « effectuée ». Des informations détaillées sont disponibles (statistiques,
TaskTracker ayant posé problème, etc.).
Remarques
• Par ailleurs, on peut également obtenir à tout moment de la part du JobTracker des informations sur
les tâches en train d'être effectuées: étape actuelle (MAP, SHUFFLE, REDUCE), pourcentage de
complétion, etc.
• La soumission du .jar, l'obtention de ces informations, et d'une manière générale toutes les opérations
liées à Hadoop s'effectuent avec le même unique client console vu précédemment: hadoop (avec
d'autres options que l'option fs vu précédemment).
Architecture Hadoop: Le TaskTracker
• Le TaskTracker dispose d'un nombre de « slots » d'exécution. A chaque « slot » correspond une tâche
exécutable (configurable).
• Lorsqu'il reçoit une nouvelle tâche à effectuer (MAP, REDUCE, SHUFFLE) depuis le JobTracker, le
TaskTracker va démarrer une nouvelle instance de Java avec le fichier .jar fourni par le JobTracker, en
appelant l'opération correspondante.
• Une fois la tâche démarrée, il enverra régulièrement au JobTracker ses messages heartbeats. En dehors
d'informer le JobTracker qu'il est toujours fonctionnels, ces messages indiquent également le nombre de
slots disponibles sur le TaskTracker concerné.
• Lorsqu'une sous-tâche est terminée, le TaskTracker envoie un message au JobTracker pour l'en
informer, que la tâche se soit bien déroulée ou non (il indique évidemment le résultat au JobTracker).
Architecture Hadoop: Remarques (1/2)
• Même si le JobTracker est situé sur une seule machine, le « client » qui
envoie la tâche au JobTracker initialement peut être exécuté sur n'importe
quelle machine du cluster – comme les TaskTracker sont présents sur la
machine, ils indiquent au client comment joindre le JobTracker.
ResourceManager et ApplicationMaster.
RessourceManager • ResourceManager remplace le JobTracker et ne
gère que les ressources du Cluster.
• Une entité ApplicationMaster est allouée par
ApplicationMaster AM AM Application pour gérer les tâches.
• ApplicationMaster est déployée sur les noeuds
esclaves.
YARN (MapReduce 2) : Architecture (2/7)
• Cette nouvelle version contient aussi un autre composant :
Le NodeManager (NM)
– Permet d’exécuter plus de tâches qui ont du sens pour l’Application
Master, pas seulement du Map et du Reduce.
– La taille des ressources est variable (RAM, CPU, network….). Il y aura
plus de valeurs codées en dur qui nécessitent un redémarrage.
RessourceManager
AM AM ApplicationMaster NodeManager NM NM
YARN (MapReduce 2) : Architecture (3/7)
• Le JobTracker a disparu de l’architecture, ou plus précisément, ses
rôles ont été répartis différemment.
a. Scheduler
• Le Scheduler est responsable de l’allocation des ressources des applications tournant sur le cluster.
• Il s’agit uniquement d’ordonnancement et d’allocation de ressources.
• Les ressources allouées aux applications par le Scheduler pour leur permettre de s’exécuter sont appelées des
Containers.
YARN (MapReduce 2) : Architecture (6/7)
Container
• Un Container désigne un regroupement de mémoire, de cpu, d’espace disque, de
bande passante réseau, …
b. ApplicationsManager
• L’ApplicationsManager accepte les soumissions d’applications.
• Une application n’étant pas gérée par le ResourceManager, la partie
ApplicationsManager ne s’occupe que de négocier le premier Container que le
Scheduler allouera sur un noeud du cluster. La particularité de ce premier
Container est qu’il contient l’ApplicationMaster (diapo suivant).
2. NodeManager
• Les NodeManager sont des agents tournant sur chaque nœud et tenant le
Scheduler au fait de l’évolution des ressources disponibles. Ce dernier peut ainsi
prendre ses décisions d’allocation des Containers en prenant en compte des
demandes de ressources cpu, disque, réseau, mémoire, …
61
YARN (MapReduce 2) : Architecture (7/7)
3. ApplicationMaster
• L’ApplicationMaster est le composant spécifique à chaque application, il est en charge des jobs qui y
sont associés.
– Lancer et au besoin relancer des jobs
– Négocier les Containers nécessaires auprès du Scheduler
– Superviser l’état et la progression des jobs.
• Un ApplicationMaster gère donc un ou plusieurs jobs tournant sur un framework donné. Dans le cas de
base, c’est donc un ApplicationMaster qui lance un job MapReduce. De ce point de vue, il remplit un
rôle de TaskTracker.
2. Le ResourceManager alloue un container, « Application Master », sur le cluster et y lance la classe « driver »
du programme.
4. Pour chacun des fragments des données d’entrée sur lesquelles travailler, l’Application Master va demander
au Resource Manager d’allouer aun container en lui indiquant les données sur lesquelles celui-ci va travailler
et le code à exécuter.
YARN (MapR2): Déroulement de l’exécution
5. L’Application Master va alors lancer le code en question (une classe Java, générallement map ou reduce)
sur le container alloué. Il communiquera avec la tâche directement (via un protocole potentiellement
propre au programme lancé), sans passer par le ResourceManager.
6. Ces tâches vont régulièrement contacter l’Application Master du programme pour transmettre des
informations de progression, de statut, etc. parallèlement, chacun des NodeManager communique en
permanence avec le ResourceManager pour lui indiquer son statut en terme de ressources (containers
lancés, RAM, CPU), mais sans information spécifique aux tâches exécutées.
7. Pendant l’exécution du programme, le client peut à tout moment contacter le programme en cours
d’exécution; pour ce faire, il communique directement avec l’Application Master du programme en
contactant le container correspondant sans passer par le ResourceManager du cluster.
8. A l’issue de l’exécution du programme, l’Application Master s’arrète; son container est libéré et est à
nouveau disponible pour de futures tâches.
YARN (MapR2) : Lancement d’une Application
dans un Cluster Yarn
YARN (MapR2) : Lancement d’une Application
dans un Cluster Yarn
YARN (MapR2) : Lancement d’une Application
dans un Cluster Yarn
YARN (MapR2) : Lancement d’une Application
dans un Cluster Yarn
YARN (MapR2) : Lancement d’une Application
dans un Cluster Yarn
YARN (MapR2) : Exécution d’un Job MR
YARN (MapR2) : Exécution d’un Job MR
YARN (MapR2) : Exécution d’un Job MR
YARN (MapR2) : Exécution d’un Job MR
YARN (MapR2) : Exécution d’un Job MR
Hadoop 2: HDFS - Remarques
• De la même façon que l’évolution de MapReduce 2, nous trouvons quelques améliorations de HDFS
dans la 2ème version de Hadoop. Ces améliorations ont été déjà expliqué dans la partie consacrée à HDFS.
2. La fédération HDFS
• Dans un cluster HDFS, un namenode correspond à un espace de nommage (namespace). Dans L’ancienne architecture
d’Hadoop, on ne pouvait utiliser qu’un namenode par cluster. La fédération HDFS permet de supporter plusieurs namenodes
et donc plusieurs namespace sur un même cluster. (Exemple : un NameNode pourrait gérer tous les fichiers sous /user, et un
second NameNode peut gérer les fichiers sous /share.)
Hadoop 2: Architecture générale
Sources
• Cours
Big Data Analytics – Lesson 1: What is Big Data , IBM, Big Data University,
Intro to Hadoop and MapReduce , Coursera, Udacity
Hadoop / Big Data – MBDS, Benjamin Renaut (avec quelques modifications).
• Articles
“Data scientist : The sexiest Job of the 21th Century” T.H. Davenport, DJ. Patil, Harvard Business
Review.
Bernard Marr, “Big Data: The 5 Vs Everyone Must Know”, LinkedIn
« Hadoop et le "Big Data », LaTechnologie Hadoop au coeur des projets "Big Data" - Stéphane
Goumard – Tech day Hadoop, Spark - Arrow- Institute.
Comparison Between Hadoop 2.x vs Hadoop 3.x – Data Flair.
• Livres
Hadoop: The Definitive Guide, Tom White, O'Reilly Media.
52