Apache Spark
Mohamed El Marouani
TDIA 2
1
Le problème Big Data
Évolution des performances des processeurs :
Pendant longtemps, les processeurs devenaient plus rapides chaque année, permettant aux applications de
s'exécuter plus vite sans modification de code.
Fin de l’augmentation des vitesses de processeur (vers 2005) :
En raison des limites physiques liées à la dissipation thermique, les fabricants ont cessé d’augmenter la vitesse
des processeurs et ont opté pour l’ajout de cœurs parallèles.
Nécessité du parallélisme :
Avec cette évolution, les applications doivent être modifiées pour tirer parti du parallélisme afin d’améliorer
leurs performances, ce qui a conduit au développement de nouveaux modèles de programmation comme
Apache Spark.
Croissance continue des capacités de stockage et de collecte de données :
Contrairement aux processeurs, le coût du stockage continue de baisser (divisé par deux tous les 14 mois),
rendant la conservation de grandes quantités de données accessible à toutes les organisations.
2
Le problème Big Data
Amélioration des technologies de collecte de données :
Les capteurs, caméras et autres dispositifs deviennent de plus en plus performants et abordables, facilitant la
collecte massive de données (les dispositifs de l’IoT).
Explosion du volume de données à traiter :
Les entreprises collectent de plus en plus de données, mais leur traitement nécessite d’importantes capacités
de calcul parallélisé sur des clusters de machines.
Inadaptation des anciens logiciels et modèles de programmation :
Les logiciels traditionnels ne sont pas conçus pour s’adapter automatiquement à cette nouvelle réalité, d’où la
nécessité de nouvelles approches.
Apache Spark comme réponse à ce besoin :
Il a été conçu spécifiquement pour ce nouvel environnement nécessitant des calculs massivement parallèles
sur de grandes quantités de données.
3
Comparatif Hadoop vs Spark
Critère Limitations d’Apache Hadoop (MapReduce) Solutions apportées par Apache Spark
Basé sur le modèle MapReduce, qui fonctionne par étapes Utilise un traitement en mémoire (in-memory computing),
Modèle de
séquentielles avec lecture et écriture sur disque entre chaque réduisant les opérations d’E/S sur disque et accélérant le
traitement tâche. traitement.
Jusqu’à 100 fois plus rapide que Hadoop MapReduce en
Lenteur due aux multiples accès disque pour chaque étape du
Performance exploitant la mémoire RAM pour stocker les données
traitement.
intermédiaires.
MapReduce ne gère que des tâches par batch (traitement
Gestion des Supporte les traitements en batch, interactifs (Spark SQL) et en
par lots), ce qui le rend inefficace pour les traitements en
tâches temps réel ou interactifs.
streaming temps réel (Spark Streaming).
Simplicité du Programmation complexe nécessitant beaucoup de code API plus simple et expressive, disponible en Scala, Python, Java et
code Java pour implémenter les étapes Map et Reduce. R, facilitant le développement.
Tolérance Utilise un mécanisme de résilience basé sur les RDDs (Resilient
Tolérance assurée via la réplication des données sur HDFS, ce
Distributed Datasets), qui permet de recomputer uniquement les
aux pannes qui augmente le temps d’accès aux données.
partitions perdues sans nécessiter de réplication complète.
Flexibilité Supporte le batch, le streaming, l'apprentissage automatique
Conçu principalement pour le traitement batch, avec des
(MLlib) et le traitement des graphes (GraphX) sur une seule
extensions limitées pour d’autres cas d’usage.
plateforme.
Compatibilité Dépend d’HDFS pour le stockage et fonctionne uniquement Peut s’intégrer avec HDFS, HBase, Cassandra, S3 et d’autres
avec Hadoop avec MapReduce. systèmes de stockage distribués.
4
Spark: Historique
5
Applications Spark
Structure d’une application Spark : Composée d’un processus Driver et
d’un ensemble de processus Executors.
Rôle du driver :
o Exécute la fonction main() et orchestre l’application.
o Maintient les informations sur l’application en cours.
o Analyse, distribue et planifie les tâches sur les executors.
Rôle des Executors :
o Exécutent le code assigné par le driver.
o Rapporte l’état des calculs au driver.
Gestion des ressources : Un cluster manager suit et alloue les ressources
nécessaires.
Flexibilité des API : Le driver peut être piloté depuis différents langages
grâce aux API de Spark.
6
Applications Spark
Sur ce schéma, nous pouvons voir le driver à gauche et quatre executors à droite.
Dans ce schéma, nous avons supprimé le concept de nœuds de cluster.
L’utilisateur peut spécifier, via des configurations, combien d’executors doivent être
assignés à chaque nœud.
Spark, en plus de son mode cluster, dispose également d’un mode local. Le driver et les
executors sont simplement des processus, ce qui signifie qu’ils peuvent s’exécuter sur la
même machine ou sur des machines différentes.
En mode local, le driver et les executors s’exécutent sous forme de threads sur votre
ordinateur personnel au lieu d’un cluster.
Vous contrôlez votre application Spark via un processus driver appelé SparkSession.
L’instance de SparkSession est le moyen par lequel Spark exécute les manipulations définies
par l’utilisateur à travers le cluster.
Il existe une correspondance un-à-un entre une SparkSession et une application Spark.
En Scala et Python, la variable est disponible sous le nom spark lorsque vous démarrez la
console.
Voyons maintenant la SparkSession en Scala et/ou Python. Par exemple on aura:
res0: [Link] = [Link]@...
Ou
<[Link] at 0x7efda4c1ccd0>
7
Spark: DataFrames
DataFrame : L'API structurée la plus courante, représentant une table de données avec des lignes et des colonnes.
Schéma : Liste définissant les colonnes et les types de données au sein de ces colonnes.
Concept de DataFrame : À considérer comme un tableur avec des colonnes nommées.
Différence fondamentale :
o Un tableur est localisé sur une seule machine.
o Un Spark DataFrame peut s'étendre sur des milliers de machines.
Pourquoi répartir les données sur plusieurs machines ? :
o Les données sont trop volumineuses pour tenir sur une seule machine.
o Le traitement des données sur une seule machine prendrait trop de temps.
DataFrame dans R et Python : Concepts similaires, mais les DataFrames dans Python/R existent sur une seule machine, limitant les
ressources disponibles.
Interopérabilité avec Spark :
Spark permet de convertir facilement des Pandas DataFrames (Python) et des DataFrames R en Spark DataFrames.
8
Spark: DataFrames
Partitions: Pour permettre à chaque executor d'exécuter des tâches en parallèle, Spark divise les données en partitions.
Définition d'une partition : Une partition est un ensemble de lignes stockées sur une machine physique dans le cluster.
Distribution des données : Les partitions d'un DataFrame représentent la manière dont les données sont physiquement réparties à
travers le cluster pendant l'exécution.
Parallélisme :
o Si vous avez une seule partition, Spark n’a qu’un parallélisme de un, même si vous avez des milliers d'executors.
o Si vous avez plusieurs partitions, mais un seul executor, le parallélisme restera de un car il n’y a qu’une ressource de calcul.
Manipulation des partitions :
o En général, vous ne manipulez pas les partitions de manière manuelle ou individuelle dans Spark.
o Vous spécifiez des transformations de haut niveau sur les données dans les partitions, et Spark détermine comment
exécuter cette tâche sur le cluster.
9
Spark: Transformations
Structures de données immutables :
Dans Spark, les structures de données sont immutables, ce qui signifie qu’elles ne peuvent pas
être modifiées après leur création.
Modification d’un DataFrame : Pour "modifier" un DataFrame, il faut donner des instructions à
Spark sur la façon de le modifier, ces instructions sont appelées des transformations.
Exemple de transformation : Trouver tous les nombres pairs dans un DataFrame :
Scala : val divisBy2 = [Link]("number % 2 = 0")
Python : divisBy2 = [Link]("number % 2 = 0")
Aucune sortie : Ces transformations ne retournent aucune sortie car elles ne spécifient qu'une
transformation abstraite. Spark ne l'exécutera pas avant qu'une action ne soit appelée.
Types de transformations : Il existe deux types de transformations :
Transformations étroites (narrow transformations) :
Chaque partition d'entrée contribue à une seule partition de sortie.
Exemple : la déclaration where crée une dépendance étroite.
Transformations larges (wide transformations) :
Les partitions d’entrée contribuent à plusieurs partitions de sortie.
Cela implique souvent un shuffle, où Spark échange des partitions à travers le cluster.
Les transformations sont des modes d’expression des manipulations de données. Cela mène au
concept de Lazy Evaluation (évaluation paresseuse).
10
Spark: Lazy Evaluation
Définition du concept
L’évaluation paresseuse signifie que Spark ne calcule pas immédiatement les transformations appliquées à un DataFrame ou
un RDD.
Au lieu d’exécuter chaque transformation une par une dès qu’elle est appelée, Spark attend qu’une action soit déclenchée
pour exécuter toutes les transformations de manière optimisée.
Fonctionnement de l'évaluation paresseuse
o Lorsque vous appliquez une transformation (comme filter(), map(), select(), etc.), Spark enregistre cette transformation
dans un plan logique d’exécution au lieu de l’exécuter immédiatement.
o Ce plan logique est un DAG (Directed Acyclic Graph) qui représente toutes les transformations demandées.
o L’exécution réelle ne démarre que lorsqu’une action est déclenchée (count(), show(), collect(), write(), etc.).
o À ce moment-là, Spark optimise et compile l’ensemble des transformations avant de les exécuter efficacement.
11
Spark: Lazy Evaluation
Exemple d’évaluation paresseuse
Étape 1 : [Link](df["age"] > 30) → Spark enregistre la transformation dans le plan d’exécution.
Étape 2 : filtered_df.select("name", "age") → Encore une transformation enregistrée, mais rien n'est exécuté.
Étape 3 : selected_df.show() → Spark exécute toutes les transformations en une seule fois, après avoir optimisé le plan.
12
Spark: Lazy Evaluation
Pourquoi Spark utilise l’évaluation paresseuse ?
1. Optimisation du plan d’exécution
o Spark peut analyser l’ensemble du plan logique avant d’exécuter quoi que ce soit.
o Cela permet d’appliquer des optimisations comme le réarrangement des opérations pour minimiser le nombre de
calculs.
2. Réduction des coûts de calcul
o Plutôt que d’effectuer une transformation à chaque étape, Spark regroupe les opérations pour réduire les
lectures/écritures disque et les transferts réseau.
o Par exemple, si plusieurs filtres sont appliqués, Spark les fusionne pour ne parcourir les données qu’une seule fois.
3. Exécution en mémoire efficace
o Spark peut éviter des calculs inutiles et ne charger que les données nécessaires en mémoire.
o Exemple : Predicate Pushdown
Si vous appliquez un filtre à un DataFrame lu depuis une base de données, Spark envoie directement cette
condition à la base au lieu de charger toutes les données et de filtrer après.
“Predicate Pushdown is an optimization technique applied in data processing systems to improve query
performance by filtering data as early as possible in the query execution pipeline.”
13
Spark: Lazy Evaluation
Différence entre transformations et actions
Type Explication Exemples
Opérations enregistrées dans le plan d’exécution mais
Transformations filter(), map(), select(), groupBy()
non exécutées immédiatement
Déclenchent l'exécution réelle des transformations et
Actions show(), count(), collect(), write()
retournent un résultat
Impact des transformations narrow vs wide
L’évaluation paresseuse impacte aussi le type de transformation :
Narrow transformations / Transformations étroites (ex. : filter(), map())
Peuvent être exécutées en chaîne (pipelining) en mémoire sans écriture disque.
Wide transformations / Transformations larges (ex. : groupBy(), join())
Nécessitent un shuffle (réorganisation des données sur plusieurs machines), ce qui génère une écriture disque et
prend plus de temps.
14
Spark: Lazy Evaluation
Exemple / Etapes détaillées d’exécution:
1. Construction du plan logique
Dès que vous écrivez ce code, Spark ne charge pas immédiatement les données et n'applique pas les transformations.
Il enregistre simplement la suite d'opérations demandées sous forme d'un plan logique (DAG - Directed Acyclic Graph).
Le plan logique contient :
o Lire le fichier CSV ([Link]("[Link]"))
o Appliquer le filtre (filter(df["age"] > 30))
o Sélectionner uniquement les colonnes name et age (select("name", "age"))
À ce stade, Spark n’exécute encore rien.
15
Spark: Lazy Evaluation
2. Optimisation du plan logique
Avant l’exécution, Spark optimise le plan d’exécution en appliquant plusieurs stratégies d'optimisation via Catalyst Optimizer :
o Fusion des opérations pour éviter plusieurs parcours inutiles des données.
o Predicate Pushdown : S’il est possible d’exécuter le filtre directement au niveau de la source des données (ex. base de
données ou fichier Parquet), Spark l'applique avant la lecture complète.
o Pruning des colonnes (Column Pruning) : Au lieu de charger toutes les colonnes du CSV, Spark sait qu’on a besoin
uniquement de name et age, donc il ne chargera que celles-ci.
Après ces optimisations, le plan physique devient quelque chose comme :
1. Lire uniquement les colonnes name et age (au lieu de tout le fichier).
2. Appliquer le filtre age > 30 directement lors de la lecture.
3. Retourner le résultat dans la console.
3. Exécution du plan optimisé
Lorsque [Link]() est appelé, Spark exécute toutes les transformations en une seule passe optimisée :
Il lit uniquement les colonnes name et age du fichier CSV (grâce au Column Pruning).
Il applique directement le filtre age > 30 pendant la lecture des données (Predicate Pushdown si applicable).
Il affiche le résultat à l’écran, sans stocker inutilement les données intermédiaires.
16
Spark: Structured API
Les Structured APIs en Apache Spark font référence aux interfaces de haut
niveau permettant de manipuler les données sous forme de structures tabulaires.
Ces interfaces incluent DataFrames, Datasets et SQL, et elles s'appuient sur un
modèle de programmation déclaratif optimisé par Catalyst, le moteur de
planification de requêtes de Spark.
L’architecture globale de Spark repose sur plusieurs couches où les Structured
APIs jouent un rôle clé :
Interface utilisateur (User Interface Layer)
Les utilisateurs interagissent avec Spark via les Structured APIs
(DataFrames, Datasets, Spark SQL).
Optimisation et Planification (Catalyst Optimizer & Tungsten Engine)
Spark optimise les requêtes en générant un plan d’exécution efficace
basé sur Catalyst et Tungsten.
Exécution distribuée (Spark Execution Engine)
Les tâches sont converties en RDDs et exécutées sur un cluster en
utilisant le moteur d’exécution distribué de Spark.
Stockage et Sources de données
Spark Structured APIs prennent en charge plusieurs sources de
données : Parquet, JSON, JDBC, HDFS, et bien d’autres.
17
Spark: Dataframes vs Datasets
DataFrames
o Collection de données organisée en colonnes, similaire à une table relationnelle.
o Basé sur des transformations déclaratives optimisées par Catalyst.
o Disponible en Scala, Java, Python et R.
o Fortement typé en Scala/Java, mais dynamique en Python/R.
o Optimisé pour la performance grâce à Tungsten.
Datasets
o Version typée des DataFrames, disponible uniquement en Scala et Java.
o Offre un typage statique, permettant de détecter les erreurs à la compilation.
o Permet une manipulation orientée objet et une sérialisation optimisée via encoders.
o Moins flexible que les DataFrames, mais plus robuste pour le contrôle du typage.
Schéma:
o Chaque colonne a un nom et un type de données défini, et l’ensemble des colonnes forme un schéma.
18
Spark: Les types
19
Spark: L’exécution des Structured API
1. Définition de la Requête (Plan Logique Initial)
L’utilisateur écrit une requête sous forme de DataFrame, Dataset ou SQL.
Cette requête est représentée sous la forme d’un Plan Logique Non Optimisé qui décrit les transformations demandées.
2. Analyse et Optimisation (Catalyst Optimizer)
Analyse syntaxique et sémantique : Spark vérifie la validité de la requête et génère un Plan Logique Analysé.
Optimisation logique : Catalyst applique plusieurs optimisations comme :
o Prédicat pushdown (déplacement des filtres pour minimiser les données lues).
o Projection pruning (lecture uniquement des colonnes nécessaires).
o Réarrangement des jointures pour réduire la latence.
Plan Logique Optimisé : Après ces transformations, Spark dispose d’un plan efficace.
20
Spark: L’exécution des Structured API
3. Génération du Plan Physique
Catalyst traduit le Plan Logique Optimisé en un Plan Physique, qui détermine la stratégie d'exécution réelle.
Sélection du meilleur plan basé sur le coût (ex. broadcast join si une table est suffisamment petite).
4. Génération de Code et Exécution (Tungsten Engine)
Génération de bytecode : Spark compile des parties du code en instructions bas-niveau pour optimiser l'exécution.
Optimisation de la gestion mémoire : Utilisation de structures binaires compactes pour limiter la surcharge du Garbage
Collector.
Exécution parallèle sur le cluster :
o Le plan est découpé en tâches distribuées (RDDs).
o Chaque tâche est exécutée sur un Worker Node dans le cluster Spark.
5. Retour des Résultats
Une action comme .show() ou .collect() déclenche réellement l'exécution.
Les résultats sont agrégés et retournés à l'utilisateur.
21
Spark: L’exécution des Structured API
22
Spark: Catalyst Optimizer & Tungsten Engine
Le Catalyst Optimizer est le moteur d'optimisation de requêtes d'Apache Spark utilisé pour améliorer les performances des
Structured APIs (DataFrames, Datasets et Spark SQL). Il applique une série de transformations et d'optimisations pour générer
un plan d'exécution optimisé avant l'exécution du code. Catalyst repose sur des techniques comme :
o Optimisation logique : Réécriture des requêtes pour réduire la complexité (ex. suppression des colonnes inutiles,
réarrangement des filtres).
o Optimisation physique : Choix du meilleur plan d'exécution (ex. tri fusionné, jointures diffusées).
o Génération de code : Conversion en bytecode Java pour une exécution plus efficace.
Le Tungsten Engine est un ensemble d'optimisations au niveau bas du moteur Spark, conçu pour maximiser l'efficacité de
l'utilisation du CPU et de la mémoire. Il repose sur plusieurs techniques avancées :
o Gestion optimisée de la mémoire : Évite les objets Java inutiles et utilise des structures binaires compactes pour réduire la
surcharge du Garbage Collector.
o Traitement vectorisé : Exécute des opérations en blocs pour exploiter l’architecture moderne des processeurs (SIMD).
o Génération de bytecode : Compile certaines parties du code en instructions bas-niveau pour améliorer la rapidité
d'exécution.
23
Structured API: opérations basiques
Les opérations se divisent en plusieurs catégories : transformations, actions, et opérations SQL.
1. Transformations:
a) Sélection de colonnes (select, drop)
b) Filtrage des données (filter, where)
c) Tri des données (orderBy, sort)
d) Suppression des doublons (distinct, dropDuplicates)
24
Structured API: opérations basiques
e) Transformation de colonnes (withColumn, withColumnRenamed)
e) Agrégation des données (groupBy, agg)
f) Jointures (join)
25
Structured API: opérations basiques
2. Actions:
a) Affichage et récupération des données
b) Comptage et statistiques
c) Écriture des données
26
Structured API: opérations basiques
3. Opérations SQL:
a) Création d’une vue temporaire
b) Requêtes SQL directes
27