0% ont trouvé ce document utile (0 vote)
48 vues105 pages

Big Data & Spark - Scala

Transféré par

aliou.diack
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PDF, TXT ou lisez en ligne sur Scribd
0% ont trouvé ce document utile (0 vote)
48 vues105 pages

Big Data & Spark - Scala

Transféré par

aliou.diack
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PDF, TXT ou lisez en ligne sur Scribd

BIG DATA

Babacar GUEYE - Data Engineer


PLAN

INTRODUCTION AU BIG DATA

INITIATION AU LANGAGE SCALA

UTILISATION DE SPARK / SCALA


INTRODUCTION AU BIG DATA
EXPLOSION DE LA DATA
C’EST QUOI LA DATA ?

• Les données sont une collection de faits, tels que des nombres, des mots, des mesures, des
observations ou simplement des descriptions de choses.
• Chiffres, caractères, symboles images …
• Les données doivent être interprétables par un humain ou une machine pour en tirer un
sens il faut que cette dernière soit placée dans un contexte
C’EST QUOI LA METADATA ?

Par définition Par exemple

• Les données de la donnée • Type de données: Texte, image, audio, vidéo, etc.

• Décrivent les caractéristiques des • Format de données: CSV, JSON, XML, etc.
données elles-mêmes • Taille de données: en octets, kilooctets, mégaoctets,
etc.
• Aide à comprendre les données
• Date de création et de modification des données
• Source des données: site web, système d'information,
capteurs, etc.
• Auteur des données
DATA LIFECYCLE ?
DATA PIPELINE ?

Others …

Data Ingestion Data Processing Consumption


ELT VS ETL

• Données brutes extraites à partir différentes


sources
• Les données brutes sont ensuite traitées a
partir du staging
• Les données traitées sont chargées dans des
systèmes de stockage.
ELT VS ETL

• Données brutes extraites à partir de


diverses sources
• Les données brutes sont chargées dans
des systèmes de stockage
• Les données chargées sont traitées depuis
le système de stockage.
DÉFINITION DU BIG DATA

A chaque instant, nous produisons de la donnée par exemple :


- Un avion en vole génère environ 10 Terabytes de données chaque 30min
- Facebook ingère chaque jour plus de 500 Terabytes de nouvelles données dans ses bases
de données.
Des données de tous types sont produites (images, vidéo, textes…) et ce phénomène est
appelé Big Data.
Le terme de Big Data désigne de vastes ensembles de données collectées, pouvant être
explorées et analysées afin d’en dégager des informations exploitables.
Le Big Data peut être défini à travers 5 axes
❑ Volume : Chaque jour en moyenne, 2,5 quintillions d'octets
des données sont produites par l'homme
❑ Variété : Ces données peuvent être de tout type (structurées,
semi-structurées et non structurées)
❑ Vitesse : Les données sont produites à des fréquences élevées
et doivent également être traitées très rapidement afin d’en
ressortir de la valeur.
❑ Valeur : Il s’agit des actions faites ou décisions prises sur la
base des résultats obtenus après traitement de la donnée.
❑ Véracité : Ce pendant, il faudra s’assurer que la donnée reçue
est bonne, de qualité et n’a pas été altérée
BIG DATA JOBS

Data Analyst
Analyser les données pour les transformer en informations exploitables
• Définir la stratégie Data-Driven de l’entreprise
• Créer et maintenir les bases de données de l’entreprise
• Elaborer les critères de segmentation
Skills:
• Computer Science
• Statistiques
• Utilisation Excel
• Visualisation de données
• Programmation: Python, Scala, SAS, SQL, …
BIG DATA JOBS

Data Scientist
La science des données est une méthode systématique dédiée à la connaissance, découverte via l'analyse des données
• En entreprise, optimiser les processus organisationnels pour Efficacité
• En sciences, analyser des données expérimentales/observationnelles pour dériver des résultats
Skills:
• Computer Science
• Statistiques + Mathématiques
• Programmation: Python, R, SAS, JAVA, SQL
• Machine Learning
• Visualisation
• Connaissances du domaine
BIG DATA JOBS

Big Data Architect


L’architecte Big Data se charge de créer et d’optimiser des infrastructures de stockage, manipulation et restitution. Il doit
élaborer une architecture de Data Management et concevoir un plan pour intégrer, centraliser, protéger et maintenir les
données.
Skills:
• Computer Science
• Maitrise des infrastructures serveur
• Programmation: Python, SQL, Scala, …
• Connaissance pointue des technologies Big Data
BIG DATA JOBS

Big Data Administrator


Il intervient dans la conception, l’optimisation et la configuration des infrastructures de stockage des données massives. Il
assure également la sécurisation des données ainsi que l’attribution des autorisations et des droits d’accès aux différents
utilisateurs.
Skills:
• Computer Science
• Connaissance admin système
• Programmation: Python, Shell scripting
• Maintenir la plateforme Big Data
BIG DATA JOBS

Business Intelligence Engineer (informatique décisionnelle)


Il doit analyser les informations pertinentes, et fournir des rapports détaillés basés sur ces analyses pour permettre à
l’entreprise d’agir. Il implémente enfin des recommandations en matière d’outils de reporting et de gestion d’intelligence.
Skills:
• Computer Science
• Concevoir des architectures de stockage de données
• Base de données : Sql Server, Mysql, Postgres …
• Programmation: Python, SAS, SQL, Talend …
• Visualisation: Power BI, Tableau
• Connaissances du domaine
• Machine Learning
BIG DATA JOBS
Data Engineer
L'ingénierie des données est le domaine qui développe et fournit
• Des systèmes de gestion et d'analyse de données volumineuses
• Construire des plates-formes de données modulaires et évolutives pour les données scientifiques
• Déployer des solutions Big Data

Skills:
• Computer Science
• Bases de données
• Traitement de données en temps réel / données massives
• Programmation: Python, Scala, SQL, …
• Connaissance ETL / ELT
• Connaissance sur le cloud
• Comprendre les facteurs de performance et les limites des systèmes
BIG DATA JOBS
BIG DATA USE CASE
L’ECOSYSTÈME HADOOP
➢ Les plateformes « On Premise »

Une architecture bien précise est définie (Un Cluster)

Les services qu’embarquent les Frameworks sont installés sur les nœuds du Cluster

Exemple de Framework

Hortonworks

Cloudera

Exemple d’architecture

▪ Type StandAlone

▪ Type Master - Slave – Edge

➢ Les plateformes Cloud


L’ECOSYSTÈME HADOOP

Framework de la fondation Apache


depuis 2009
Ensemble de services permettant :
➢ L’ingestion
➢ Le stockage
➢ L’analyse de gros volumes de
données.
ARCHITECTURE HDFS
Apache HDFS ou Hadoop Distributed File System est un système de fichiers structuré en blocs
dans lequel chaque fichier est divisé en blocs d'une taille prédéterminée. Ces blocs sont stockés sur un
cluster d'une ou plusieurs machines. L'architecture Apache Hadoop HDFS suit une architecture
maître/esclave , où un cluster comprend un seul NameNode (nœud maître) et tous les autres nœuds sont
des DataNodes (nœuds esclaves). HDFS peut être déployé sur un large éventail de machines prenant en
charge Java.
NAMENODE & DATANODES
Fonctions de NameNode :
C'est le démon maître qui maintient et gère les DataNodes (nœuds esclaves)
Il enregistre les métadonnées de tous les fichiers stockés dans le cluster, par exemple l'emplacement des blocs
stockés, la taille des fichiers, les autorisations, la hiérarchie, etc. Il existe deux fichiers associés aux métadonnées :
• FsImage : Il contient l'état complet de l'espace de noms du système de fichiers depuis le démarrage du
NameNode.
• EditLogs : Il contient toutes les modifications récentes apportées au système de fichiers par rapport au
FsImage le plus récent.
En cas de défaillance du DataNode, le NameNode choisit de nouveaux DataNodes pour les nouvelles
répliques, équilibre l'utilisation du disque et gère le trafic de communication vers les DataNodes.
Fonctions de DataNode :
▪ Ce sont des démons ou processus esclaves qui s'exécutent sur chaque machine esclave.
▪ Les données réelles sont stockées sur DataNodes.
▪ Les DataNodes effectuent les requêtes de lecture et d'écriture de bas niveau des clients du
système de fichiers.
▪ Ils envoient périodiquement des battements de cœur au NameNode pour signaler la santé
globale de HDFS. Par défaut, cette fréquence est définie sur 3 secondes.
HEARTBEATS

Dans Hadoop, le nœud Name et le nœud de données


communiquent à l'aide de Heartbeat . Par conséquent,
Heartbeat est le signal envoyé par le datanode au
namenode après un intervalle de temps régulier
(3 secondes) pour indiquer sa présence le cas contraire le
NameNode planifie création de nouvelles répliques de ces
blocs sur d'autres DataNodes.
METADATA

Les métadonnées contiennent les informations des blocs de chaque fichier, la composition des
blocs et l’emplacement de ceux-ci sur les datanodes / workernodes.
Les métadonnées sont stockées sur les namenodes / masternode.
BLOCS

Les blocs ne sont rien d’autre que le plus petit emplacement continu sur votre disque dur où les
données sont stockées. En général, dans n'importe quel système de fichiers, vous stockez les
données sous la forme d'un ensemble de blocs. De même, HDFS stocke chaque fichier sous
forme de blocs dispersés dans le cluster Apache Hadoop. La taille par défaut de chaque bloc est
de 128 Mo dans Apache Hadoop 2 . x qu’on peut configurer selon les besoins.
HDFS fournit un moyen fiable de stocker des données volumineuses dans un environnement
distribué sous forme de blocs de données. Les blocs sont également répliqués pour fournir une
tolérance aux pannes. Le facteur de réplication par défaut est 3, qui est à nouveau configurable.
PROBLÈME DE PETITS FICHIERS

Le système de fichiers distribué Apache Hadoop (HDFS) a été développé pour stocker et traiter
de grands ensembles de données.
Les petits fichiers peuvent entraîner un certain nombre de complications, ce qui entraîne une
utilisation inefficace de la mémoire Namenode une dégradation du débit d'analyse des blocs et
une réduction des performances de la couche application.
Plus de fichiers signifie plus de demandes de lecture qui doivent être servies par le NameNode,
ce qui peut finir par obstruer la capacité de NameNode à le faire. Cela entraînera une
dégradation des performances et de la réactivité
INITIATION AU LANGAGE SCALA
LES RÈGLES DE BASE

Scala est un langage de programmation développé en 2001 par Martin Odersky à l'École
Polytechnique Fédérale de Lausanne (EPFL) en Suisse. La première version publique est
sortie en fin 2003.
Il est Scalable (d’où son nom Scala)
Scala est tiré du Java avec une Syntaxe beaucoup plus simple
Scala combine :
La programmation fonctionnelle :
(basée sur des fonctions. Ex : val a = List (1,2). En
réalité List() est une fonction qui derrière, combine des
Iterable, Seq…etc Mais l’utilisateur ne voit que la
fonction simple)
Et la programmation Orienté Objet :
Scala est Dynamiquement typé (Ex: val a = 2 , renvoie
un Int)
Le caractère ‘;’ en fin d’instruction est optionnel sauf si plusieurs instructions sur une ligne
• Sensible à la casse
Exemple : "NomVariable" est différent de "nomvariable".
• Le nom des classes débute par une majuscule
• Les méthodes commencent par une minuscule
Exemple : def meMethode()
• Le nom du fichier du programme doit correspondre au nom de l’objet en question
La fonction def main(args: Array[String]) est obligatoire et représente le point d’entrée du
programme
• Les noms de variables, d’objets, de classes, de fonctions débutent par une lettre ou un underscore.
Exemple : "nom" , "_nom" , "_1_nom"
• Commentaires:
// …mon commentaire => une ligne
/* …mon commentaire */ => multiple lignes
• Import de modules:
import [Link]._ => importe toutes les méthodes dans me
import [Link] => n’importe que la méthode porterHabit
import [Link].{seLaver, porterHabit} => n’importe que les méthodes porterHabit et seLaver
LES VARIABLES
Syntaxe Générale
Keyword nomVaribale [ : DataType] = valeur
Les valeurs de keywords peuvent être:
val : Pour désigner une variable immuable (dont le contenu est non modifiable après assignation)
var : Pour désigner une variable mutable (dont le contenu est modifiable après assignation)
lazy val : Pour désigner une variable qui n’est évaluée que lorsqu’elle est appelée
Exemple : var maProfession = ’’data engineer’’
On peut déclarer plusieurs Variables en même temps:
Exemple 1 : val (a,b,c) = (1,2,3) # a, b, c valent 1,2,3
Exemple 2 : val a,b = 100 # a et b valent 100
EXERCICE : Variables
▪ Créer une variable mutable nommée « profession » et renseigner la valeur : Data Engineer
Vous avez été muté et vous êtes maintenant « scrum master ». Changer votre profession.
▪ Créer en une seule commande les variable nom, prenom et age. Pour des raisons de fraudes
sur l’identité de la personne, personne ne doit pouvoir modifier les valeurs de ces variables.
▪ Afficher les variables à l’aide de la fonction println(" ")
println(s" Mon age est $age")
println(" Et mon nom complet est %s %s".format(nom, prenom))
EXERCICES : Manipulation des String
Utiliser ces fonctions et commenter le résultat
val proverb = ‘’Je suis informaticien spécialisé dans le domaine de la data’’
▪ proverb(0) ▪ [Link]('e’)
▪ [Link] ▪ [Link](lettre => lettre =='e’)
▪ [Link] ▪ [Link]("e", "a")
▪ proverb. toUpperCase ▪ [Link](" Une autre phrase")
▪ [Link](0,9) ▪ [Link](lettre => lettre !='e')
EXERCICE : Variables
1. Créer une variable « maPhrase » avec la valeur "apprendre à positiver ses emotions pour
être en harmonie avec soi-meme et avec les autres"
2. A l’aide des fonctions prédéfinies sur les String, récupérer la sous chaine "en harmonie
avec soi-meme" et stocker là dans une variable mutable nommée « motivation »
3. Remplacer "soi-meme" se trouvant dans la variable « motivation » avec "autrui"
4. Compter le nombre d’occurrences de la lettre ‘a’ dans la variable « motivation »
LES OPERATEURS
▪ Opérateurs de comparaison
== ➔ égalité
!= ➔ différent
>, >= ➔ strictement supérieur et supérieur ou égal
<= ➔ strictement inférieur et inférieur ou égal
▪ Opérateurs logiques
&& ➔ ET logique
|| ➔ OU logique •
! ➔ NOT
▪ Opérateurs d‟affectation
=, +=, -=, *=, /=, %=
EXERCICE : Opérateurs
▪ Combien de temps (en Jours et en Heure) il faut à un marcheur pour parcourir une distance de 750km à une vitesse de 4.8km/h
NB : Vitesse = Distance/Temps et la fonction println() permet d’afficher un résultat
▪ Le gouvernement a décidé d'offrir une prime de 300€ à certains fonctionnaires en fonction de leur salaire et de leur ancienneté.
Comme toutes les autres mesures prises par le gouvernement, il est difficile de comprendre à qui cette mesure s'applique.
De ce que vous avez compris, une personne peut toucher à la prime si :
• Critère 1 : Elle a moins de 5 ans d'ancienneté et son salaire est strictement inférieur à 1500 euros.
• Critère 2 : Elle a entre 5 et 10 ans d'ancienneté et son salaire est compris entre 1500 et 2300 euros.
• Critère 3 : Elle a plus de 10 ans d'ancienneté et son salaire est strictement inférieur à 1500 euros ou supérieur à 2300 euros. C'est
à dire qu'une personne ayant plus de 10 ans d'ancienneté et un salaire entre 1500 et 2300 euros ne peut pas toucher à cette prime.
▪ Peuvent – ils toucher aux primes ?
Bernadette a 12 ans d'ancienneté et un salaire de 2400 euros.
Marc a 6 ans d'ancienneté et un salaire de 1490 euros.
LES COLLECTIONS
❑ Les Tableaux (ou Array)
Syntaxe :
▪ En initialisant la taille du tableau
val tab = new Array[DataType](longueur)
▪ En précisant les valeurs par défaut
val tab2 [:Array[T]] = Array(value1, value2…., valueN)
Exemple :
val tab = new Array[Int](3) / val tab2 : Array[Int] = Array(2,2,3)
NB : Les array sont mutables
❑ Les Tableaux (ou Array)
Ajout de valeurs (avant - après) :
val v1 = Array(4,5,6)
val v2 = v1 :+ 7 // Array(4, 5, 6, 7)
val v3 = v2 ++ Array(8,9) // Array(4, 5, 6, 7, 8, 9)
val v4 = 3 +: v3 // Array(3, 4, 5, 6, 7, 8, 9)
val v5 = Array(1,2) ++: v4 // Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
❑ Quelques fonctions natives :
val nums = Array(1,2,3) // oubien (1 to 3).toArray
[Link](_ * 2) // Array(2, 4, 6)
[Link](element => element < 3) // Array(2)
[Link](2) // 1
[Link] // 3
EXERCICE : Tableaux
1. Créer un tableau nommé Fruits
▪ Initialiser le tableau avec les Valeurs « Pomme », « Banane », « Poissons », « Mangue »
2. On s’est rendu compte tardivement que les « Poissons » ne sont pas des fruits: Supprimer le
du tableau.
3. Rajouter à la liste de fruits suivante à ’Fruits’ : [‘Orange’, ‘Papaye’]
4. Quelle est la nouvelle taille de ‘Fruits’ ?
5. A quel index se trouve la ’Mangue’ ?
❑ Les Listes
Syntaxe :
val maListe: [List[T]] = List(element1, element2,… elementN)
Val maListe = element1 :: element2 :: ….. :: ElementN :: N
val maListe = List()
Exemple :
val maListe : List[String] = List("a", "b", "c", "d", "a")
NB : Les sont immutables
❑ Les Listes
Ajout de valeurs (avant - après) :
val x = List(1,2,3)
val y = 0 :: x //List(0, 1, 2, 3)
val z = y ::: List(4,5) // List(0, 1, 2, 3, 4, 5)
val l = [Link](y, List(4,5)) // List(0, 1, 2, 3, 4, 5)
val l2 = y :+ 5 // List(0, 1, 2, 3, 4, 5)
Quelques fonctions natives : val maList: List[String] = List("a", "b", "c", "d", "a", "z")
• [Link](0, 2) ➔ List("a", "b") • [Link]("b") ➔ true
• [Link](1) ➔ z • [Link] ➔ 6
• [Link](x => x !="b" ) => List("a", "c", "d", "a") • [Link](x => x =="a") ➔ 2
EXERCICE : Listes
Soit la liste suivante : ["pomme", "banane", "goyave", "banane", "kaki", "pomplemousse"]
1. Créer la liste et stockée la dans une variable « Fruits »
2. Afficher le 1er et le 3e élément de la liste
3. Ajouter les fruits « ananas » et « pastèque » respectivement au début et à la fin de la liste Fruits et
stocker la nouvelle liste dans une liste « Fruits_1 » en une ligne de commande
4. Quelle est la nouvelle taille de « Fruits_1 »
5. Vérifier que « goyave » fait partit de « Fruits_1 »
6. Compter le nombre de fois que « banane » apparait dans « Fruits_1 »
7. Dans une variable « Fruits_sorted », trier les fruits de « Fruits_1 »
8. Supprimer tous les fruits de « Fruits_sorted » dont la taille dépasse 5 caractères et stocker dans une
variable « Fruits_small »
9. Afficher « Fruits_small »
❑ Les Set
Syntaxe :
val maSet: [Set[T]] = Set(element1, element2,… elementN)
Exemple : val maSet : Set[Int] = Set(1, 8, 4, 9)
NB : Les Set ne contiennent pas de doublons
❑ Les Set
Ajout et suppression :
val maSet = Set(1,2,3)
val y = maSet + 0 //Set(1, 2, 3, 0)
val z = y ++ Set(4,5) // Set(0, 1, 2, 3, 4, 5)
val l = z - 2 // Set(0, 1, 3, 4, 5)
val l2 = l -- Set(1, 2, 3, 0) // Set(4, 5)
Quelques fonctions natives :
val maSet1 : Set[Int] = Set(1, 8, 4, 9) val maSet2 : Set[Int] = Set(1, 8, 5, 0, 12)
• [Link] ("-") ➔ "1-8-4-9" • [Link](1) ➔ Set(1) • [Link](4) ➔ true
• [Link](10) ➔ false • [Link](maSet2) ➔ Set(1, 8) • [Link](1) ➔ Set(8, 4, 9)
• [Link](_ > 4) ➔ Set(8, 9) • [Link](maSet2) ➔ Set(4, 9)
EXERCICE : Set
▪ Créer un Set contenant les éléments "I","am","trained","to be", "a", "data", "engineer"
▪ Récupérer les éléments contenant la lettre "a"
▪ En utilisant map, transforme le Set en une collection contenant la taille des éléments de
l'ensemble.
▪ filtrer les valeurs paires
▪ Calculer la somme totale de cet ensemble d'entiers
❑ Les Tuples
Syntaxe :
val maTup : TupleX[T1, T2, …] = TupleX(el1, el2, …)
Exemple :
val maTup : Tuple3[String,String,Int]=Tuple3("dia", "mor", 28)
Quelques fonctions natives :
val maTup : Tuple3[String,String,Int]=Tuple3("dia", "mor", 28) •
maTup._1 ➔ "dia"
maTup._3 ➔ 28
[Link](2) ➔ "mor"
[Link] ➔ 3
EXERCICE : Tuples
▪ Créer un Tuple "moi" contenant votre prenom, nom, taille, sexe
▪ Accéder à la votre taille
▪ Renvoyer la phase suivante: Je suis prenom nom, j'ai une taille de valTaille et de sexe
valSexe.
❑ Les Map
Syntaxe :
val maMap: Map[K, V] = Map(k1 -> v1, k2 -> v2, …)
Exemple :
val maMap : Map[String, String] = Map("nom"-> "dia", "prenom"->"mor", "age"->"28")
Quelques fonctions natives :
val maMap: Map[String, String] = Map("nom"-> "dia", "prenom"->"mor", "age"->"28")
[Link] ➔ Set(nom, prenom, age)
[Link] ➔ MapLike(dia, mor, 28)
[Link]("sex", "Key unknown !") ➔ " Key unknown !"
[Link](_._1 == "age") ➔ Map(age -> 28)
[Link] ➔ 4
EXERCICE : Map

▪ Créer le dictionnaire weekDay conntenant les valeurs suivantes : 1=>lundi, 2=>mardi,


3=>mercredi, 4=>jeudi, 5=>vendredi, 6=>samedi, 7=>dimanche
▪ Recupérer les clefs de weekDay dans une variable weekDayKeys
▪ Convertir weekDayKeys en liste dans une variable weekDayKeys1
▪ Filtrer les éléments de weekDayKeys1 qui sont pairs dans une variable weekDayKeys2
▪ Pour chaque élément de weekDay, afficher la phrase 'La clef ... a pour valeur ...„
▪ Récupérer les valeurrs de weekDay dans une variable weekDayVals
▪ Afficher avec printl 'Les jours de la semaine sont Lundi, Mardi, Mercredi, Jeudi, Vendredi,
Samedi, Dimanche.'
LES STRUCTURES DE CONTRÔLE

❑ IF – ELSE
Syntaxe générale : Exemple : if (1==2){
If (condition) { print(“true”)
// code }else {
} else if (condition) { print(“False”)
// code }
}else {
// code
}
La boucle For La boucle While La boucle Do - While

Syntaxe générale : Syntaxe générale : Syntaxe générale :

for( itterator <- collection) { while( expression booléenne) { do {


//code // code // code

} } } while( expression booléenne)

Exemple : Exemple : Exemple :

for( i <- 1 to 10) { var i = 1 var i = 1

println(i) while( i <=10) { do {

} println(i) i+=1 println(i) i+=1


} } while( i <=10)
PATTERN MATCHING

❑ Match - Case
Syntaxe générale : Exemple :
var age = 18
nomVar match {
age match {
case exp1 => instruction1
case 18 => println("Vous avez 18 ans.")
case exp2 => instruction2 case x if (x < 18) => println("Vous avez moins de 18 ans.")
… case 19 | 20 => println("Vous avez 19 ou 20 ans.")
case _ => instruction par défaut case _ => println("Vous avais plus de 20 ans")
}
}
EXERCICE : Les structures de contrôles
Copier la fonction suivante dans votre ide, au niveau de la fonction Main :
def randomAge(): Int={
val r = [Link]
[Link](100)
}
Déclarer une variable « mon_age » qui est égale randomAge()
Vérifier si vous êtes un enfant, un adolescent un adulte ou un papi. En sachant que :
0 < enfant < 9
10 <= adolescent<= 18
18 < adulte <= 60
Papi > 60
UTILISATION DE SPARK / SCALA
DÉFINITION - OBJECTIFS - ARCHITECTURE
Apache Spark est un système de traitement distribué (répartis sur plusieurs machines) open source utilisé pour les
traitements de données à grande échelle. Spark est en développé en scala.
▪ En 2009, Spark fut conçu par Zaharia lors de son doctorat au sein de l'université de Californie et en 2013,
transmis à la fondation Apache.
▪ En 2014, Spark a gagné le Daytona GraySort dont l'objectif était de trier 100 To de données le plus rapidement
possible.
▪ Ce record était préalablement détenu par Hadoop. Pour ce faire, Spark a utilisé 206 machines obtenant un
temps d'exécution final de 23 minutes alors que Hadoop avait utilisé 2100 machines pour un temps d'exécution
final de 72 minutes.
▪ La puissance de Spark fut démontrée en étant 3 fois plus rapide et en utilisant approximativement 10 fois
moins de machines.
❑ Spark vs Mapreduce
• Sous MapReduce, les données sont lues et écrites (sur disque) à chaque fois entre deux opérations
• Ces lectures et écritures dans HDFS successives rallongent significativement les temps de latence
❑ Spark vs Mapreduce
Avec Spark, le « stockage » en mémoire effectué entre plusieurs opérations est beaucoup plus rapide.
• Un autre point qui donne à Spark des performances supérieures à celle de MapReduce : les
évaluations paresseuses (“lazy evaluation”)
• Dans le cas où le cluster ne possède pas de mémoire suffisante, les données sont écrites/lues sur
disque de la même façon que Hadoop MapReduce.
▪ Spark a été crée pour améliorer MapReduce qui est un sous projet de Hadoop
▪ Mapreduce: les données sont écrites sur le disque après chaque opération
▪ Spark exécute la totalité des opérations en mémoire RAM . Il ne s'appuie sur des disques
seulement lorsque sa mémoire n'est plus suffisante.
▪ De ce fait, là où MapReduce travaille par étape, Spark travaille sur la totalité des données en
même temps.
▪ Spark est capable de travailler avec une partie des données en mémoire et une autre sur
disque
Le Framework Spark possède plusieurs fonctionnalités.
▪ Le Spark Core : c’est le système central de Spark. C’est une brique dédiée au traitement
distribué des données (comme Hadoop MapReduce).
▪ Les modules avancés : Ils sont développés au-dessus de Spark Core et permettent de faire des
traitements complexes (Streaming, machine learning, SQL…)
L’architecture Spark est assez analogue à l’architecture master/slave développée dans Hadoop
MapReduce v1. Sous Spark, un programme exécuté est appelé une application Spark.
L’architecture Spark est formée par 4 entités distinctes:
▪ Le Driver.
▪ Le master / cluster manager
▪ Les slaves / Workers
▪ Les exécuteurs
▪ Spark driver : est une JVM à laquelle on a alloué des ressources (cpu, mémoire, cœurs etc…) Le
Driver n'exécute pas de calculs (filtrer, mapper, reduce, etc.) Lorsque vous appelez collect() sur un
RDD ou un ensemble de données, toutes les données sont envoyées au Driver Le driver est la
machine qui exécute le code main(), il répartie les taches aux exécuteurs Le driver est la JVM qui
possède l’ objet appelé le SparkContext Spark application (souvent appelée Driver Program or
Application Master) est composé de SparkContext et d'un User Program
▪ Spark context : C’est une instance de l’application Spark ou une connexion
▪ Le Spark master aussi appelé le cluster manager : est chargé de négocier l’allocation de ressources
nécessaires aux JVMs. Il se charge également de trouver de l’espace et des ressources pour faire
tourner les workers et les exécuteurs. C est lui qui instancier les Workers
▪ Les Workers sont des JVMs auxquelles on a alloué des ressources (CPU, mémoire, cœurs etc…).
▪ Plus précisément, ce sont des conteneurs Spark dans lesquels vivent les exécuteurs. Sur un clusters
Hadoop, les workers sont généralement présents dans les datanodes.

▪ Les exécuteurs sont des agents situés dans les Workers et qui exécutent des « tasks»
▪ Ils sont dit multi-cœurs ou multithreads
▪ Quand le code est exécuté, les transformations de RDD sont traduites en DAG RDD (plan logique) et soumises à
▪ DAGScheduler qui génère un plan d'exécution physique (DAG de Stage) à partir de notre plan logique et le
soumet au
▪ TaskScheduler qui est responsable de l'envoi des tâches aux exécuteurs, de réessayer en cas d'échec
▪ Les tâches sont exécutées sur les WokersNode , leurs résultats sont retournés au Driver qui fera l’agrégation
Initialisation d’une session Spark
▪ Point d’entrée principal d’une exécution Spark (Main)
▪ A l’initialisation d’une Session (ex: spark-shell), l’objet SparkSession est disponible comme
avec le nom spark
import [Link]
val spark:SparkSession = [Link]()
.master("local")
.appName("mon application")
.getOrCreate()
[Link]
▪ SparkContext a été utilisé comme canal pour accéder à toutes les fonctionnalités de Spark.
De façon analogue au SparkContext est le point d’entrée de toutes les fonctions du module Spark SQL
Le Driver spark l’utlise pour se connecter au gestionnaire de ressources (YARN ou Mesos ..).
▪ SparkConf est requis pour créer l'objet Sparkcontexte , qui stocke les paramètres de configuration
comme appName (pour identifier votre driver ), l'application, le nombre de cœurs et la taille de la
mémoire de l'exécuteur dans les workers pour utiliser les API SQL, HIVE ……
▪ SQLContext est votre passerelle vers SparkSQL
Depuis la version 2.0 de Spark (juin 2016), on favorise une nouvelle classe nommée SparkSession
comme point d’entrée de Spark SQL. Cela étant, pour des raisons de comptabilité, la classe SQLContext
reste également valable.
RDD & DATASET / DATAFRAME
❑ Les RDDs
Resilient Distributed Dataset, sont des « collections » d’éléments partitionnées et distribuées à
travers les nœuds du cluster
C’est la structure fondamentale de base dans Spark
Les RDD permettent d’effectuer des opérations parallèles en mémoire sur un cluster.
Quelques caractéristiques :
▪ Traitement en mémoire
▪ Lazy evaluation
▪ Immuables
❑ Les RDDs : Création
Il existe deux (2) manières de créer un RDD
A partir d’une collection :
[Link](collection)
Exemple:
val fruits= Seq((1,"orange" ),(2,"pomme"),(3,"fraise"),(4,"citron"))
val fruitRDD = [Link](fruits)
[Link]().foreach(println)
A partir d’une source externe :
[Link]("cheminVersFichier")
❑ Les RDDs : Opérations
De manière générale, les rdds supportent 2 types d’opérations:
1. Les Transformations : 2. Les Actions :
A partir d’un RDD, retourne un autre RDD après un traitement A partir d’un RDD retourne une valeur
map(func) reduce(func)
filter(func) collect()
flatMap(func) count()
groupByKey(([numPartitions]) first()
reduceByKey(func, [numPartitions]) countByKey()
sortByKey([ascending], [numPartitions]) foreach(func)
join(otherDataset, [numPartitions ])
coalesce(numPartitions)
repartition(numPartitions)
Exemple : Les RDDs
le_renard_et_les_raisins.txt
Considérons le code suivant. Il permet de : Certain Renard Gascon, d'autres disent
Normand,
• Lire un fichier (le_renard_et_les_raisins.txt) Mourant presque de faim, vit au haut d'une
treille
• Convertir les lettres en majuscules Des raisins mûrs apparemment

• Récupère les lignes commençant par M


• Enfin affiche le nombre d’occurrence
val mydata = [Link]("le_renard_et_les_raisins.txt")
val mydata_uc = [Link](element => [Link]())
val mydata_filt = mydata_uc.filter(element => [Link](‘M'))
mydata_filt.count()
EXERCICE : Les RDDs
Ouvrez une session spark
Créer un RDD en se basant sur le fichier [Link] qui se trouve dans votre répertoire de base
Supprimer la première ligne (entête)
Créer un RDD contenant les PClass de manières distinctes
Calculer le nombre de survivant par TicketClass
Créer un RDD, ne contenant que les informations de Sex et Survived
Quel est le nombre de femmes qui ont survécus dans titanic ?
Créer un RDD ne contenant que les femmes.
Le max age des survivants et des morts ?
Quel est le nombre de ligne de [Link] ?
❑ Les DataFrame : Définition
▪ Est une collection distribuée de données
▪ Est organisé en colonnes nommées.
▪ Il est équivalent à une table dans une base de données relationnelle.
❑ Les DataFrame : Création
Il existe plusieurs manières de créer un DataFrame
▪ A partir d’une Collection
▪ A partir d’un RDD
▪ A partir d’une source de données Externes
❑ A partir d’un RDD
▪ Création
val plats = Seq(("Thieb djeune", "2000"), ("Mafe", "3000"), ("Soupou kandja", "3500"))
val plats_rdd = [Link](plats)
val plats_dataframe = plats_rdd.toDF("Menu", "Prix")
▪ Affichage
plats_dataframe.show(false)
▪ NB : La methode show() prend 2 arguments optionnelles :
Truncate : true | false Affiche les lignes du dataframe sans être tronquée
Nb_row : Int Le nombre de ligne du dataframe à afficher
❑ A partir d’une collection
▪ Création en utilisant ToDF
import [Link]._
val plats_list = List(("Thieb djeune", "2000"), ("Mafe", "3000"), ("Soupou kandja", "3500"))
val plats_dataframe = plats_list.toDF("Menu", "Prix")
Création en utilisant [Link]
val plats_list = List(("Thieb djeune", "2000"), ("Mafe", "3000"), ("Soupou kandja", "3500"))
val plats_dataframe = [Link](plats_list).toDF("Menu", "Prix")
❑ A partir d’une source externe
▪ Création en utilisant [Link].
▪ csv
▪ json
▪ avro
▪ parquet
▪ text / textFile
▪ orc
▪ jdbc
▪ Création en à partir d’une table en utilisant [Link]("")
Exemple : [Link]("select * from ingestion.personne_tmp")
❑ Ajout / Update de colonne d’un Dataframe
Spark WithColumn est une fonction qui permet de :
▪ D’ajouter une nouvelle colonne dans un DataFrame avec une valeur spécifique
▪ Si la colonne existe déjà, elle permet de mettre à jour le contenu de la colonne
▪ De changer le type de la colonne
Sa syntaxe est la suivante :
withColumn(colname: String, valeur: Column)
val DataFrame = [Link]("salutation_column", lit("Bonjour"))
Exemple :
val DataFrame2 = [Link]("salaire", col("salaire") * 2)
// résultat: renvoie un nouveau dataframe avec la colonne salaire multipliée par 2
val DataFrame3 = [Link]("salaire", col("salaire").cast("double"))
// résultat: renvoie un nouveau dataframe avec la colonne salaire convertie en double
❑ Renommer une colonne d’un Dataframe
Spark WithColumnRenamed est une fonction utilisée pour renommer une colonne d’un DataFrame
Sa syntaxe est la suivante :
.withColumnRenamed(oldColname: String, newColname: String)
val DataFrame = [Link]("Prix", "Montant")
Une autre méthode est également utilisée pour renommer des colonnes. Il s’agit de la .as() utilisé souvent dans :
• Les agrégations
• Un Select
Exemple :
.as(newColname: String)
val DataFrame = [Link](col("Prix").as("Montant"))
❑ Supprimer une colonne d’un Dataframe
Spark drop() est une fonction utilisée pour supprimer une ou plusieurs colonnes d’un DataFrame
Sa syntaxe est la suivante :
.drop(colname: String)
.drop(colnames: String*)
val DataFrame = [Link]("salaire")
val DataFrame = [Link]("Prix", "Montant")
❑ Récupérer les valeurs d’un Dataframe
Spark collect() est une fonction utilisée pour récupérer tous les éléments d’un Dataframe sur tous les noeuds.
Cette opération est très couteuse et ne doit être faite que sur les DataFrame de petites tailles
val DataFrame = [Link]()
❑ Supprimer les doublons d’une colonne
• Les doublons (ou ligne dupliquées) peuvent être supprimées en utilisant la méthode distinct() de Spark
Sa syntaxe est la suivante :
val DataFrame = [Link]()
Dans certains cas, nous pouvons être amenés à vouloir supprimer les doublons que sur une ou des colonnes
spécifiques
• La méthode Distinct() ne permet pas de choisir des colonnes particulières
• Dans ce genre de situation, il est préférable d’utiliser la méthode DropDuplicates()
Sa syntaxe est la suivante :
.dropDuplicates() ou .dropDuplicates(colnames : String*)
val DataFrame = [Link]()
val DataFrame = [Link]("nom", "prenom")
❑ Filtre sur une colonne : Filter / Where
• Les fonctions Filter() et Where() sont utilisées pour sélectionner des lignes d’un Dataframe/Dataset, basées sur
une ou plusieurs conditions
Sa syntaxe est la suivante :
.filter(condition : Column) ou .where(condition : Column)
.filter(ConditionExpression : String) ou .where(ConditionExpression : Column)
val DataFrame = [Link](col("montant") > 1000)
// résultat: renvoie un nouveau dataframe avec les lignes dont le montant est supérieur à 1000
val DataFrame = [Link]("gender == 'M'")
// résultat: renvoie un nouveau dataframe avec les lignes dont le genre est M
val DataFrame = [Link](col("montant") > 250 && col("produit")=== "lait")
// résultat: renvoie un nouveau dataframe avec les lignes dont le montant est supérieur à 250 et produit est égal à Lait
❑ Ajout de valeur à partir de conditions
A l’instar de SQL, il est possible d’ajouter des valeurs à des colonnes suivants des conditions.
Les fonctions When() – Otherwise() sont utilisées dans ce cas
Sa syntaxe est la suivante :
[Link]("colname", when(condition, "valeur").otherwise(default_value))
val DataFrame = [Link]("resultat",when(col("code")===200,
"OK").otherwise("KO"))
// résultat: renvoie un nouveau dataframe avec la colonne "resultat" qui contient la valeur OK en cas de
code 200
❑ Les DataFrame : Windows
C’est des fonctions de fenêtre dans Spark SQL qui attribue un numéro de ligne (numéro entier
séquentiel) à chaque ligne du DataFrame résultat. Cette fonction est utilisée pour
[Link]() partitionner les données en cadres Windows et en clause orderBy() pour
trier les lignes de chaque partition. (Fonctions : row_number, dense_rank, rank)
Ex : val windowSpec = [Link]("department").orderBy("salary")
[Link]("row_number",row_number.over(windowSpec)).show()
❑ Les DataFrame : Agrégation
Les requêtes d'agrégation effectuent un calcul sur un ensemble de valeurs et renvoient une seule
valeur
Les agrégations sont des opérations très courantes lors de manipulation de données
La méthode groupBy permet de regrouper les données selon les différentes modalités de la
colonne sélectionnée.
Ex : [Link]("col1").sum("col2").show(false)
[Link]("department").agg(sum("salary").as("sum_salary"),
avg("salary").as("avg_salary"))
Autres fonctions :
count(), countDistinct, min(), max(), avg(), etc…
Exercice :
Soit le dataframe suivant
import [Link]._
val simpleData = Seq(("James","Sales","NY",90000,34,10000),
("Michael","Sales","NY",86000,56,20000), ("Robert","Sales","CA",),
("Maria","Finance","CA",90000,24,23000), ("Raman","Finance","CA",99000,481000,30,230000,24000),
("Scott","Finance","NY",83000,36,19000), ("Jen","Finance","NY",79000,53,15000),
("Jeff","Marketing","CA",80000,25,18000), ("Kumar","Marketing","NY",91000,50,21000) )
val df = [Link]("employee_name","department","state","salary","age","bonus")
1. Exécuter le code ci-dessus et afficher le dataframe « df »
2. Afficher la somme des salaires par Department
3. Afficher le bonus minimum en fonction des department et State
4. Afficher dans un même dataframe la moyenne des salaire, la somme des bonus et le max
bonus par department à condition que la somme des bonus soit supérieure à 40000
Les DataFrame : Jointure
Les types de jointures supportées par les DataFrame sont :
▪ Inner
▪ Outer
▪ Left / right_outer
▪ Leftsemi
▪ Leftanti
▪ unionAll()
Syntaxe générale :
df_with_agg = [Link](df2, df1(colJointure1) == df2(colJointure2), ‘type_jointure')
df_with_agg = [Link](df2, Seq(colJointure), ‘type_jointure')
Les DataFrame : Lecture et Ecriture dans Spark
▪ Lecture fichier CSV
[Link]
.format("csv")
.option("header","true")
.option("delimiter", ",")
.csv("cheminFichier_CSV")
▪ Ecriture en fichier CSV
[Link]
.format("csv")
.mode("mode") // mode = append | overwrite | ignore | errorifexists
.option("header","true")
.option("delimiter", ",")
.save("cheminFichier_CSV")
Les DataFrame : Lecture et Ecriture dans Spark
▪ Lecture fichier Parquet
[Link].("cheminFichier_parquet")
▪ Ecriture en fichier Parquet
[Link]
.format("parquet")
.mode("mode") // mode = append | overwrite | ignore | errorifexists
.parquet("cheminFichier_parquet")
Les DataFrame : Lecture et Ecriture dans Spark
▪ Lecture fichier Avro
[Link]("cheminFichier_avro")
[Link]("[Link]").avro("cheminFichier_avro")
▪ Ecriture en fichier Avro
[Link]
.format("[Link]")
.mode("mode") // mode = append | overwrite | ignore | errorifexists
.avro("cheminFichier_avro")
Les DataFrame : Lecture et Ecriture dans Spark
▪ Lecture fichier Json
[Link]("multiline", "true").json("cheminFichier_Json")
▪ Ecriture en fichier Json
[Link]
.mode("mode") // mode = append | overwrite | ignore | errorifexists
.json("cheminFichier_json")
Les DataFrame : Lecture et Ecriture dans Spark
▪ Lecture fichier Hive
[Link]("select ……..from [Link]")
▪ Ecriture en fichier Hive
[Link]
.format("format") // format = parquet | orc …
.mode("mode") // mode = append | overwrite | ignore | errorifexists
.option("path","cheminStorage")
. partitionBy("col1", "col2", "colN")
.saveAsTable("[Link]")
Exercice :
Nous allons étudier les cas de Covid par pays. Pour cela considérez le fichier
[Link] contenant les colonnes suivantes
Colonne Définition
date : La date
location : Le pays affécté
variant : Le type de covid
num_sequences : Le nombre de cas par type de covid
perc_sequences : Le pourcentage des cas de covid par type
numsequencestotal : Le nombre total de cas par pays
Dans votre projet Intellij, créez un sous répertoire ressources dans le répertoire src et déplacez y le
fichier [Link]
Exercice :
1. Chargez le fichier CSV dans une variable nommée covid-variants_df
1.1 Afficher le DataFrame obtenu
2. Afficher le Schema du dataFrame obtenu
Tips : La méthode « [Link] » permet d’afficher le schéma de monDataFrame
3. Renommer la colonne « location » en « pays »
4. Pour manipuler les colonnes, il existe une fonction col qui se trouve dans
[Link]. Veuillez l’importer dans votre code.
5. Changer le type des colonnes « perc_sequences » en double, « num_sequences_total » en int
Exercice :
6. Supprimez d’abord la colonne : num_sequences
7. Enlever par un filtre les lignes de la colonne « variant » contenant la valeur « non_who ».
8. Ajouter une colonne « total_covid_par_jour » qui contient le nombre de cas de covid par jour et par
pays
9. Afficher les cas du Sénégal pour les journées du « 2021-01-04 », « 2020-06-22», « 2020-08-03 »
10. Ecrire le DataFrame obtenu à la question 8 sur une fichier csv nommée covid_afrique_adn
Exercice :
Créer 2 dataframes en lisant les fichier [Link] et [Link] avec le delimiter |
Supprimer la colonne Country
Effectuer une jointure les 2 dataframes en utilisant comme clé la colonne PRODUCT ID
Supprimer les doublons en se basant sur la colonne PRODUCT ID
Créer une colonne nommée cout en multipliant la quantity et le Unit_price
Modifier le nom de la colonne discount en remise
Compter le nombre de Consumer de la colonne Segment
Agréger la colonne région et calculer la somme et la moyenne
Appliquer un row_number en partitionner les colonnes State et Region et en ordonnant de
manière descendante sur la colonne cout
MERCI

Vous aimerez peut-être aussi