Maitrisez Spark SQL pour
l’ingénierie des bases de
données
N Tweetez
s Partagez
A Enregistrer
D Print T WG
k Partagez 0
hatsApp
PARTAGES
De nombreux data scientists, data analysts, data engineers et
utilisateurs de Business Intelligence s’appuient sur des requêtes
SQL interactives pour explorer les données. Spark SQL est un
module d’Apache Spark pour le traitement de données structurées.
Spark SQL fournit une abstraction de programmation appelée
DataFrame et peut agir comme un moteur de requête SQL
distribué. Ce module permet également aux requêtes Hadoop Hive
non modifiées de s’exécuter 100 fois plus rapidement sur les
déploiements et les données existants. En plus, il offre une forte
intégration avec le reste de l’écosystème Spark (par exemple, en
intégrant le traitement des requêtes SQL à l’apprentissage
automatique).
Dans ce tutoriel exhaustif, nous allons vous apprendre tout ce que
vous devez savoir pour exploiter les données massives et les
bases de données à l’aide de Spark. A la fin de cette chronique,
vous saurez l’utiliser dans vos projets Big Data.
Qu’est-ce que Spark SQL et
quelles sont ses principales
caractéristiques ?
Spark SQL est l’un des modules de Spark pour le traitement des
données structurées. Contrairement à l’API de base spark RDD,
l’interface offerte par spark SQL fournit à spark plus d’informations
sur les structures de données et les calculs en cours, qu’il va
utiliser pour effectuer des optimisations supplémentaires. Il existe
de nombreuses façons d’utiliser spark SQL, notamment l’API SQL,
l’API dataframe et l’API dataset. Il convient de noter que le
moteur d’exécution est le même quelque soit la manière et le
langage utilisés. Grâce à cette unification, les développeurs
peuvent facilement passer d’une API à l’autre, ce qui rend le
traitement des données plus souple. Concrètement, Spark SQL
permettra aux développeurs de :
Importer des données relationnelles à partir de fichiers
Parquet et de tables Hive ;
Exécuter des requêtes SQL sur les données importées et
les RDDs existants ;
Écrire facilement des RDDs vers des tables Hive ou des
fichiers Parquet.
Spark SQL comprend également un optimiseur basé sur les coûts (
Cost-based Optimizer – appelé Catalyst), le stockage en colonnes
et la génération de code pour accélérer les requêtes. En même
temps, il peut évoluer vers des milliers de nœuds et des requêtes
de plusieurs heures en utilisant le moteur Spark, qui offre une
tolérance totale aux pannes en milieu de requête, sans avoir à se
soucier d’utiliser un moteur différent pour les données historiques.
Les principales caractéristiques qui font de Spark SQL la librairie de
traitement de données de Python la plus puissante sont les
suivantes :
Intégration avec Spark
Les requêtes Spark SQL sont intégrées aux programmes Spark.
Spark SQL nous permet d’interroger des données structurées à
l’intérieur des programmes Spark, en utilisant SQL ou une API
DataFrame. Elle peut être utilisée en Java, Scala, Python et R.
D’ailleurs nous possédons une mini formation gratuite pour
maitriser Spark avec Scala qui pourra sans doute vous aider.
Pour exécuter le calcul en continu, les développeurs écrivent
simplement un calcul par lot contre l’API DataFrame / Dataset, et
Spark incrémente automatiquement le calcul pour l’exécuter en
continu. Grâce à cette conception puissante, les développeurs
n’ont pas à gérer manuellement l’état, les défaillances ou la
synchronisation de l’application avec les travaux par lots. Au lieu de
cela, le travail en continu donne toujours la même réponse qu’un
travail par lots sur les mêmes données.
Accès uniforme aux données
DataFrames et SQL prennent en charge un moyen commun
d’accéder à diverses sources de données, comme Hive, Avro,
Parquet, ORC, JSON et JDBC. Cela permet de joindre les données
de ces sources. Cela est très utile pour accueillir tous les
utilisateurs existants dans Spark SQL.
Compatibilité avec Hive
Data Transition Numérique
Spark SQL exécute des requêtes Hive non modifiées sur les
données actuelles. Il réécrit le front-end Hive et le meta store, ce
qui permet une compatibilité totale avec les données, les requêtes
et les UDF Hive actuels.
Connectivité standard
La connexion se fait via JDBC ou ODBC. JDBC et ODBC sont les
normes industrielles de connectivité pour les outils de business
intelligence.
Performance et évolutivité
Spark SQL intègre un optimiseur basé sur les coûts, la génération
de code et le stockage en colonnes pour rendre les requêtes agiles
tout en calculant des milliers de nœuds à l’aide du moteur Spark,
qui offre une tolérance de panne complète à mi-requête. Les
interfaces fournies par Spark SQL donnent à Spark plus
d’informations sur la structure des données et du calcul effectué.
En interne, Spark SQL utilise ces informations supplémentaires
pour effectuer une optimisation supplémentaire. Spark SQL peut
lire directement à partir de plusieurs sources (fichiers, HDFS,
fichiers JSON/Parquet, RDDs existants, Hive, etc.) Il assure
l’exécution rapide des requêtes Hive existantes.
Pourquoi utilise-t-on Spark
SQL ?
Spark SQL a été conçu à l’origine comme Apache Hive pour
fonctionner au-dessus de Spark et est maintenant intégré à la pile
Spark. Apache Hive avait certaines limites.
Hive lance des travaux MapReduce en interne pour
exécuter les requêtes ad-hoc. MapReduce est moins
performant lorsqu’il s’agit d’analyser des ensembles de
données de taille moyenne ( par exemple 200 Go).
Hive n’a pas de capacité de reprise. Cela signifie que si le
traitement s’arrête au milieu d’un flux de travail, vous ne
pouvez pas reprendre là où il s’est arrêté.
Hive ne peut pas déposer des bases de données cryptées
en cascade lorsque la corbeille est activée, ce qui entraîne
une erreur d’exécution. Pour surmonter ce problème, les
utilisateurs doivent utiliser l’option Purge pour ignorer la
corbeille au lieu de déposer.
Ces inconvénients ont donné lieu à la naissance de Spark SQL qui
permet de surmonter ces inconvénients et remplacer Apache Hive.
Mais la question qui se pose encore dans la plupart de nos esprits
est la suivante : Spark SQL est-il une base de données ?
Spark SQL n’est pas une base de données mais un module utilisé
pour le traitement des données structurées. Il fonctionne
principalement sur les DataFrames qui sont l’abstraction de
programmation et agissent généralement comme un moteur de
requête SQL distribué.
Comment fonctionne Spark
SQL ?
Voyons ce que Spark SQL a à offrir. Spark SQL estompe la
frontière entre RDD et table relationnelle. Il offre une intégration
beaucoup plus étroite entre le traitement relationnel et procédural,
grâce à des API déclaratives DataFrame qui s’intègrent au code
Spark. Elle permet également une optimisation plus poussée. L’API
DataFrame et l’API Datasets sont les moyens d’interagir avec
Spark SQL.
Avec Spark SQL, Apache Spark est accessible à un plus grand
nombre d’utilisateurs et améliore l’optimisation pour les utilisateurs.
Spark SQL fournit des API DataFrame qui effectuent des
opérations relationnelles à la fois sur des sources de données
externes et sur les collections distribuées intégrées de Spark. Il
introduit un optimiseur extensible appelé Catalyst, qui permet de
prendre en charge un large éventail de sources de données et
d’algorithmes dans le domaine des Big-Data.
Spark fonctionne à la fois sur des systèmes Windows et des
systèmes de type UNIX (par exemple, Linux, Microsoft, Mac OS). Il
est facile de l’exécuter localement sur une machine – il suffit que
java soit installé dans le PATH de votre système, ou que la variable
d’environnement JAVA_HOME pointe vers une installation Java.
L’architecture de Spark SQL comprend les composants suivants :
1. Data Source API (Application
Programming Interface)
Il s’agit d’une API universelle pour le chargement et le stockage de
données structurées.
Elle a un support intégré pour Hive, Avro, JSON, JDBC,
Parquet, etc.
Prends en charge l’intégration de tiers par le biais de
paquets Spark
Prise en charge des sources intelligentes.
2. L’API DataFrame
Un Dataframe représente une collection de données distribuées
immuables. Son objectif principal est de permettre aux
développeurs de ne se préoccuper que de ce qu’ils doivent faire
lorsqu’ils sont confrontés au traitement des données, et non de la
manière de le faire, et de laisser une partie du travail d’optimisation
au framework spark lui-même. Le dataframe contient des
informations de schéma, c’est-à-dire qu’il peut être considéré
comme une donnée avec un nom et un type de champ, ce qui est
similaire à une table dans une base de données relationnelle, mais
la couche inférieure a fait beaucoup d’optimisation. Une fois le
dataframe créé, vous pouvez utiliser SQL pour le traitement des
données.
Les utilisateurs peuvent construire un dataframe à partir de
diverses sources de données, telles que des fichiers de données
structurés, des tables dans le répertoire de stockage, des bases de
données externes ou des RDD existants. L’API dataframe prend en
charge Scala, Java, Python et R.
3. L’API Dataset
Dataset est une nouvelle interface ajoutée dans spark 3.0 et une
extension de dataframe. Elle présente les avantages du RDD
(entrée de type forte, prise en charge de puissantes fonctions
lambda) et les avantages du moteur d’exécution optimisé de spark
SQL. Vous pouvez construire un dataset à partir d’un objet JVM,
puis utiliser une transformation de fonction (map),flatMap,filter).
Il convient de noter que l’API dataset est disponible en scala et en
Java, et que python ne prend pas en charge l’API dataset.
En outre, l’API dataset peut réduire l’utilisation de la mémoire.
Comme le framework spark connaît la structure de données du
dataset, il peut économiser beaucoup d’espace mémoire lors de la
persistance du dataset.
4. L’optimiseur Catalyst
L’optimiseur Catalyst utilise deux
types de plans :
Plan logique : définit le calcul sur le jeu de données, mais n’a pas
défini la manière d’exécuter le calcul. Chaque plan logique définit
une série de propriétés (champs de requête) et de contraintes
(conditions d’occurrence) requises par le code utilisateur, mais ne
définit pas la manière d’exécuter le calcul.
Plan physique : un plan physique est généré à partir d’un plan
logique, qui définit la manière d’effectuer les calculs et est
exécutable. Par exemple, une jointure dans un plan logique est
convertie en une jointure de fusion de tri dans un plan physique. Il
est à noter que Spark génère plusieurs plans physiques et
sélectionne ensuite le plan physique ayant le coût le plus bas.
Dans spark SQL, toutes les opérations des opérateurs sont
converties en ast (abstract syntax tree) et transmises à l’optimiseur
de catalyseur. L’optimiseur est construit sur la base de la
programmation fonctionnelle de Scala. Catalyst supporte les
stratégies d’optimisation basées sur les règles et sur les coûts.
Le plan de requête de spark SQL comprend quatre étapes (voir la
figure ci-dessous) :
1. Analyse
2. Optimisation logique
3. Plan physique
4. Génération du code et compilation de la partie requête en
bytecode Java.
Attention : Dans la phase de planification physique, Catalyst
génère plusieurs plans, calcule le coût de chaque plan, puis
compare le coût de ces plans, c’est la stratégie basée sur le coût.
Dans les autres phases, il s’agit d’une stratégie d’optimisation
basée sur les règles.
Analyse
Unresolved Logical Plan -> Logical Plan. Le plan de requête de
spark SQL commence par l’ast (abstract syntax tree) retourné par
le parseur SQL ou l’objet dataframe construit par l’API. Dans les
deux cas, il y aura des références d’attributs non traitées (un
champ de requête peut ne pas exister, ou le type de données est
erroné). Lorsque le type d’un champ de propriété ne peut pas être
déterminé ou ne peut pas être mis en correspondance avec la table
d’entrée, il est appelé Untreated. Spark SQL utilise les règles du
catalyst et l’objet catalogue (qui peut accéder aux informations de
la table de la source de données) pour traiter ces propriétés. La
première étape consiste à construire un Unresolved Logical
PlanTree, puis à appliquer une série de règles, et enfin à générer
un Logical Plan.
Optimisation logique
Logical Plan ->Optimized Logical Plan. Dans l’étape
d’optimisation logique, des stratégies d’optimisation basées sur des
règles sont utilisées, telles que la poussée des prédicats, l’écrêtage
des projections, etc. Après une certaine optimisation des colonnes,
le plan logique optimisé est généré.
Plan physique
Optimized Logical Plan ->Physical Plan. Dans la phase de plan
physique, spark SQL va générer plusieurs plans d’exécution
physiques à partir du plan logique optimisé, puis utiliser le modèle
de coût pour calculer le coût de chaque plan physique, et enfin
sélectionner un plan physique. A ce stade, spark SQL utilise la
jointure de diffusion s’il est déterminé qu’une table est très petite
(qui peut être persistée en mémoire).
Il convient de noter que le planificateur physique utilise également
des stratégies d’optimisation basées sur des règles, telles que le
pipelining des opérations de projection et de filtrage dans un
opérateur spark map. En outre, les opérations de la phase de
planification logique sont poussées du côté de la source de
données (le prédicat push-down et la projection push-down sont
pris en charge).
Génération de code
L’étape finale de l’optimisation des requêtes consiste à générer le
bytecode Java et à utiliser les guillemets Quasi pour terminer le
travail.
Après l’analyse ci-dessus, nous avons une compréhension
préliminaire de l’optimiseur Catalyst. Comment les autres
composants de spark interagissent-ils avec l’optimiseur de
catalyseur ? Les détails sont présentés dans la figure suivante :
Comme le montre la figure ci-dessus : Les pipelines ML, le
streaming structuré et les graphframes utilisent tous des API de
type dataframe / dataset. Et ils bénéficient tous de l’optimiseur de
catalyseur.
Démarrer avec Spark SQL
Pour une meilleure maîtrise de cette librairie vous devez avoir une
connaissance de base sur le langage de programmation Python.
Notre article sur la programmation Python pour la data vous
donnera toutes les bases nécessaires pour mieux appréhender cet
article.
Création d’une session Spark
Sparksession est l’entrée de programmation de l’API de dataset et
de dataframe. Elle est supportée à partir de spark 2.0. Elle est
utilisée pour unifier le hivecontext et le sqlcontext. Grâce à une
entrée de session spark, l’utilisabilité de spark est améliorée. Le
code suivant montre comment créer une sparksession :
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic e
xample") \
.config("spark.some.config.optio
n", "some-value") \
.getOrCreate()
Création d’un dataframe
Après avoir créé une session Spark, vous pouvez l’utiliser pour
créer un dataframe à partir d’un RDD existant, d’une table hive ou
d’une autre source de données. L’exemple suivant permet de créer
un dataframe à partir d’une source de données de type fichier
JSON :
df = spark.read.option("multiline","tr
ue").json("student.json")
# Afiifchage du contenu du dataframe
df.show()
Cet exemple affichera donc :
Opérations de base sur les
dataframes
Après avoir créé un dataframe, vous pouvez effectuer quelques
opérations sur les colonnes, comme le montre le code suivant :
# Selection de la colonne "firstname"
df.select("firstname").show()
# Sélection de tout le dataframe, mais
en incrémentant tout les ages de 1
df.select(df['firstname'], df['age'] +
1).show()
# Sélection des étudiants de plus de 2
9 ans
df.filter(df['age'] > 29).show()
# Comptage des étudiants par groupe d'
age
df.groupBy("age").count().show()
Utilisation d’une requête SQL
dans un programme
L’opération ci-dessus useDSL (langage spécifique au domaine).
Vous pouvez également utiliser SQL pour exploiter directement le
cadre de données, comme indiqué ci-dessous :
# Enregistrement du DataFrame comme u
ne vue temporaire
df.createOrReplaceTempView("student")
sqlDF = spark.sql("SELECT * FROM stude
nt WHERE firstname = 'Sylvie'")
sqlDF.show()
Interopérer avec les RDD
Spark SQL prend en charge deux méthodes différentes pour
convertir des RDD existants en dataframe. La première méthode
utilise la réflexion pour déduire le schéma d’un RDD qui contient
des types d’objets spécifiques. Cette approche basée sur la
réflexion conduit à un code plus concis. Elle fonctionne bien
lorsque vous connaissez déjà le schéma lors de l’écriture de votre
application Spark.
La deuxième méthode pour créer un dataframe est une interface
programmatique. Elle vous permet de construire un schéma et de
l’appliquer à un RDD existant. Bien que cette méthode soit plus
verbeuse, elle vous permet de construire des ensembles de
données lorsque les colonnes et leurs types ne sont pas connus
avant l’exécution.
Voyons le fichier people.txt suivant :
Tom, 29
Bob, 30
Sylvie, 19
Déduction du schéma à l’aide de la
réflexion
Spark SQL peut convertir un RDD d’objets Row en un DataFrame,
en déduisant les types de données. Les rangs sont construits en
passant une liste de paires clé/valeur comme kwargs à la classe
Row. Les clés de cette liste définissent les noms des colonnes de
la table, et les types sont déduits par échantillonnage de l’ensemble
des données, de manière similaire à l’inférence effectuée sur les
fichiers JSON.
from pyspark.sql import Row
sc = spark.sparkContext
# Chargement du fichier texte et conve
rsion de chaque ligne en Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.spli
t(","))
people = parts.map(lambda p: Row(name=
p[0], age=int(p[1])))
# Déduire le schéma et enregistrer le
DataFrame comme une table.
schemaPeople = spark.createDataFrame(p
eople)
schemaPeople.createOrReplaceTempView("
people")
# Exécution d'une requête
teenagers = spark.sql("SELECT name FRO
M people WHERE age >= 13 AND age <= 1
9")
# Les résultats des requêtes SQL sont
des objets Dataframe.
# rdd retourne le contenu sous la form
e d'un :class:`pyspark.RDD` de :class:
`Row`.teenNames = teenagers.rdd.map(la
mbda p: "Name: " + p.name).collect()
for name in teenNames:
print(name)
# Name: Sylvie
Spécification programmatique du
schéma
Lorsqu’un dictionnaire de kwargs ne peut pas être défini à l’avance
(par exemple, la structure des enregistrements est codée dans une
chaîne de caractères, ou un ensemble de données textuelles sera
analysé et les champs seront projetés différemment pour différents
utilisateurs), un DataFrame peut être créé par programme en trois
étapes.
Créer un RDD de tuples ou de listes à partir du RDD
original ;
Créer le schéma représenté par un StructType
correspondant à la structure des tuples ou des listes dans le
RDD créé à l’étape 1.
Appliquer le schéma au RDD via la méthode
createDataFrame fournie par SparkSession.
Prenons cet exemple :
# Import data types
from pyspark.sql.types import StringTy
pe, StructType, StructField
sc = spark.sparkContext
# Chargement du fichier texte et conve
rsion de chaque ligne en Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.spli
t(","))
# Chaque est convertit en tuple
people = parts.map(lambda p: (p[0], p[
1].strip()))
# schema is encodé en string.
schemaString = "name age"
fields = [StructField(field_name, Stri
ngType(), True) for field_name in sche
maString.split()]
schema = StructType(fields)
# Appllication du schema au RDD.
schemaPeople = spark.createDataFrame(p
eople, schema)
# Création d'une vue temporaire
schemaPeople.createOrReplaceTempView("
people")
# SQL peut être exécuté sur des DataFr
ames qui ont été enregistrés comme une
table.
results = spark.sql("SELECT name FROM
people")
results.show()
Les Source de données de spark
SQL
Spark SQL prend en charge l’exploitation de diverses sources de
données par le biais de l’interface dataframe. Il peut utiliser la
transformation relationnelle et la vue temporaire pour opérer sur le
dataframe. Les sources de données courantes sont les suivantes :
Source de données fichier
Les sources de données fichier peuvent être des fichiers :
Parquet ;
JSON ;
CSV ;
Orc ;
#Code pour lire un fichier parquet
parquetFile = spark.read.parquet("stud
ent.parquet")
#Code pour lire un fichier JSON
peopleDF = spark.read.json("student.pa
rquet")
#Code pour lire un fichier CSV
df = spark.read.format("csv")
.load("student.csv")
Source de données hive
Spark SQL prend également en charge la lecture et l’écriture de
données stockées dans Apache Hive. Cependant, comme Hive a
un grand nombre de dépendances, ces dépendances ne sont pas
incluses dans la distribution Spark par défaut. Si les dépendances
Hive peuvent être trouvées sur le classpath, Spark les chargera
automatiquement. Notez que ces dépendances Hive doivent
également être présentes sur tous les nœuds de travail. Ils devront
avoir accès aux bibliothèques de sérialisation et de désérialisation
Hive (SerDes) afin d’accéder aux données stockées dans Hive.
La configuration de Hive se fait en plaçant votre fichier hive-
site.xml, core-site.xml (pour la configuration de la sécurité) et hdfs-
site.xml (pour la configuration HDFS) dans conf/.
Lorsque l’on travaille avec Hive, il faut instancier SparkSession
avec le support Hive. Cela comprend aussi la connectivité à un
métastore Hive persistant, le support des serdes Hive et les
fonctions Hive définies par l’utilisateur. Les utilisateurs qui n’ont pas
de déploiement Hive existant peuvent toujours activer le support
Hive. Lorsqu’il n’est pas configuré par le hive-site.xml, le contexte
crée automatiquement metastore_db dans le répertoire courant et
crée un répertoire configuré par spark.sql.warehouse.dir, qui est
par défaut le répertoire spark-warehouse dans le répertoire
courant où l’application Spark est démarrée. Notez que la propriété
hive.metastore.warehouse.dir dans hive-site.xml est dépréciée
depuis Spark 2.0.0. Utilisez plutôt spark.sql.warehouse.dir pour
spécifier l’emplacement par défaut de la base de données dans
l’entrepôt. Vous devrez peut-être accorder le privilège d’écriture à
l’utilisateur qui démarre l’application Spark.
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location indique l'emplace
ment par défaut des bases de données e
t des tables gérées.
warehouse_location = abspath('spark-wa
rehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive in
tegration example") \
: