0% ont trouvé ce document utile (0 vote)
78 vues53 pages

Introduction à Apache Spark et Hadoop

Ce document décrit une formation sur Apache Spark qui comprend des sujets tels que l'architecture et les modes d'exécution de Spark, l'installation de Spark, HDFS et Zeppelin, ainsi que des commandes HDFS et l'interaction avec Spark.

Transféré par

lasthiat
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PPTX, PDF, TXT ou lisez en ligne sur Scribd
0% ont trouvé ce document utile (0 vote)
78 vues53 pages

Introduction à Apache Spark et Hadoop

Ce document décrit une formation sur Apache Spark qui comprend des sujets tels que l'architecture et les modes d'exécution de Spark, l'installation de Spark, HDFS et Zeppelin, ainsi que des commandes HDFS et l'interaction avec Spark.

Transféré par

lasthiat
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PPTX, PDF, TXT ou lisez en ligne sur Scribd

Cette formation dans les grandes lignes

1. Introduction à Apache Spark.


2. L’Architecture et l’écosystèmes de Spark.
3. Modes d'exécution.
4. Installation de Spark, Zeppelin, Hadoop en local.
5. Introduction à Hadoop.
6. Les commandes HDFS. -
7. Interagir avec Spark.
8. Resilient Distributed Datasets.
9. Les opérations RDD
10. Spark SQL / Dataframe
11. Spark UI / Job Monitoring.
12. Utilisation des UDF.
13. Persistance / Partitioning
14. Spark Streaming.
Introduction à Apache Spark

• Apache Spark est un Framework open source de calcul distribué construit


pour effectuer des traitements Big Data et conçu pour la rapidité et la facilité
d’utilisation.
• Développé à l'université de Californie à Berkeley, en 2009 et passé open
source en 2010.
• En 2013, transmis à la fondation Apache, et devient l’un des projets les plus
actifs sur Github.
• Il propose un Framework complet et unifié pour répondre aux besoins de
traitements Big Data pour divers jeux de données et plusieurs type de
source ( Batch, SQL, Streaming, Machine Learning).
L’émergence de Spark

Tendances des tags sur StackOverflow

➢On peut noter l'intérêt grandissant pour Spark en suivant son évolution
dans les termes de recherche sur Google Trends et dans les tags sur
StackOverflow.
L’émergence de Spark

Google Trends de janvier 2013 à septembre 2017.


MapReduce VS Spark

➢MapReduce a grandement simplifié l'analyse Big Data sur des clusters de grande taille
composé de machines susceptibles de tomber en panne.
➢ Les utilisateurs demandent toujours des applications de plus en plus complexes, comme
les opérations itératives ( pour du Machine Learning par exemple) ou des opérations
interactives ( plusieurs requêtes sur un même échantillon des données).

➢ Pour les opérations itératives : le seul moyen par mapReduce est de créé un job pour
chaque itération et partager les données entre les jobs en écrivant physiquement les
données dans HDFS.
➢Les deux applications itératives et interactives nécessitent le partage de données plus
rapide entre les emplois parallèles.

Traitement itératives avec MapReduce


MapReduce VS Spark

➢ Pour des opérations interactives, plusieurs requêtes effectuées manuellement à partir


d'un même dataset : chaque requête doit relire les données, ce qui prend beaucoup de
temps

Traitement interactives avec MapReduce

➢ Le partage des données est lente en MapReduce due à la réplication, la sérialisation, et le


disque IO.
➢La plupart des applications Hadoop, ils dépensent plus de 90% du temps à faire des
opérations de lecture-écriture HDFS.
MapReduce VS Spark

Avec Spark, le traitement basé sur la mémoire partagée, les résultats intermédiaire sont
sauvegardés dans la mémoire centrale ce que rend Spark 100 fois rapide que Mapreduce.

Spark inclut plus de 80 opérateurs pour écrire rapidement des applications en Java, Scala, R
ou Python. Il est possible de l’utiliser de façon interactive pour requêter les données depuis
un Shell.
Architecture de Spark

• Une application Spark est une instance de la classe SparkContext, utilisé pour créer
des RDD : les codes et les objets nécessaires pour l'application Spark.
• Pour exécuter des jobs sur un cluster, SparkContext se connecte au gestionnaire de
ressources pour allouer des ressources du système pour l'application et planifier
l'exécution des tâches.
• Le Driver gère l'exécution et la création de SparkContext et l’écriture de logs.
• Le Executor permet d’exécuter les tâches sur les nœuds.
Les APIs de Spark

• Spark est écrit en Scala et s’exécute sur la machine virtuelle Java (JVM).
• Les langages supportés actuellement pour le développement d’applications sont :
Java, Python, Scala et R.
• À côté de l’API principale de Spark, l’écosystème contient les librairies : Spark SQL,
Spark Streaming, MLIB, GraphX qui permettent de travailler dans le domaine des
analyses Big Data.

Java, Python, Scala, R

Spark SQL Spark Mlib GraphX


Données Streaming Machine Traitement
structurées Temps réel learning de graphes

Core Spark
Modes d'exécution

Déploiement en mode local

• Le Driver et le Executor sont dans une seule JVM.


• Le mode local est très pratique pour tester les applications Spark car il ne nécessite
aucune configuration préalable.
• Le parallélisme est le nombre de threads spécifié dans l'URL principale.
• Pour configurer le parallélisme d’une application Spark, utiliser les configs :
- local : 1 thread uniquement.
- local[n] : n threads.
- local [*] : le nombre de thread disponibles sur la JVM.
Modes d'exécution

En mode cluster :

• Mode Standalone : avec le gestionnaire simple inclus avec Spark pour la


configuration d'un cluster. Il est possible de lancer Spark en mode Standalone sur
une seule machine.

• Spark avec Mesos : Avec un gestionnaire de cluster général qui peut exécuter les
applications Spark et autres applications ( MapReduce...)

• Spark avec Yarn: Le gestionnaire de ressources dans Hadoop 2.


Installation : Spark, Zeppelin, Hadoop
• Spark :

> tar xfz spark-2.0.0-bin-hadoop2.7.tar.zip

Ajouter les configs suivantes dans le fichier $SPARK_HOME/conf/


spark-defaults.conf :
spark.driver.host=localhost
spark.driver.port=20002

• Lancement de zeppelin :

> tar xvf zeppelin-0.6.2-bin-all.tgz


Ajouter SPARK_HOME dans config/zeppelin-env.sh

URL Zeppelin notebook : http://localhost:8080


Installation : Hadoop
• Installation Hadoop en local :
> tar -xvf hadoop-2.8.2.tar.gz
Configuration ( en mode locale ) :

Dans « etc/hadoop/core-site.xml » :

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

Dans « etc/hadoop/hdfs-site.xml » :

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value> </property>
</configuration>
Introduction à Hadoop

Hadoop est un Framework open source qui permet d'analyser, stocker et


manipuler de grandes quantités de données.
Il se compose des modules suivants :

• HDFS : Système de fichiers distribués.

• MapReduce : pour le traitement des données.

• Yarn : Gestion des ressources du cluster

• Hadoop Common.
Introduction à Hadoop

HDFS :

• NameNode : garde en mémoire les métadonnées.

• DataNode : stockage des blocs de données.

• Secondary Namenode : Backup du Namenode.

MapReduce :
• Job tracker : gére les jobs, plainifie les tâches à exécuter.

• Task tracker : excuter les tâches ( Map ou Reduce ).


Architecture à Hadoop

App
App
MapRedu
HDFS
ce
Hadoop client
Hadoop Plateforme

Backup node Master node


NameN JobTrac
2ème NameNode ode ker

Slave node 1 Slave node 2 Slave node 3


DataNo TaskTra DataNo TaskTra DataNo TaskTra
de cker de cker de cker
Lancement de Hadoop

1. Formater HDFS :

$ bin/hdfs namenode –format

2. Lancement de NameNode et DataNode :

$ sbin/start-dfs.sh

3. Interface Web pour le NameNode : http://localhost:50070/

4. Lancement d’un exemple :

$ bin/hdfs dfs -mkdir /user


$ bin/hdfs dfs -mkdir /user/<username>
$ bin/hdfs dfs -put etc/hadoop input
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.2.jar grep input
output 'dfs[a-z.]+'
$ bin/hdfs dfs -cat output/*

5. Arrêter le NameNode et le DataNode :

$ sbin/stop-dfs.sh
Quelques Commendes HDFS

dfs -ls : Lister les fichiers et sous-répertoires d’un répertoire

dfs -cat : afficher les contenu d’un fichier

dfs -text : afficher les contenu d’un fichier même compressé

dfs -chmod, chgrp, -chown : modifier les droits

dfs -put, -get, -copyFromLocal, -copyToLocal : Copie de fichiers entre filesystem


local et HDFS

dfs -mv : Déplacement de fichiers entre filesystem local et HDFS

dfs -stat : statistiques sur les ressources

dfs -rm : supprimer un fichier


Interagir avec Spark

➢ Pour écrire des application Spark en Java il faut ajouter les dépendances Spark via
Maven.
Interagir avec Spark

➢ Pour écrire des applications Spark en python il y a deux possibilités :

▪ Lancement du pyspark (mode interactif) :

▪ Lancement via un spark submit


Interagir avec Spark

➢ Pour écrire des applications Spark en Scala:

▪ Lancement du spark shell (mode interactif) :


Interagir avec Spark

➢Pour lancer une application spark on utilise le script spark-submit :

• En mode local

• En mode yarn (cluster)


Initialisation de Spark

Création d’une instance SparkContext :

// SparkContext est automatiquement créé au lancement de Spark Shell


from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);


JavaSparkContext sc = new JavaSparkContext(conf);
Resilient Distributed Datasets ( RDD )

➢Un RDD est une collection immuable : chaque opération créant un nouveau
RDD.
Resilient Distributed Datasets ( RDD )

➢Un RDD est une collection partitionnée des objets : plus de partitions plus de
parallélisme.
Resilient Distributed Datasets ( RDD )

➢Un RDD est tolérant à la panne : si une partition RDD échoue, elle sera créée sur
un autre nœud.
Resilient Distributed Datasets ( RDD )

Exemple de RDD :
Le fichier texte suivant va être transformé en RDD.
Fichier: descriptif.txt Le fichier est transformé en RDD par la function textFile.
Chaque ligne de ce fichier est un élément de l’RDD.

descriptif_RDD = sc.textFile("descriptif.txt")

RDD: descriptif_RDD

Le fichier texte suivant va être transformé en RDD.


Le fichier est transformé en RDD par la function textFile.
Chaque ligne de ce fichier est un élément de l’RDD.
Resilient Distributed Datasets ( RDD )

Exemple de RDD :

RDD: descriptif_RDD

Le fichier texte suivant va être transformé en RDD.


Le fichier est transformé en RDD par la function textFile.
Chaque ligne de ce fichier est un élément de l’RDD.

descriptif_RDD.count()

Valeur = 4
Resilient Distributed Datasets ( RDD )

Exemple de RDD :

RDD: descriptif_RDD

Le fichier texte suivant va être transformé en RDD.


Le fichier est transformé en RDD par la function textFile.
Chaque ligne de ce fichier est un élément de l’RDD.

descriptif_RDD.map(line -> line.toUpperCase())

nouveau RDD

LE FICHIER TEXTE SUIVANT VA ÊTRE TRANSFORMÉ EN RDD.


LE FICHIER EST TRANSFORMÉ EN RDD PAR LA FUNCTION TEXTFILE.
CHAQUE LIGNE DE CE FICHIER EST UN ÉLÉMENT DE L’RDD.
Création d’un RDD

On peut créer un RDD à partir de :

Une collection (List..), transformée en RDD avec la fonction parallelize.

Un fichier local ou distribué (HDFS) texte brut, SequenceFile Hadoop, JSON.

Une base de données : JDBC, Cassandra, HBase…

Un autre RDD auquel on aura appliqué une transformation comme filter, map…
Création d’un RDD

❖ Exemples :

// Création d’un RDD à partir d’une collection :


data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

// Création d'un RDD à partir d'un fichier


distFile = sc.textFile("data.txt")

// Création d’un RDD à partir d’une collection :


List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

// Création d’un RDD à partir d’un fichier :


JavaRDD<String> distFile = sc.textFile("data.txt");
Les opérations RDD

➔ Les transformations : retournant un nouveau RDD.

➔ Les actions : retournent une valeur.


Les actions RDD

Action Signification

Agréger les éléments du RDD en utilisant la fonction func (qui prend 2 arguments et retourne
reduce(func)
1 résultat).

Retourner tous les items du RDD comme un tableau au programme driver. A utiliser
collect() seulement si le RDD a un volume faible (par ex., après des opérations de type filter très
sélectives).

count() Retourner le nombre d'items du le RDD.

take(n) Retourner un tableau avec les n premiers items du RDD.

first() Retourner le premier item du RDD (similaire à take(1)).

Pour RDD de type (clé, valeur), retourne l’ensemble de paires (clé, Int) avec le nombre de
countByKey()
valeurs pour chaque clé.

Écrit les items du RDD dans un fichier texte dans un répertoire du système de fichiers local,
saveAsTextFile(path) HDFS ou autre fichier supporté par Hadoop. Spark appelle toString pour convertir chaque
item en une ligne de texte dans le fichier.
Les actions RDD

❖ Exemples :

// Création d'un RDD à partir d'un fichier


distFile = sc.textFile("data.txt")

// Nombres d'éléments de distFile


distFile.count()

// Tous les éléments de distFile


distFile.collect()

// Création d'un RDD à partir d'un fichier


JavaRDD<String> distFile = sc.textFile("data.txt");

// Nombres d'éléments de distFile


distFile.count()

// Tous les éléments de distFile


distFile.collect()
Les transformations RDD

Transformation Signification

Retourne un nouveau RDD obtenu en appliquant la fonction func à chaque item du RDD de
map(func)
départ.
Retourne un nouveau RDD obtenu en sélectionnant les items de la source pour lesquels la
filter(func)
fonction func retourne “vrai”.

Similaire à map mais chaque item du RDD source peut être transformé en 0 ou plusieurs
flatMap(func)
items ; retourne une séquence (Seq) plutôt qu’un seul item.

Retourne un RDD qui est l’union des items du RDD source et du RDD argument
union(otherDataset)
(otherDataset).

distinct() Retourne un RDD qui est obtenu du RDD source en éliminant les doublons des items.

Liste des actions et transformations : http://spark.apache.org/docs/2.1.1/programming-guide.html


Les transformations RDD

➔ Exemples avec l'expression Lambda:

// Création d’un RDD à partir d’une collection :


data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

// Ajout de 1 à chaque élement


rdd1 = distData.map(lambda e : e + 1)

// Filter des éléments > 3 :


rdd2 = rdd1.filter(lambda e : e > 3)

// Somme des éléments:


somme = rdd2.reduce(lambda e1, e2 : e1 + e2)

somme
Les transformations RDD

➔ Exemples avec l'expression Lambda:

// Création d’un RDD à partir d’une collection :


List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

// Ajout de 1 à chaque élement


JavaRDD<Integer> rdd1 = distData.map(e -> e + 1);

// Filter des éléments > 3 :


JavaRDD<Integer> rdd2 = rdd1.filter(e -> e > 3);

// Somme des éléments:


int somme = rdd2.reduce( (e1, e2) -> e1 + e2);

System.out.println(somme);
Les transformations RDD

➔ Exemples sans l'expression Lambda:

// Ajout de 1 à chaque élement


JavaRDD<Integer> rdd1 = distData.map(new Function<Integer, Integer>() {
public Integer call(Integer e) { return e + 1; }
});

// Filter des éléments > 3 :


JavaRDD<Integer> rdd2 = rdd1.filter(new Function<Integer, Boolean> () {
public Boolean call(Integer e) { return e > 3; }
});

// Somme des éléments:


int somme = rdd2.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer e1, Integer e2) throws Exception { return e1 + e2; }
});

System.out.println(somme);
Spark SQL

Spark SQL, composant du framework Apache Spark, est utilisé pour effectuer des
traitements sur des données structurées en exécutant des requêtes de type SQL sur les
données Spark.

Spark SQL construit des Data Frame à partir des différents sources de données comme:
fichiers ORC ou JSON, RDD, Table Hive, etc.

Les Data Frame sont des RDDs représentées logiquement en lignes et colonnes en
mémoire (contrairement à Hive: sur disque).
Spark SQL

Les Data Frames peuvent être créées à partir de différentes sources de données :

Des RDD existants.


Des fichiers de données structurées.
Des jeux de données JSON.
Des tables HIVE.
Des bases de données externes.
Spark SQL

● Spark SQL fournit SQLContext afin d’encapsuler les fonctions SQL dans Spark.
● SQL Context est créé à partir du SparkContext existant.

// Spark 1.6 :
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

// Spark 2 :
spark = SparkSession.builder.appName("appName").getOrCreate();

// Spark 1.6 :
import org.apache.spark.sql.SQLContext;
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Spark 2 :
SparkSession spark = SparkSession.builder().getOrCreate();
Spark SQL

// Create DateFrame from json file


df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame


df.show()

// Print the schema in a tree format


df.printSchema()

// Select only the "name" column


df.select("name").show()

// Select everybody, but increment the age by 1


df.select(df("name"), df("age") + 1).show()

# Select everybody, but increment the age by 1


df.select(df['name'], df['age'] + 1).show()

# Select people older than 21


df.filter(df['age'] > 21).show()

# Count people by age


df.groupBy("age").count().show()
Console Web Spark – Spark UI

Spark UI est une interface Web pour surveiller les exécutions des Jobs Spark
Console Web Spark – Spark UI

• Une Task Spark est une unité de traitement, exemple : map, filter

• Un Job Spark est un ensemble des Task.

• Une Stage Spark regroupe un ensemble des Task qu’il est possible
d’exécuter ensemble.

• Les Stages sont séparées par une phase de Shuffle.


Utilisation des UDF

Les User Defined Functions (UDF) permettent de créer une nouvelle colonne dans un
dataframe qui sera le résultat d'un calcul pouvant utiliser les valeurs d'une (ou
plusieurs) colonne(s) existante(s).

Une UDF prend en argument :

- un objet de type colonne (pyspark.sql.Column)

et retourne :

- une valeur de type pyspark.sql.types.


RDD persistants et transitoires

•Un RDD peut être marqué comme persistant pour le réutiliser dans d'autres chaînes, il
sera stocké en RAM et disponible comme source de données pour d'autres
transformations.

•S’il n’est pas marqué comme persistant, le RDD sera transitoire et ne sera pas conservé
en mémoire après calcul
df1.cache()
save

DF UDF DF1
LOAD

sum

T2
RDD Partitioning
• Une partition représente une partie de la donnée d’un RDD.
• Le nombre de partitions par défaut d’un RDD est égal au nombre de cores que
possède la machine.
• Chacune des partitions sera à la charge d’un executor lors des traitements.
Spark streaming

➔ Traitement des données en temps-réel.

➔ Les données peuvent être reçues de différentes sources: messages Kafka,


logs des sites web , HDFS, Twitter….
➔ Les données en sortie sont stockées dans : databases (Oracle … ), système des
fichiers (Hdfs …) et Hive …
Spark streaming

➔ Spark Streaming est composé d’un Receiver qui consomme les données en mode
micro-batch et les transforme en un RDD spécifique appelé DStream
(flux discrétisé).
➔ Les données reçues sont découpées en tranches DStream selon une durée
précisée.
Spark streaming

➔ Les DStreams et les Receivers sont tolérants à la panne : par défaut chaque
DStream est dupliqué sur deux exécuteurs différents : si un est échoue l’autre le
remplace.
Spark streaming

➔ StreamingContext() est le point d’entrée de l’application Streaming

// sc : sparkContext est automatiquement créé au lancement de Spark Shell


from pyspark.streaming import StreamingContext

# Création de StreamingContext avec une durée batch égale à 10s


psc = StreamingContext(sc, 10)

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

# Création de JavaStreamingContext avec 2 working thread et une durée égale à 10s


SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streamApp");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
Spark streaming

➔ Toutes les opérations doivent être définies avant le lancement de


l’application.

# To set log level


sc.setLogLevel("ERROR")
# To print streaming output to the console from a Dstream :
dStream.pprint()
# To start spark StreamingContext
psc.start()
# To stop spark StreamingContext
psc.stop()

# To set log level


jsc.sparkContext().setLogLevel("ERROR");
# To print streaming output to the console from a Dstream :
dStream.print()
# To start spark JavaStreamingContext
jsc.start()
# To stop spark JavaStreamingContext
jsc.stop()
Spark streaming

❖ textFileStream() : streaming des données à partir d’un répertoire du système de fichiers local ou
HDFS.
❖ socketTextStream() : streaming des données à partir de protocole TCP.

❖ saveAsTextFiles() Ecrit les items dans un fichier texte dans un répertoire du système de fichiers local ou
HDFS.
// Streaming from local/HDFS Directories
textStreamInput = psc.textFileStream("pathToDirectory")

// Streaming from TCP socket


socketStreamInput = socketTextStream("hostName", portNumber)

// Save output as text file


socketStreamInput.saveAsTextFiles("pathToFile")

// Streaming from local/HDFS Directories


JavaDStream<String> textStreamInput = jsc.textFileStream("pathToDirectory");

// Streaming from TCP socket


JavaReceiverInputDStream<String> socketStreamInput =
jsc.socketTextStream("hostName", portNumber);

// Save output as text file


socketStreamInput.dStream().saveAsTextFiles("pathToFile","suffix")

Vous aimerez peut-être aussi