Atelier 8 : MapReduce cas de calcul de total avec Python
dans Cloudera
1. Introduction
Dans cet atelier nous allons voir comment écrire un programme MapReduce de calcul de
somme avec le langage de programmation Python. Nous allons donc utiliser l'API Hadoop
Streaming pour écrire des programmes MapReduce en Python pour lire à partir de l'entrée
standard et écrire sur la sortie standard.
Le fichier d'entrée est téléchargé à partir du lien suivant :
[Link]
Il s’agit de données sur des achats effectués classés par magasins par villes. Voici un extrait de
ce fichier texte :
2012-01-01 09:00 San Jose Men's Clothing 214.05 Amex
2012-01-01 09:00 Fort Worth Women's Clothing 153.57 Visa
2012-01-01 09:00 San Jose Music 66.08 Cash
2012-01-01 09:00 Pittsburgh Pet Supplies 493.51 Discover
2012-01-01 09:00 Omaha Children's Clothing 235.63 MasterCard
2012-01-01 09:00 Stockton Men's Clothing 247.18 MasterCard
2012-01-01 09:00 Austin Cameras 379.6 Visa
2012-01-01 09:00 New York Consumer Electronics 296.8 Cash
2012-01-01 09:00 Corpus Christi Toys 25.38 Discover
2012-01-01 09:00 Fort Worth Toys 213.88 Visa
Le fichier d'entrée est écrit dans l’entrée standard de la fonction Map. Le mapper génère des
paires clé/valeur sous forme de lignes délimitées par des tabulations à la norme production. La
clé qui sera choisie est relative à la ville. Alors que la valeur est le montant d’achat inscrit dans
chaque ville.
Le Reducer lit les paires clé/valeur délimitées par des tabulations à partir de l'entrée standard
pour compter le nombre de villes et trouver le total des achats effectués. Puis, écrit également
des paires délimitées par des tabulations paires clé/valeur à la sortie standard pour produire le
résultat final.
1
Le rôle de la fonction Map est de découper la chaîne de caractères donnée en entrée pour extraire
la ville et le montant d’achats correspondant. Ainsi, chaque couple (ville,montant) sert de clés
pour l’étape de tri. Chaque fonction Reduce recevra alors en entrée une paire dont la clé sera la
ville et la valeur sera un tableau composé de valeurs des montants réalisés. La taille de ce
tableau sera le nombre d’occurrences des villes dans les documents en entrée. La fonction
Reduce se contente alors de renvoyer la ville puis de faire le cumul de la somme des achats
réalisés pour une ville donnée.
Nous aurons besoin d’une VM pour réaliser cet atelier. Nous avons choisi celle de Cloudera
QuickStart VM CDH 5.13.x que nous pourrions télécharger à partir du lien suivant :
[Link]
2. Etapes de réalisation
1. Tout d’abord, il faut démarrer la machine virtuelle Cloudera pour ouvrir la console.
2. Téléchargez le fichier texte à travers la commande wget. Puis procéder à sa décompression
avec gunzip.
2
wget [Link]
course/raw/master/Datasets/[Link]
gunzip [Link]
3. Soit un fichier en entrée comportant 6 champs, séparés par des tabulations. Voilà la structure
de ce fichier texte en visualisant les deux premières lignes :
Le Mapper doit :
- Séparer les différents champs par tabulation.
- Extraire les éléments voulus à partir de ces champs, sous forme de clé/valeur.
Pour cet exemple, notre but est de déterminer le total des achats par ville dont les champs sont
de la forme suivante :
date - temps – ville du magasin - produit - montant – type de paiement
Pour calculer les achats par ville, le couple (clé, valeur) à extraire est (ville,montant).
Pour atteindre ce but, d’abord créez un répertoire intitulé « expy », puis y créer un fichier appelé
« [Link] » avec gedit et ayant le code ci-dessous.
3
Remarque : Python est un langage qui délimite les différents blocs en utilisant les tabulations,
faites alors bien attention aux indentations.
#!/usr/bin/python
import string
import fileinput
for line in [Link]():
data = [Link]().split("\t")
if len(data) == 6:
date, time, location, item, cost, payment = data
print "{0}\t{1}".format(location, cost)
Explication du code :
- #!/usr/bin/env python ou #!/usr/bin/python joue un rôle si le script est exécutable et appelé
sans le langage précédent. Le script appelle ensuite l'interpréteur du langage pour exécuter
le code à l'intérieur du script, et le shebang (#!) est le "guide" pour le trouver, dans cet
exemple, python.
- import string : En règle générale, vous n'avez pas besoin d'importer le module de chaîne
car la classe est déjà intégrée. Cependant, il existe plusieurs constantes dans le module
string qui ne sont pas intégrées, qui peuvent être utiles.
- import fileinput : Ce module implémente une classe d'assistance et des fonctions pour écrire
rapidement une boucle sur une entrée standard ou une liste de fichiers.
- L’input est un ensemble de transactions de produits dans plusieurs emplacements ; ceux-ci
peuvent également être transmis à STDIN.
- Le format de chaque ligne est : date\ttemps\tville du magasin\tproduit\tmontant\ttype de
paiement
- Avec [Link](), nous pouvons obtenir le fichier en entrée qui peut être utilisé pour
renvoyer les données du fichier.
4
- Python supprime tous les espaces blancs de début/de fin ([Link]()). Puis divise cette ligne
en une liste de mots individuels lorsqu’il rencontre une tabulation (split("\t")).
- Nous testons qu’il y a 6 champs avec l’instruction if len(data) == 6
- Les 6 champs : date, time, location, item, cost, payment sont extraits à partir de data.
- Puis on imprime le couple clé-valeur sur une seule ligne, séparé par une tabulation (\t)
constitué de la ville (location) et du montant (cost).
Attention : Assurez-vous que le fichier a l'autorisation d'exécution avec chmod :
chmod +x expy/[Link]
Testez ce mapper en local sur les 50 premières lignes du fichier [Link] en tapant
l’instruction suivante, directement à partir de votre répertoire code :
head -50 [Link] | expy/[Link]
5
4. Le Reducer permet de faire le traitement désiré sur des entrées sous forme de clé/valeur,
préalablement triées par Hadoop (on n’a pas à s’occuper du tri manuellement). Dans l’exemple
précédent, une fois que le Mapper extrait les couples (store,cost) relatifs à (ville,montant), le
Reducer aura comme tâche de faire la somme de tous les coûts pour un même magasin.
Pour cela, avant tout, dans le répertoire intitulé « expy », créez un fichier appelé « [Link] »
avec gedit et ayant le code ci-dessous.
En entrée du script ou programme Reducer, on aura une série de lignes : des couples (clé;valeur)
au format : CLE[TABULATION]VALEUR
Les couples seront triés par clé distincte, et la clé répétée à chaque fois. Par ailleurs, on est
susceptible d'avoir des clés différentes au sein d'une seule et même exécution du programme
Reduce.
En sortie du script ou programme Reducer, on doit écrire des couples (clé;valeur), toujours au
format CLE[TABULATION]VALEUR.
#!/usr/bin/python
import fileinput
6
transactions = None
sales_total = 0
for line in [Link]():
data = [Link]().split("\t")
if len(data) != 2:
# Something has gone wrong. Skip this line.
continue
current_key, current_value = data
if transactions == current_key:
sales_total += float(current_value)
else:
if transactions:
print( "%s\t%.2f" % (transactions, sales_total) )
sales_total = float(current_value)
transactions = current_key
if transactions == current_key:
print( "%s\t%.2f" % (transactions, sales_total) )
Explication du code :
- L’input est relatif à l’output résultant de [Link] avec pour chaque ligne le format
ville\tmontant
- Initialiser transactions_count à None et sales_total à 0 qui sont respectivement les variables
relatives à la ville et au montant.
- Ensuite, il faut supprimer les espaces blancs de début et de fin avec strip().
7
- Par la suite, analyser l'entrée que nous avons obtenue de [Link] avec split("\t").
- si len(data) est différent de 2 alors quelque chose ne va pas. Cette ligne ne convient pas. Donc
il faut sauter cette ligne avec continue.
- Dans le cas contraire nous allons extraire current_key et current_value à partir de data avec
current_key est la ville et current_value est le montant.
- Si transactions_count == current_key c’est-à-dire il s’agit de la même ville, donc sales_total
+= float(current_value). On convertit la valeur du montant (current_value) en float avant de
calculer la somme qu’on stocke dans la variable sales_total. Sinon on affecte directement la
valeur à sales_total. Avant que transactions_count devienne current_key.
- L’affichage du résultat se fait pour chaque couple (transactions_count, sales_total) en
précisant 2 chiffres après la virgule pour sales_total.
Attention : Assurez-vous que le fichier a l'autorisation d'exécution avec chmod :
chmod +x expy/[Link]
Maintenant nous allons tester ce Reducer en local sur les 50 premières lignes du fichier
[Link] en tapant l’instruction suivante, directement à partir de votre répertoire code :
head -50 [Link] | expy/./[Link] | sort | expy/./[Link]
8
5. Lancer un Job entier : Lancer un job entier sur Hadoop implique qu’on fera appel au mapper
puis au reducer sur une entrée volumineuse, et qu’on obtiendra à la fin un résultat, directement
sur HDFS. Pour faire cela, nous devons utiliser le Streaming qui est un programme Hadoop
standard, qu’on l'exécutera donc avec la même commande hadoop jar qu'un programme Hadoop
Java habituel.
Nous utilisons Hadoop Streaming qui permet de créer et lancer des jobs MapReduce avec tout
type d’exécutable ou script en tant que mapper et reducer. La manière standard est d’écrire des
programmes MapReduce en Java via l’API Java MapReduce. Ici nos scripts sont écrits en
Python, mais les mappers et reducers pourraient être des classes Java, des utilitaires unix, des
scripts R, Ruby, etc. Les Mappers liront les données fournies dans le flux standard d’entrée unix
stdin et les réécriront dans la sortie standard stdout via print.
Par défaut, le Framework Hadoop MapReduce est écrit en Java et prend en charge l'écriture de
programmes map/reduce en Java uniquement. Mais Hadoop fournit une API pour écrire des
programmes MapReduce dans des langages autres que Java.
Hadoop Streaming est l'utilitaire qui nous permet de créer et d'exécuter des tâches MapReduce
avec n'importe quel script ou exécutable en tant que mappeur ou réducteur. Il utilise des flux
Unix comme interface entre Hadoop et notre programme MapReduce afin que nous puissions
utiliser n'importe quel langage capable de lire l'entrée standard et d'écrire sur la sortie standard
pour écrire pour écrire notre programme MapReduce.
Hadoop Streaming prend en charge l'exécution de tâches Java, ainsi que l'exécution de tâches
MapReduce non Java programmées sur le cluster Hadoop. Il prend en charge différents
langages de programmation comme Python, par exemple.
Le mapper et le reducer sont les scripts qui lisent l'entrée ligne par ligne depuis STDIN et
émettent la sortie vers STDOUT.
L'utilitaire crée une tâche Map/Reduce et soumet la tâche à un cluster approprié et surveille la
progression de la tâche jusqu'à son achèvement.
Lorsqu'un script est spécifié pour les mappers, chaque tâche de mapper lance le script en tant
que processus distinct lorsque le mapper est initialisé.
La tâche de mapper convertit ses entrées (clés, paires de valeurs) en lignes et pousse les lignes
vers l'entrée standard du processus. Pendant ce temps, le mapper collecte les sorties orientées
9
ligne à partir de la sortie standard et convertit chaque ligne en une paire (clé, paire de valeurs),
qui est collectée en tant que résultat du mapper.
Lorsque le script du reducer est spécifié, chaque tâche du reducer lance le script en tant que
processus distinct, puis le reducer est initialisé.
Au fur et à mesure de l'exécution de la tâche du reducer, il convertit ses paires clé/valeur d'entrée
en lignes et alimente les lignes vers l'entrée standard du processus. Pendant ce temps, le reducer
rassemble les sorties orientées ligne de la sortie standard du processus et convertit chaque ligne
collectée en une paire clé/valeur, qui est ensuite collectée en tant que résultat du reducer.
C'est dans ces options de ligne de commande qu'on va indiquer les scripts ou programmes
Hadoop map et reduce à utiliser.
Syntaxe :
hadoop jar [Link] -input [HDFS INPUT FILES] \
-output [HDFS OUTPUT FILES] \
-mapper [MAP PROGRAM] \
-reducer [REDUCE PROGRAM]
Les paramètres à utiliser sont résumés dans ce tableau :
Paramètres Tâches
-input directory/file-name Affiche l'emplacement de l’input du Mapper
-output directory-name Affiche l'emplacement de l’output
-mapper executable Utilisé pour l'exécutable Mapper
ou script
ou JavaClassName
-reducer executable Utilisé pour l'exécutable du Reducer
ou script
ou JavaClassName
-file file-name Rend le Mapper, le reducer, le combiner exécutables
disponibles localement sur les nœuds de calcul
-numReduceTasks Ceci est utilisé pour spécifier le nombre de reducers
-mapdebug Script à appeler lorsque la tâche du Map échoue
-reducedebug Script à appeler lorsque la tâche du Reducer échoue
Cette instruction donne en paramètres les fichiers correspondant aux Mappers et Reducers, et
les répertoires contenant le fichier d’entrée (myinput) et la sortie à générer (joboutput). Le
répertoire de sortie, après exécution, contiendra un fichier appelé part-00000, représentant la
sortie désirée.
Pour le mapper et le reducer, le préfixe d'une ligne jusqu'au premier caractère de tabulation est
la clé, et le reste de la ligne est la valeur à l'exception du caractère de tabulation. En l'absence
de caractère de tabulation dans la ligne, la ligne entière est considérée comme clé et la valeur
10
est considérée comme nulle. Ceci est personnalisable en définissant l'option de commande -
inputformat pour le mapper et l'option -outputformat pour le reducer.
Le répertoire d’entrée doit contenir un seul fichier. Le répertoire de sortie ne doit pas exister
avant l’exécution de l’instruction.
Avant d’exécuter le job, tout d’abord, nous devons copier le fichier texte du système de fichier
local vers HDFS. Pour cela nous allons utiliser –put comme suit :
hdfs dfs -put [Link]
Après quoi la tâche Hadoop s'exécutera exactement comme une tâche standard. Dans notre cas
le fichier correspondant à [Link] dans la syntaxe est :
/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/[Link]
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-
[Link] -mapper [Link] -reducer [Link] -
file expy/[Link] -file expy/[Link] -input [Link] -output
outExpy
11
Nous avons exécuté un job hadoop sur le fichier [Link] en utilisant les fichiers [Link]
et [Link] déjà fournis. Le résultat est stocké dans le répertoire outExpy. Dans lequel on
trouvera le fichier part-00000 ayant le résultat final. Pour visualiser le résultat nous allons
utiliser la commande cat.
hdfs dfs -cat outExpy/part-00000
12
Remarque : On peut visualiser l’état du job exécuté via la commande hadoop job –status suivi
du nom du job qu’on pourra le connaître lors de l’exécution du job en question.
Dans notre cas il s’agit de : job_1628774421893_0001
13
Nous exécutons la commande avec l’option status :
Par la suite nous obtenons ce résultat :
14