Scala
Scala
2
INITIATION À SPARK AVEC JAVA 8 ET SCALA
A propos
Apache Spark se présente comme la nouvelle génération de moteur de calcul distribué qui remplace
progressivement Hadoop/MapReduce.
L'objet de ce Hands-on Labs est de vous familiariser par la pratique au traitement massif et distribué dans
le domaine du data crunching et du machine learning. A l'issue de cette session, vous serez familiers
avec :
Les Resilient Data Sets (RDD) qui désignent l’abstraction essentielle pour la manipulation distribuée
des données.
Les animateurs
Apache Spark Certified Developer, Hayssam Saleh is Senior Architect with a focus on fault tolerant
distributed systems and Web applications. He is currently the technical lead at EBIZNEXT where he
supervises Scala / NoSQL projects since early 2012. He holds a PhD in distributed computing from
Université Pierre et Marie Curie (Paris VI).
Blog: http://blog.ebiznext.com
Introduction 3
INITIATION À SPARK AVEC JAVA 8 ET SCALA
Blog: http://ogirardot.wordpress.com
Introduction 4
INITIATION À SPARK AVEC JAVA 8 ET SCALA
Pré-requis
Industrialisation en Java
mkdir hands-on-spark-java/
cd hands-on-spark-java/
Dans le pom.xml :
<groupId>org.devoxx.spark.lab</groupId>
<artifactId>devoxx2015</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>Apache Spark temp - Release Candidate repo</id>
<url>https://repository.apache.org/content/repositories/orgapachespark-1080/</
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.1</version>
<!--<scope>provided</scope>--><!-- cette partie là a été omise dans notre projet pour pou
</dependency>
</dependencies>
<build>
<plugins>
<!-- we want JDK 1.8 source and binary compatiblility -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Industrialisation en Scala
mkdir hands-on-spark-scala/
cd hands-on-spark-scala/
name := "Devoxx2015"
version := "1.0"
scalaVersion := "2.11.2"
crossScalaVersions := Seq(scalaVersion.value)
Seq(Revolver.settings: _*)
test in assembly := {}
Pour utiliser le plugin SBT-Assembly qui permet de générer via la commande sbt assembly un jar
livrable pour Spark ne contenant ni Spark, ni Scala, mais toutes les autres dépendances nécessaires au
projet.
Pour pouvoir soumettre le projet à un cluster en local, il vous faut télécharger ou récupérer par clé USB le
binaire pré-compilé de Spark.
Si vous ne l'avez pas récupérer par une clé autour de vous, vous pouvez télécharger Spark 1.3.0 en
cliquant ici !
Pour installer à proprement dit Spark, il vous suffit de dézipper le fichier télécharger.
Historique
Pour comprendre Spark, il faut comprendre son contexte historique, Spark emerge près de 8 ans après
les début du Map Reduce chez Google.
Concepts
RDD
L'abstraction de base de Spark est le RDD pour Resilient Distributed Dataset, c'est une structure de
donnée immutable qui représente un Graph Acyclique Direct des différentes opérations à appliquer aux
données chargées par Spark.
Un calcul distribué avec Spark commence toujours par un chargement de données via un Base RDD.
Plusieurs méthodes de chargements existent, mais pour résumé, tout ce qui peut être chargé par Hadoop
peut être chargé par Spark, on s'appuie pour se faire sur un SparkContext ici nommé sc :
En premier lieu, on peut charger dans Spark des données déjà chargées dans une JVM via
sc.paralellize(...) , sinon on utilisera
sc.textFile("hdfs://...") pour charger des données fichiers depuis HDFS ou le système de
fichier local
sc.hadoopFile(...) pour charger des données Hadoop
sc.sequenceFile(...) pour charger des SequenceFile Hadoop
Transformations et Actions
2 concepts de base s'appuient et s'appliquent sur le RDD,
les Transformations
les Actions
Les Transformations sont des actions lazy ou à évaluation paresseuse, elles ne vont lancer aucun calcul
sur un Cluster.
De plus les RDD étant immutables, une transformations appliquée à un RDD ne va pas le modifier mais
plutôt en créer un nouveau enrichit de nouvelles informations correspondant à cette
transformation.
Une fois que vous avez définit toutes les transformations que vous voulez appliquer à votre donnée il
suffira d'appliquer une Action pour lancer le calcul sur votre Cluster ou CPU locaux (selon le
SparkContext utilisé c.f. ci-après)
Le RDD ne correspond en fait qu'à une sorte de plan d'exécution contenant toutes les informations de
quelles opérations vont s'appliquer sur quelle bout ou partition de données.
Spark Context
Le SparkContext est la couche d'abstraction qui permet à Spark de savoir où il va s'exécuter.
Un SparkContext standard sans paramètres correspond à l'exécution en local sur 1 CPU du code Spark
qui va l'utiliser.
val sc = SparkContext()
// on peut ensuite l'utiliser par exemple pour charger des fichiers :
En java :
La création
Les transformations
L'action
Le RDD peut subir des transformations successives au travers de fonctions similaires à celles présentes
dans les collections "classiques. on peut citer par exemple :
map : renvoie un nouveau RDD avec application d'une fonction de transformation sur chacun des
objets du RDD initial
filter : renvoie un nouveau RDD qui contiendra un sous ensemble des données contenues dans
le RDD initial.
Les opérations de création et de transformation de RDDs ne déclenchent aucun traitement sur les
noeuds du cluster. Seul le driver est sollicité. Le driver va construire un graphe acycilique dirigé des
opérations qui seront être exécutées sur le cluster au moment de l'application d'une action.
Une seule action peut être appliquée. Elle consiste à exécuter une opération sur tous les noeuds du
cluster et à renvoyer le résultat au driver. L'action peut ne produire aucun résultat, l'action foreach par
exemple,ou produire un résultat qui soit un objet ou une collection. On peut citer en exemple :
reduce qui renvoie un objet unique (Equivalent à Java reduce et Scala reduce)
take qui renvoie les n premiers éléments d'un RDD (Equivalent à Java take et Scala take)
Un RDD peut être créé à partir d'une collection ou d'une source de données.
val jdbcInputRDD = new JdbcRDD(sc, connect, "select age, name from Account offset ? limit ?"
numPartitions = 10, mapRow = readRecord)
textFile et wholeTextFile permettent de créer un RDD à partir d'un fichier unique ou d'un répertoire.
Dans ce deuxième cas, tous les fichiers présents dans le répertoire seront chargés.
textFile va créer un RDD dont chaque élément correspond à une ligne du fichier
wholeTextFile va créer un RDD dont chaque élément contiendra le nom du fichier (la clef: premier
élément du tuple) et le contenu du fichier en entier (la valeur : deuxième élément du tuple). cette
méthode est adaptée lorsqu'une ligne du fichier ne correspond pas à un enregistrement, cela
concerne notamment les fichiers JSON et XML par exemple.
Nous pouvons trier les utilisateurs par ordre croissant en utilisant la méthode `òrderBy``
Une fois l'action exécutée, l'ensemble des données résultant de cette action est rapatriée sur le driver.
L'action est celle qui déclenche l'exécution sur les noeuds du cluster de la création, de la transformation,
de l'action et du rapatriement des données issues des calculs sur le driver.
L'exemple ci-dessus illustre bien que l'action conditionne le début et la fin de l'exécution du traitement sur
les noeuds du cluster.
Un opération sur un RDD est une action lorsqu'elle ne renvoie pas de RDD mais qu'elle renvoie un objet
ou une collection Scala standard.
Certaines actions peuvent ne rien renvoyer du tout, comme cela est le cas pour l'action foreach .
object Workshop1 {
println( s"""
count=$count
min=$min
mean=$mean
max=$max
""")
}
}
Le cache
Dans l'exemple ci-dessus le fichier sera lu deux fois, une première fois à l'exécution de l'action count et
une seconde fois à l'action first qui renvoie le premier élément du RDD (la ligne la plus longue).
// On rajoute l'appel à persist qui indique à Spark que le RDD ne doit pas être ``déchargé`` suite à
val lines = sc.textFile("...").persist(StorageLevel.MEMORY_AND_DISK)
object Workshop2 {
println( s"""
count=$count
min=$min
mean=$mean
max=$max
""")
}
}
Spark offre des fonctionnalités spécifiques aux RDD clef-valeur - RDD[(K,V)] . Il s'agit notamment des
fonctions groupByKey, reduceByKey, mapValues, countByKey, cogroup ,
https://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
// ...
// Indice : Modifier les transformations précédentes pour avoir un tuple avec une seule valeur si
// ...
// Réaliser une variante en utilisant cogroup
// ...
Les accumulateurs
Le broadcast
Le partitionnement
La sérialisation
L'option MEMORY_AND_DISK indique que Spark peut persister les données sur le disque si la mémoire est
insuffisante. D'autres options sont disponibles et permettent par exemple de réduire l'empreinte mémoire
des objets (au détriment de la CPU toutefois).
Si les opérations requièrent un nombre important d'échange entre les noeuds du cluster, un algorithme de
sérialisation performant en temps d'exécution et capable de générer une représentation plus compacte
peut avoir un impact positif sur la performance globale.
Spark permet de personnaliser le librairie de sérialisation. L'exemple ci-dessus illustre la mise en oeuvre
de la dé/sérialisation avec Kryo.
Les accumulateurs
Consiste à définir une variable qui sera mise à jour par l'ensemble des noeuds du cluster lors de
l'exécution de l'action. Cette variable pourra ensuite être lue sur le driver.
Un accumulateur ne doit jamais être mis à jour dans le cadre d'une transformation. Spark étant
susceptible de rexécuter les transformations en cas de panne, cela résulterait à une double mise à jour
de la valeur de l'accumulateur.
Un accumulateur est mis à jour par les noeuds du cluster et consulté au niveau du driver.
Le broadcast
Le broadcast permet diffuser sur tous les neouds du cluster, des données de référence qui pourront être
accédées par les transformations et l'action lors de l'exécution.
val labels: Broadcast[Map[Int, String]] = sc.broadcast(Map(1 -> "Un", 2->"2", 3 -> "3"))
val rdd = sc.parallelize(1 to 3)
rdd.foreach { x =>
// Affiche les bales correspondant aux identifiants numériques
// en s'appuyant sur les labels diffusés sur chacun des noeuds
println(labels.value(x))
}
Le partitionnement
Lorsqu'un RDD est utilisé plusieurs fois de suite, dans des actions différentes et donc dans des cycles
distincts d'exécution du RDD, Spark est amené à recalculer le partitionnement des données sur le cluster.
Cela peut avoir un effet négatif sur les performances.
Dans un système distribué les communications sont coûteuses, il est donc impératif de les limiter en
renforçant la colocalisation des données. Spark permet d'indiquer la règle de répartition des données sur
les partitions, par contre cela a un intérêt uniquement dans le cas des opérations sur les clefs dans le
cadre des RDDs clefs/valeurs. Cela concerne notamment les opérations de type join , cogroup ,
reduceByKey
Le résultat du partitionnement doit systématiquement être mis en cache si l'on ne veut pas que les appels
subséquents conduisent à un nouveau partitionnement du RDD.
Les transformations susceptibles de modifier la clef (tel que map ) conduisent Spark à reconstruire le
partitionnement du RDD parent.
Un fichier Hadoop est chargé dans un RDD en utilisant un partitionnement que l'on a pris le soin de
mettre en cache.
Un nouvel RDD est créé mais en maintenant les clefs colocalisés sur les mêmes noeuds que le RDD
parent.
Repartitionnement
Suite à des opérations de transformation réduisant de manière significative la taille du RDD, il peut être
utile de réduire le nombre de paritions. C'est ce que permet la transformation coalesce
Exercice 4 : La colocalité
object Workshop4 {
val cachedRDD: RDD[(Long, List[Int])] = ... // key = userid, value = list of rating values
val count = cachedRDD.count()
println(s"usercount=$count")
}
}
}
object Workshop5 {
Exercice 6 : Le broadcast
object Workshop6 {
Spark Streaming offre la reprise sur erreur vie la méthode checkpoint . En cas d'erreur, Spark
Streaming repartira du dernier checkpoint qui devra avoir été fait sur un filesystem fiable (tel que HDFS).
Le checkpoint doit être fait périodiquement. La fréquence de sauvegarde a un impact direct sur la
performance. En moyenne on sauvegardera tous les 10 microbatchs.
Les opérations sur les DStreams sont stateless, d'un batch à l'autre, le contexte est perdu. Spark
Streaming offre deux méthodes qui permettent un traitement stateful : reduceByWindow et
reduceByKeyAndWindow qui vont conserver les valeurs antérieures et permettent de travailler sur une
fenêtre de temps multiples de la fréquence de batch paramétrée.
Dans l'exemple ci-dessous, on affiche toutes les 20s le nombre de mots enovyés dans la minute.
object Workshop {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Workshop").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("/tmp/spark")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 7777)
val res: DStream[Int] = lines.flatMap(x => x.split(" ")).map(x => 1).reduceByWindow(_ + _, _ - _,
res.checkpoint(Seconds(100))
res.foreachRDD(rdd => rdd.foreach(println))
res.print()
// lines.print()
ssc.start()
ssc.awaitTermination()
}
}
Pour tester le streaming, il nous faut un serveur de test. Sur macos, il suffit de lancer ncat avec la
commande nc et taper des caractères en ligne de commande qui seront alors retransmis sur le port
d'écoute ncat :
$ nc -l localhost 7777
Hello World ++
Hello World ++
Hello World ++
Hello World ++
^C
res.checkpoint(Seconds(100))
res.foreachRDD(rdd => rdd.foreach(println))
res.print()
Partie 5 : SparkSQL
Cette partie a pour but de vous faire prendre en main les APIs Spark SQL qui, à la différence de Spark
Core, permet de gérer des flux de données structurées (avec un Schéma explicite ou définissable)
Théorie
L'abstraction principale de Spark, les RDDs, n'ont aucune notion de schéma, on peut gérer des données
sans aucune information de typage ou de noms de champs.
Pour permettre de gérer une notion de schéma (une liste de noms de champs avec leur type associé)
Spark SQL a introduit dans les versions < 1.3, la notion de SchemaRDD qui est un RDD standard mais
avec cette notion de schéma et donc des méthodes additionnelles.
Avec la nouvelle release de Spark 1.3.0, ce concept a été généralisé et renommé sous le nom de
DataFrame .
Quand vous avez déjà un RDD avec des données structurées - une classe dédié comme ci-après la
classe Movie , vous pouvez appliquer un schéma et transformer le flux en DataFrame
Dans cet exercice nous allons parser un flux non-structuré via un RDD standard, le transformer en un
RDD<Movie> pour enfin lui appliquer un schéma par reflection !
package org.devoxx.spark.lab.devoxx2015.model;
Partie 5 : SparkSQL 27
INITIATION À SPARK AVEC JAVA 8 ET SCALA
// reflection FTW !
DataFrame df = // TODO utiliser Movie.class pour appliquer un schema
df.printSchema();
df.show(); // show statistics on the DataFrame
}
}
object Workshop9 {
Partie 5 : SparkSQL 28
INITIATION À SPARK AVEC JAVA 8 ET SCALA
val df = ...// Charger un fichier json avec l'API sqlContext.load(...) --> DataFrame
df.show() // show statistics on the DataFrame
}
La nouvelle API des DataFrame a été calquée sur l'API de Pandas en Python. Sauf que bien sûr a la
différence de Pandas, les DataFrame sont parallélisées.
// chargement :
sqlContext.load("people.parquet"); // parquet par default sauf si overrider par spark.sql.sources.def
sqlContext.load("people.json", "json");
Dans cet exercice le but va être d'utiliser notre schéma ainsi définit dans le contexte Spark SQL pour
executer via Spark des requêtes SQL
La première requête va être de : Trouver tout les films dont le nom contient Hook
hooks.show();
}
}
Partie 5 : SparkSQL 29
INITIATION À SPARK AVEC JAVA 8 ET SCALA
object Workshop10 {
hooks.show()
}
}
object Workshop11 {
Partie 5 : SparkSQL 30
INITIATION À SPARK AVEC JAVA 8 ET SCALA
Elasticsearch
ElasticSearch expose une interfe d'entrée sortie permettant de récupérer les données au format RDD et
de sauvegarder les RDDs directement en JSON.
Cassandra - Connecteur
Cassandra possède maintenant son connecteur spécifique et avec l'introduction des DataFrame et du
calcul de plan d'exécution qui vient avec a été codifié le predictive pushdown. TODO
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.2.0-rc3</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.2.0-rc3</version>
</dependency>
et en Scala
import com.datastax.spark.connector._
// ou encore :
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
Conclusion
Voilà !
Conclusion 33