Big Data Analytics
Introduction au big data
Hadoop (HDFS, MapReduce, Hive)
SGBD Not Only SQL (Hbase, MongoDB, ElasticSearch et Kibana)
Spark
Introduction au big data
• La science des données/Data science : extraction intelligentes et efficace des connaissances à partir du BigData : 2 types :
• Data architects : définir la plateforme technique et les solutions logicielles adaptées
• Data analysts : prendront la suite en appliquant des algorithmes prédictifs
• Modèles de données : 3
• Structurées : Schéma prédéfini -> SGBDR
(relationnel)
• Semi-Structurées : Schéma défini
ultérieurement -> XML, fichier log, PDF
• Non-Structurées : sans schéma -> text,
image, vidéo, article, blog, son
• Caractéristiques du BigData (4V):
• Volume : GB -> TB -> PB
• Veriety/variété : bdd , photo, audio, vidéo,…
• Velocity/Vitesse : temps réel, near real time,
periodic, batch
• Veracity/fiabilité : l’exactitude des données,
minimiser le biais
• Processus BigData : -> schéma
L’écosystème Hadoop
• Hadoop : est un écosystème open source permet le stockage et le traitement des données volumineuses ->
diviser les données en plusieurs machines ->
(+) Cout réduit(OS) (-) Temps réel
• Principes de Hadoop :
• Diviser les données + les sauvegarder dans une collection de données/Cluster
• Traitement des données directement stockées plutôt que les télécharger d’un serveur
• On peut ajouter des machines au cluster au fur et à mesure
• Dristributions Hadoop : Vendeur de distribution :
• Cloudera Distribution for Hadoop : CDH
• MapR distribution
• Hortonworks data platform : HDP
• IBM Infosphere Big Insights : IBP
L’écosystème Hadoop
• Oozie : planificateur et manager du workflow Hadoop
• Hive : DataWareHouse avec interface SQL (select, sum,…)
• Pig : langage de traitement des données
• Hue : frontend graphique pour le cluster, permet de fournir :
• Un navigateur pour HDFS et Hbase
• Des éditeurs pour Hive, Pig, Impala et Sqoop
• Mahout : bibliothèque d’apprentissage automatique ML : grouper des
documents, catégories au documents,…
• MapReduce : framework de traitement de données distribuées : java, python,
ruby, scala
• Hbase : bdd Hadoop orienté colonne, supporte batch et lecture aléatoire
• HDFS (Hadoop Distributed File System)
• Connexion Externe du HDFS :
• Sqoop : mettre les données d’une bdd traditionnelle dans HDFS pour être traiter avec d’autres données
• Flume : système distribué permettant de collecter, regrouper et déplacer efficacement un ensemble de données (des
logs) à partir de plusieurs sources vers HDFS
L’écosystème Hadoop : HDFS
• HDFS : est le système de fichier distribué de Apache Hadoop sur n machines = 1 cluster , composé de 1 nameNode (NN) + x
dataNode (DD) + 1 secondary nameNode (SNN) -> 1 cluster = 1 NN (master) + x DN + 1 SNN
(+) traitement rapide des fichiers de grandes tailles (stockage sur plusieurs machines et bénéficier de ses ressources)
(+) garantir l’integrité des données lors d’un plantage d’une machine
(-) beaucoup de fichiers d’une petite taille
• Hadoop V1 vs V2 :
V1 = 1 NN + gestion des ressources (Job tracker + task tracker)
V2 = n NN + gestion des ressources (Manager de ressource +
manager de nœuds)
Rq : en cas de panne NN actif, le NN passif (SNN) devient actif
Rq : HDFS2 assure la haute disponibilité du NN
Rq : HDFS2 Federation assure la scalabilité horizontale du NN, les NN sont
indépendants, les DN envoit des heartbeat à tt les NN
• Hadoop Replication = 3 : créer 3 copies dans les nœuds au hasard
Rq : si un DN tombe en panne, le NN le détecte et s’occupe de répliquer
encore les blocs pour avoir tjr 3 copies stocké
Rq : chaque DN envoie des heartbeat au NN pour s’assurer sa disponibilité
L’écosystème Hadoop : HDFS
• NameNode : contient des méta-données et des fichiers :
• Les méta-données : des informations sur les données
• Exp : liste de fichiers, liste de block pour chaque fichier( taille par defaut 64Mb), liste de DN pour chaque bloc, fichier log,.
• Les fichiers :
• Version : informations sur le NN + id system de fichiers + type de stockage + la version de la structure de données
HDFS
• Edit : ecrure des fichiers logs des actions d’écriture
• FS Image : un fichier binaire permet de check point si le système tombe en panne
• FS Time : stocke les infos sur la dernière opération de crontrole sur les données
(+) si le NN est défaillant, on peut utiliser SNN
(-) limité à 4000 nœud par cluster (-) le cas d’un problème d’accés au réseau, les données sont inaccessible
(-) si le NN est défaillant, les données sont perdues à jamais, l’admin doit restaurer manuellement
• Secondary NameNode : c’est une copie du nameNode (architecture identique), il n’est pas un fail-over pour NN, ne garantit
pas la haute disponibilité et n’améliore pas les performances de NN
• DataNode : permet de stocker des données
Rq : en cas de pannes la replication créer des nouvelles copies pour maintenir le nombre de copies disponible dans le cluster
L’écosystème Hadoop : HDFS
• Configuration du NN (master) : conf/core-site.xml + conf/mapred-site.xml + conf/masters:localhost
• Configuration du DN (slaves) : conf/slaves:localhost
• Configuration de la réplication : conf/hdfs-site.xml:<configuration><property><name>dfs.replication</><value>3</></>
--getdit /etc/hadoop/conf/hdfs-site.xml
• Commandes utiles : Cloudera cmd :
--hadoop fs -help /de meme/ --hdfs dfs -help : lister les cmd
--hdfs dfs -mkdir dir : créer un dossier sous hdfs --hdfs dfs -cp dir/file1 dir/file2 : copier un fichier
--hdfs dfs -ls dir/ : lister un dossier --hdfs dfs -cat /dir/file.txt | head –n 25 : afficher 25 premières lignes
--hdfs dfs -du dir/ : afficher la taille des files d’un dir(byte) --hdfs dfs -du -s dir/ : afficher la taille totale d’un répertoire
--hdfs dfs -rmr -f dir : supprimer un dossier --hdfs dfs -rm dir/file : supprimer un fichier
--hdfs dfs fsck <path> : reporter des pbs (bloc manquants) --hdfs dfs dfsadmin -<cmd> : reporter la statistique hdfs (mode safe)
• Importer/exporter des fichiers :
local -> HDFS : 1) copier le fichier dans /home/cloudera 2) --hdfs -dfs -put /home/cloudera/file.txt dir/ : copier dans le dossier
HDFS -> local : --hdfs dfs -get dir/file.txt /home/cloudera/file_copy.txt
• Afficher le nombre de bloc d’un fichier dans HDFS :
méthode 1 : (mozilla) -> Hdfs NameNode -> Browse the filesystem -> goto : /user/cloudera/dir -> file.txt
méthode 2 : --hadoop fsck /user/cloudera/dir/file.txt -files -blocks
L’écosystème Hadoop : MapReduce
• MapReduce : est un patron d’architecture de développement qui permet de traiter des données volumineuses de
manière parallèle et distribué
• langages : Java, python, ruby, scala
• Etapes :
• Data Locality : un fichier est diviser en bloques de 128Mo
• Mapping (map tasks) : le développeur définit une fonction de mappage dont le but sera d’analyser les données
brutes (dans les fichier stockés sur HDFS) pour en sortir les données correctement formatées
Mappers : petits programmes permettent d’extraire les enregistrements à partir des données
• Réduction (reduce task) : cette tâche récupère les données construites dans l’étape de mappage et s’occupe de
les analyser dans le but d’en extraire les informations les plus importante
Reducers : générer le résultat final
L’écosystème Hadoop : MapReduce
• Processus MapReduce :
• Job Tracker (dans NN): planifier et affecter les taches aux TaskTrackers
+ gérer les jobs MapReduce et surveiller les progrès réalisés
+ gestion des ressources
+ gestion du traitement
• Task Tracker (dans DN): envoyer des heatbeat au JobTracker pour lui
notifier la progression d’une tache ou bien une erreur pour qu’il puisse
continuer à faire le traitement. Chaque TaskTracker est configuré
au démarrage pour avoir un nombre bien déterminé de slots
(map slot, reduce slot) pour l’exécution des tâches
(Une tache est exécutée dans un seul slot)
Rq : si un taskTracker tombe en panne -> pas de heartbeat to jobTracker
-> le jobTracker va trouver un autre taskTracker plus proche
Rq : si le jobTtracker tombe en panne -> tt le job tombe en panne : pas de resultat
job = ensemble de taches = tasks
• Gestion des ressources : MPR1 (ancienne version) /ou/ yarn (hadoop2) /ou/ MRv2 (nouveau framework)
• YARN : se compose de ressource manager + Node manager (pas de jobTraker et taskTracker), il sépare la gestion des ressources et
l’ordonnancement des job du traitement de données
L’écosystème Hadoop : MapReduce: sous Eclipse
• Ajouter des librairies (.jar) au projet : Click droit sur le projet -> properties -> java build path -> add external jars ->
/usr/lib/hadoop/client-0.20 -> tous les jar
• Création des classes : maxtemperatureMapper.java (séparé les différents champs) + maxtemperatureReducer.java
(calculer la température maximale) + maxtemperatureDriver.java (définir le fichier jar qui va contenir : le driver + le
mapper + le reducer) + (Démarrer le job mapreduce)
• Execution du projet sous eclipse : program arguments : input_file output_directory (projet)
1) Copier le fichier sample.txt juste à la racine du projet
2) Run as -> run configurations -> arguments -> program arguments : sample.txt output -> apply
3) Run as -> java application -> maxtemperatureDriver
4) Rafraichir le projet -> /output/part-00000
• Exécution du projet(.jar) sous HDFS :
1) Export du projet en .jar : Clic droit sur le nom du projet puis cliqué sur EXPORT …. /home/cloudera/MaxTemperature.jar
2) Creation du repertoire du destination dans HDFS : hdfs dfs -mkdir myinput + copier le ficher input (sample.txt) dans ce
repertoire --hdfs dfs -put data/sample.txt myinput
3) Exécuter le jar : --cd /home/cloudera --hadoop jar MaxTemperature.jar myinput/sample.txt joboutput
4) Voir le résultat : --hdfs dfs -cat joboutput/part-r-00000
L’écosystème Hadoop : Hive
• Pig : développé par Yahoo!, langage de type Data Flow nommé Pig latin, le schéma est optionnel
• Hive : est un langage de requête pour traiter les données stocké dans hdfs. C’est une Solution Data Warehouse intégrée
dans Hadoop développé par Facebook, il fournit un langage de requête similaire au SQL nommé HiveQL (bdd, table, ligne,
colonne), le schéma est obligatoire. Les requêtes HiveQL sont traduites en un ensemble de jobs MapReduce qui seront
exécutés dans un cluster Hadoop
• Démarrage du Hive + quitter : cmd --cd home/cloudera --hive --exit;
• Rq : en cas de problème : cloudera manager -> hive -> restart (service)
• Création + affichage d’une bdd : --create database test; --use test; -- show databases;
• Création d’une table (deliminateur’,’ + dans l’emplacement dans hdfs ‘/user/cloudera/atelier3’):
-- create table posts(user string , post string , time bigint) row format delimited fields terminated by ',’
stored as textfile location '/user/cloudera/atelier3’;
• Afficher le schéma d’une table : --desc posts --describe posts;
• Chargement des données(.txt) dans une table : --load data local inpath '/home/cloudera/hive/user-posts.txt' overwrite
into table posts;
• Afficher les 5 lignes d’une table : --select post from posts where user = "user2" limit 5;
• Suppression d’une table logiquement : --drop table posts;
L’écosystème Hadoop : Hive
• Jointure :
--select posts.user,post,nb_likes,posts.time from posts inner join likes on posts.user = likes.user;
• Partitionner d'une table : création d’une colonne country + séparation physique des fichiers :
--load data local inpath '/home/cloudera/hive/user-posts-AUSTRALIA.txt’ overwrite into table posts
partition(country='australia');
--load data local inpath '/home/cloudera/hive/user-posts-US.txt’ overwrite into table posts partition(country='us’);
Rq : ---hdfs dfs -ls /user/hive/warehouse/posts : il faut utiliser --use test; pour creer le repertoire dans
/user/hive/warehouse/test.db si non le repertoire est crée dans /user/hive/warehouse
Resultat : -ls : /user/hive/warehouse/posts/country=australia + /user/hive/warehouse/posts/country=us
Resultat : --desc posts : une colonne country est ajoutée
• Remplissage d’une table d’association (vente_livres) :
--insert overwrite table posts_likes select posts.user,post,nb_likes from posts inner join likes on posts.user =
likes.user; --hdfs dfs -ls /user/hive/warehouse/posts/posts_likes
• Afficher tous les partitions d’une table : --show partitions media;
SGBD Not Only SQL : Notions de base
• Scalabilité
• horizontale : distribuer les données sur plusieurs machines (qui vont former ensemble un cluster)
• verticale : ajouter des disques de stockage dans un même serveur (augmenter la performance de la machine)
• Propriétés
• ACID (atomicité, cohérence/Consistance, isolation et durabilité) : critères à vérifier pour n'importe bdd relationnelle
• Atomicité : Toute transaction doit s'exécuter entièrement sans erreur ou pas du tout
• Consistance: La bdd doit toujours être dans un état cohérent entre deux transactions (assurer par les contraintes d’intégrité)
• Isolation : Les modifications d’une transaction ne sont pas visibles par les autres tant qu’elle n’a pas été validée
• Durabilité : Une fois validées, les données sont permanentes jusqu’à leur prochaine modification (persistance)
• BASE (Basically Available, Soft-State, Eventual Consistency) : (+) plus flexible et accepte certaines erreurs
• Basically Available : Le système garantit la disponibilité, comme définie dans le théorème CAP
• Soft-State : L’état du système peut changer dans le temps, même sans nouvelles entrées, et ce à cause du principe de consistance
éventuelle
• Eventual Consistency : Le système deviendra cohérent au fil du temps, en supposant que le système ne reçoit pas des entrées
pendant ce temps la.
• CAP (Consistency , Partition Tolerance , Availability) : cap englobe l'architecture relationnelle et non relationnelle
• CA : un seul cluster + tous les nœuds sont en contact. Quant on ajoute une partition le système est bloqué. exp : SGBDR
• CP : des données peuvent être non accessible mais le reste est cohérent. exp : MongoDB, HBase, Redis
• AP : la valeur lue par un nœud peu être incohérente avec le contenu des autres nœuds. exp : CouchDB, Cassandra, DynamoDB
SGBD Not Only SQL : SGBDR vs SGBD no sql
SGBD Relationnel SGBD-Non Relationnel / not only sql / Cloud Databases
/ Non-Relational Databases / BigData Databases
• Modèle : entités-relations • largement distribuées (Architecture distribuée)
• requête : SQL • Adaptation des BD NOSQL au BigData : verifie les 4V
• transaction : vérifie les Propriétés/le théorème (Volume, vitesse, variété, veracité)
ACID • transaction : Utilisation des propriétés BASE
(-) Montée en charge (augmentation des users) que (+) Business Intelligence et Analyse : Permet une analyse
verticalement et une organisation rapides des données de très grands
(-) Scalabilité : empêche l'utilisation de plusieurs volumes et de types de données divers
machines pour répartir la charge (pas de S.horizontale) (+) Fiabilité (tolérance aux pannes), Extensibilité, Partage
(-) Relation : non décomposable des ressources Réseau, Vitesse/Performance
(-) Contraintes ACID impossibles à respecter sur un (-) Logiciel, Réparation, Réseau, Sécurité
cluster : si on ajouter des serveurs la consistance n'est (-) Encore en évolution, pas de standards : Pas de langage
pas validé de requêtage commun comme SQL : Consistance
éventuelle + Enorme quantité de données Évolutivité
facile SQL + Très haute disponibilité
SGBD Not Only SQL : 4 Types
Clé/valeur Orientées colonnes Orientées documents Orientées graphes
• Conçues pour sauvegarder les • Évolution de la BD clé/valeur, • Basé sur un système clé-valeur • On ne stocke plus un simple
données sans définir de schéma Ressemble aux SGBDR, mais avec • la valeur est une structure que le ensemble de données mais des
• stockage simple et direct d'un un nombre de colonnes système connaît et peut donc relations.
tableau associatif(hashtable) dynamiques, différent d’un parcourir • S’appuie sur les notions de nœuds,
• Interrogation uniquement par la enregistrement à un autre (pas de • Chaque document est un objet qui de relations et des propriétés qui
clé(recuperer les données par la clé) colonnes portant les valeurs NULL) contient un ou plusieurs champs, et leur sont rattachées
• 3 opérations possibles : • (+)A utliser pour Données avec chaque champs contient une valeur • (+)Conçues pour les données dont
partie structurée et partie non typée (string, date... ) les relations sont représentées
• Put : donner une valeur à une clé
structurée • (+) pouvoir récupérer, via une seule comme graphes, et ayant des
• Get: récupérer la valeur d'une clé
• (+)A utliser pour Données de clé, un ensemble d’informations éléments interconnectés, avec un
• Delete : effacer la clé et sa valeur publication variables:CMS, Blogging nombre indéterminé de relations
structurées de manière
• (+)A utiser pour De l'information avec commentaires, contenu hiérarchique entre elles
très volatile (stockage des données) dynamique, etc... • (+)Adapté aux traitements des
• (+)A utiser pour Données avec
• (+)A utliser pour De l'information • (+)A utliser pour Compteurs et partie structurée et partie non données des réseaux sociaux, Les
très peu volatile et accéder très analytiques structurée moteurs de recommandations, Les
fréquemment • (-)A éviter pour Des besoins de services basés sur la localisation ou
• (+)A utiser pour Données de suivi
• (-)A éviter pour Des données requêtagecomplexes temps réel ou analytiques le calcul d'itinéraires
possédant des relations • (-)A éviter pour Des besoins de • (-)Le stockage d'un graphe sur
• (-)A éviter pour Opérations
• (-)A éviter pour Des opérations calcul d'agrégation simples nécessitant consistance sur plusieurs serveurs est un
impliquant de multiples clés (nécessité de passer plusieurs agrégats problème difficile (problème de
• (-)A éviter pour Des besoins de systématiquement par Map- performance même pour un
• (-)A éviter pour Des structures
requêtagepar les données Reduceaujourd'hui) simple calcul de chemin si le
d'agrégat très changeantes avec des
graphe est réparti sur plusieurs
besoins de requêtage forts
serveurs).
• (-)A éviter pour Les cas où de
• exp : DynamoDB(Amazon), Redis, • exp : Hbase(Hadoop), Cassandra
nombreux nœuds doivent être mis
Voldemort(LinkedIn) (Facebook, Twitter),
à jour
BigTable(Google)
• exp : Mongo DB (SourceForge), • exp : Neo4J et InfiniteGraph,
CouchDB(Apache), RavenDB OrientDB
SGBD Not Only SQL : Hbase (orienté colonne)
• Hbase : est une bdd orientée colonne, distribuée, conçue pour stocker des tables de grand volume, supporte des opérations
CRUD en temps réel, open-source, écrit en Java, Scalabilité horizontale avec Shard automatique, Intégration avec le framework
MapReduce, Basé sur le concept "Bigtable" de google (voir documentation), versions : par defaut 3
(+) Avoir suffisamment de hardware, Minimum 5 noeuds + Avoir, pour chaque
bloc, 3 réplications ->Hbase est consommateur d’espace mémoire et de CPU.
(+) hbase : ne prend pas en considération les valeurs null (->valeur null prend
de l'espace)
(-) Utilisation similaire a SGBDR : Analyse relationnelle , 'group by', 'join', et
'where column like', etc....
(-) Accès par Recherche textuelle
• Modèle de données HBase :
tables-lignes-(clé,colonnes regroupés par famille de colonne)
• table fi hbase = bag
• Les familles permettent de stockées un ensemble dans un fichier
HFile/StoreFile(exp : "family:column" / user:first_name) avec un
nombre limité de famille par table
SGBD Not Only SQL : Hbase (orienté colonne)
• Déploiement de données Hbase :
• Zookeeper : S’assure qu’il n’y a qu’un seul Master en marche ,
Enregistre les Régions Serveurs + allocation des régions, Gère
les erreurs des Régions et du serveur Master
• un Master : Responsable de la coordination entre les Régions
Serveurs , Détecte l’état des Régions Serveurs, Assigne les
Régions aux Régions Serveurs, Utilise Zookeeperpour distribuer
et coordonner les services , Ne stocke/lit les données,
Possibilité de plusieurs serveurs Master : 1 primaire et plusieurs
serveurs backup
• une région serveur : comme tablespace en bddr : un pointeur
qui va pointer physiquement sur les données qui sont stockés
dans hdfs
• une région : Ensemble de lignes de table, Les données d’une
région sont automatiquement réparties par le serveur région
quand elles atteignent une taille bien déterminée. Contient les
tables, exécute les lectures, enregistre les écritures,
Périodiquement, un load balancer déplacera les régions dans
les clusters pour équilibrer les charges Les Clients interagissent
directement avec eux pour les opérations de lecture/écriture
SGBD Not Only SQL : Hbase (orienté colonne)
• Démarrage Hbase : --cd home/cloudera --hbase shell rq : vérifier état : dans cloudera manager-> ouvrir le service
• Création de Table, famille : --create 'table', {NAME => 'family', VERSIONS => 7}
--create't1', {NAME => 'f1', VERSIONS => 5} : table T1 + famille f1 avec la version 5
--create 'Blog', {NAME=>'info'}, {NAME=>'content'} : table Bloc + 2 familles
• Insersion dans une table + creation des colonnes dans les familles : --put 'table', 'row_id', 'family:column', 'value‘
--put 'Blog', 'Matt-001', 'info:title', 'Elephant’
--put 'Blog', 'Matt-001', 'content:post', 'Do elephants like monkeys’
• Suppression des enregistrements : --delete 'table', 'row_id', 'family:column', 'value’
--delete 'Blog', 'Matt-001', 'info:date’
--delete 'Blog', 'Michelle-004', 'info:date', 1326254739791 : par timestamp
--disable 'Blog’ : desactiver la table
--drop 'Blog' : supprimer la table : rq : avant de supprimer, il faut désactiver la table
rq : chaque record est stocker par version car Hbase n’écrase pas une mise a jour mais il stocke et incrémente la version
rq : si id existe il fait un update , si non il ajoute
rq : pour supprimer une version , il faut specifier le timestamp + supprimer les versions précédentes + garder le doc
SGBD Not Only SQL : Hbase (orienté colonne)
• affichage : scan(afficher une liste de ligne) / get(afficher une seule ligne) / count(nombre total d’enregistrement)
--list : afficher la liste des tables crée
--scan 'Blog’ : afficher ttes les lignes de la table Blog
--scan 'table_name', {LIMIT=>2} : afficher 2 lignes d'une table
--scan 'Blog',{COLUMNS=>'info:title', STARTROW=>'startRow', STOPROW=>'stopRow'}:afficher un ensemble de données
--get 'Blog' ,'Matt-001’ : afficher une seule ligne de la table Blog avec l'id Matt-001
--get 't1', 'f1', {COLUMN => 'c1', TIMESTAMP => ts1} : afficher pour un timestamp bien déterminé
--get 't1', 'r1', {VERSIONS => 4} : afficher plus qu'une version
--get 'Blog', 'Michelle-004',{COLUMN=>'info:date', VERSIONS=>2}
--get 'Blog', 'Michelle-004',{COLUMN=>'info:date’}
--get'Blog','Matt-001',{COLUMN=>'info:date',VERSIONS=>2}: afficher plus qu'une version ->
info:datetimestamp=1326071670471, value=1990.07.08, info:datetimestamp=1326071670442, value=1990.07.07
--count 'Blog’ : afficher le nb de lignes dans la table Blog
--count 'Blog', {INTERVAL=>2} : afficher le nb de lignes dont leurs familles ayant plus 1 lignes -> Current count: 2, row: John-
005, Current count: 4, row: Matt-002
SGBD Not Only SQL : MongoDB (orienté docments)
• MongoDB : est une base de données NoSQLOrientée document, Open Source, Développée en C++, Utilise la notion de schemaless(ne
respecte pas les schemas: pas de relation) , Stocke les données/documents(json, excel, xml) sous format JSON ou BSON (Binary JSON)
(+) les avantages du bdd o.documents (+)Tolérance aux pannes (Réplication) (+)Garantit la scalabilité horizontale (Sharding)
• Modèle des données MongoDB : bdd->bdd, Table->Collection,
Ligne->Document, Colonne->Champ, Index->Index, Jointure->Imbrication,
Clé étrangère->Référence, Clé primaire->Clé primaire
-Un objet est une liste non-ordonnée de paires (clé, valeur) :Une clé est une
chaine de caractères. Une valeur est soit une chaîne de caractère, un
numérique, un Booléen, un objet ou un tableau : {first_name: ‘Joe’,
last_name: ‘Doe’}
-Chaque document crée contient le champ « _id », Les documents sont sans
schéma. Les documents utilisent la syntaxe BSON. La taille maximale d’un
document est 16Mo. (voir notes1)
• Les Replica Set : permet d'assurer la haute disponibilité des données.
-Chaque Replica set possède :
• 1 nœud principal (primaire): lecture/écriture ->master
• N serveurs de backup (secondaire): lecture uniquement ->slaves
-Election : Dans le cas où le nœud primaire ne répond pas pendant 10 secondes, un nœud secondaire est élu comme nouveau
nœud primaire(processus rapide et automatique environ une minute) (voir notes2)
SGBD Not Only SQL : MongoDB (orienté docments)
• Sharding/partitionnement : permet d'assurer la scalabilité/extensibilité horizontale. Il s’agit de créer un cluster MongoDB
(sharded cluster) composé de plusieurs machines (noeuds) sur lesquelles les données vont être réparties. La répartition peut
être effectuée de façon arbitraire ou bien selon un sharding key (ou clé de partitionnement). On définit comme clé un champ
présent dans tous les documents.
Rq : si on supprime un shard (les replicats sont dans le meme
shard) -> on va perdre ls data
• Architecture d’un sharded cluster: 3 types de nœuds différents :
• Mongod: shard de données( nœud de données), Contient
un sous ensemble de données. S’il est saturé, il suffit
d’ajouter d’autres shards => scalabilité horizontale
• Mongos: routeur de requêtes. Agit comme un équilibreur
de charge (load balancer). Elle joue le rôle d’interface entre
l’application cliente et le sharded cluster : le routeur
communique avec le serveur de configuration pour
connaître la répartition des données et donc choisir le bon
shard
• Mongoc: serveur de configuration qui Stocke les
métadonnées et les paramètres de configuration du cluster.
Est en charge de la localisation des données, il sait quelles
données se trouvent sur quels shards
SGBD Not Only SQL : MongoDB (orienté docments)
• Connecté sur mongo : --mongo --mongo -host localhost -port 37023
• bdd : -- show dbs : afficher les bdd --use mydb : créer + utiliser une bdd
• Collection :
--db.createCollection('clients’); : création d’une collection
--db.clients.insert([ {"nom" : "nadhir" , "prenom" : "bh"} , {"nom" : "anas" , "prenom" : "fattoum" , "age" : "25" } ]);
--db.clients.find(); : afficher les données d’une collection /ou/ --db.clients.find().pretty();
--db.clients.update( {nom : "nadhir" } , { $set:{age : "28" } ); : modifier une ligne($set) par son nom (ajout/modifier d'une colonne)
--db.clients.update( {nom : "nadhir" } , { $unset:{age : 1 } ); : modifier une ligne($unset) par son nom (supp d'une colonne)
--db.clients.update( {nom : "nadhir" } , { $rename:{"age" : "age+"} ); : modifier une ligne($rename) par son nom (rename une colonne)
--db.clients.remove( {nom : "nadhir" } ); : supprimer une ligne
--db.clients.find( { $or: [ {age+:26} , {nom : "nadhir"} ] } ); : recherche : $or $and $lt(less <) $gt(great)
--db.clients.find( { $or: [ {age+:26} , {nom : "nadhir"} ] } ).sort( {nom:1} ).pretty();
--db.clients.find( { $or: [ {age+:26} , {nom : "nadhir"} ] } ).count();
--db.clients.find( { $or: [ {age+:26} , {nom : "nadhir"} ] } ).limit(4);
• Importer un fichier csv dans mongodb (voir la configuration 0-1-mongo db tuto.txt : 1)--cd ..mongodb/server/4.4/bin
2)--mongoimport -d nom_base_local -c nom_collection --type csv --file nom_dataset.csv --headerline
SGBD Not Only SQL : MongoDB (orienté docments)
• Création du sharded cluster : lancer demarrage_cluster_mongo.bat (avant supprimer le dossier C:\data)
sharded cluster = cfgRep (serveur de configuration:port) + 2 Replica set chaque replica set à son shard (existe dans un dossier)
(1 shard (dossier) : { 3 mongod = 3 replica set :3 port(1 primaire + 2 secondaire) })
• Test sharding : 1) cncté sur le serveur de configuration -: -mongo -host localhost -port 37023 2) creation d’une bbd : --use
test 3) activer le sharding : --sh.enableSharding("test") 4) specifier la clé de sharding : -- db.users.ensureIndex( { _id
: "hashed" } ) 5) alimentation de la collection : -- for (var i = 1;i <=50000;i++) db.users.insert({ _id:i,name : "test",
country:"Tunisia"}) 6) voir la repartition des données sur les 2 shards avec deux chunks (segments) sur chaque shard : -
- sh.status() 7) supprimer un shard : --db.adminCommand( { removeShard: "sh2" } ) 8) --use admin 9) afficher
la liste des shard : -- db.runCommand({listshards:1}) -> l’état de "sh2" est draining (purgé) 9) afficher les info du
sharding/partage : -- printShardingStatus() -> aucune donnée n'a donc été perdue : les 2 chunks de la collection "users" qui
étaient sur le shard "sh2" ont été déplacés vers le shard "sh1"
• configuration de replication (changer le serveur primaire) : 1) cncté au replica serveur primaire : --mongo --port 37017
(Primary>) 2) changer la priorité du mongod (0: primaire, 1:secondaire, 2:secondaire) : --cfg = rs.conf()
--cfg.members[1].priority = 2 // primaire->socondaire --rs.config(cfg) //pour appliquer les changements
3)syncronisation : --rs.printSlaveReplicationInfo() // à partir du master : nouveau mongod primaire
• Arrêter le sharded cluster : lancer arret_cluster_mongo.bat
SGBD Not Only SQL : ElasticSearch Kibana
(orienté document)
• ElasticSearch : est un moteur de recherche distribué en temps réel et un outil d’analyse (select where, de trie par nom,
calculer le score selon le nb d’occurence des mots,score, recherche indexé)
• un cluster ElasticSearch : X noeud + Y index (bdd) + Z shards (1 primaire:a0 + n secondaire:a1)
• Index = bdd , shard = division de la bdd , replica = copie des bdd , type = ligne
• kibana : outil qui permet de visualisé les données stocké dans ElasticSearch
• ElasticSearch CRUD :
➢ Ajouter : index+type : --POST /biblio/livres/1 { "title":"MapReduce", "tags":["HDFS", "MapReduce",
"Hadoop","Cloudera", "big data"] }
➢ supprimer index : --DELETE /biblio
➢ modifier : type : --POST /biblio/livres/2/_update { "doc" : { "tags":["big data","HDFS","Hadoop","Cloudera"] } }
➢ afficher : *index : --GET /_cat/indices?v *type : --GET /biblio/livres/2
➢ chercher : type : GET /biblio/livres/_search { "query":{ "match":{"tags":"big data"} } }
• ElasticSearch chargement des données à partir des fichiers .log, .json, .jsonl (voir 0-bidata.txt) :
➢ Chargment .json : --curl -XPOST "localhost:9200/bank/account/_bulk?pretty" --data-binary @accounts.json
➢ Chargement .jsonl : 1) Le mapping divise les documents de l’index en groupes logiques : --PUT /logstash-
2015.05.18?pretty --PUT /logstash-2015.05.19?pretty --PUT /logstash-2015.05.20?pretty 2)chargement : --curl -
XPOST "localhost:9200/_bulk?pretty" --data-binary @logs.jsonl
SGBD Not Only SQL : ElasticSearch Kibana
(orienté document)
• Kibana Visualisation des données : 1) Définition des Index Pattern pour les indexes crées dans ElasticSearch (voir 0-bigdata.txt)
2) Exploration des données : --kibala:discover -> filtre:account_number: <100 AND balance: >47500 3) Visualisation des
données : Pie Chart, bar Chart, map : --kibala:visualize -> pie -> bank -> split slices -> range/intervalle -> balance -> lancer data
-> save -> save 4) Création d’un Dashboard :qui contient les deux visualisations "PieExample" et "MapExample« : --
kibala:dashboard -> add
Spark
• Spark est un Framework complet et unifié, permet de traiter un volume énorme de données, écrit en scala, et supporte java
scala python r , et s’exécute sur la machine virtuelle Java (JVM).
• Spark vs Hadoop : sont des concurrents + complémentaires(on utilise des composants Hadoop : HDFS pour le stockage)
Hadoop Spark
le traitement se fait sur le disque le traitement se fait en mémoire (ram) : in memory
(-) hadoop : avoir plusieurs outils séparés (installation, (+)spark 100 plus rapide que hadoop (ram >> disque
configuration: Mahout pour ML) dur) stockage en mémoire puis sur le disque.
(+) spark travailler a la fois en mémoire et sur le disque
(+) spark : Framework unifié -> programmation tt les
outils dans un même notebook : python scala...
(+) spark et hadoop complémentaire : stocker
l’architecture hadoop(NN+DN+HDFS) dans spark
(+) spark : Des fonctions autres que Map Reduce + peut
être intégré avec : Yarn, Zookeeper, Mesos, HDFS,
Cassandra, Elasticsearch, MongoDB, hive, Zeppelin
Name Node -> Master Node
Data Node -> Worker Node
job tracker -> spark cluster manager : affectation + gestion des taches
task tracker : service qui va réaliser les taches qui vont être signé par spark manager/job tracker
Spark : écosystème
• Ecosystème Spark :
➢ Spark SQL : permet d’extraire, transformer et charger des données sous différents formats (JSON, Parquet, base de données)
➢ Spark Streaming : traitement temps-réel des données en flux. Il s’appuie sur un mode de traitement en "micro batch" et utilisé pour
les données temps-réel Dstream (c’est-à-dire une série de RDD)
➢ Spark MLlib : MLlib est une librairie de machine learning qui contient tous les algorithmes et utilitaires d’apprentissage classiques,
comme la classification, la régression, le clustering…
➢ Spark GraphX : la nouvelle API (en version alpha) pour le traitement et parallélisation de graphes. GraphX inclut une collection
toujours plus importante d’algorithmes et de builders pour simplifier les tâches d’analyse de graphes.
• Lancer spark : cmd anaconda : --pyspark
Spark : architecture
• Architecture Spark : Apache Spark à une architecture master/slave avec deux
demons et un cluster manager :
un cluster spark = 1 Master Node + X Worker Node (slaves) :
➢ Master Daemon = Master Node (Master/Driver Process)
-Spark driver = Spark driver (fonction main()) + Spark Context + planifie
l’exécution des job et négocie les ressources avec le cluster manager. + enregistre
les métadatasdes RDD et leurs partitions.
- SparkContext : permet d’établir une connexion avec le
cluster manager. Peut être utilisé pour créer les RDD. Quand la méthode main du
programme driver se termine ou si on lance la méthode stop () du Spark Context,
le driver terminera tout les executors et libère les ressources du cluster manager
-Spark manager = Cluster manager : Un service externe responsable de
l’allocation des ressources aux job lancés par le drivers. 3 types Standalone
(scheduling) , Mesos (sécurité) , YARN (monitoring)
-hdfs name node (hadoop)
➢ Worker Daemon = Worker Node (Slave Process) : est un nœud qui permet
d’exécuter une tache dans le cluster
-hdfs data node
-executor = task + cache : est un agent distribué responsable de
l’exécution des taches.
• Instancier sparkContext : --sc = SparkContext.getOrCreate()(voir bigdata jupyter)
Spark : RDD
• RDD (resilient distributed dataset) : est un ensemble de fragment à partir d'une source de données et placée en mémoire RAM
distribuée sur les différents nœuds/ workerNode
Resilient : (pas de replication) , tolérance aux pannes grâce au DAG
Distributed : données réparties sur plusieurs nœuds worker d’un cluster.
rq : en cas de panne Spark conserve/crée l'historique des opérations qui a permis de constituer un RDD ( pas de réplication)
• Types RDD : Parallelize collections : creer un RDD à partir d'un tableau/array :--data = [ 1, 2, 3, 4, 5] / = sc.parallelize(data)
External Datasets : creer un RDD à partir d'un fichier externe : --distFile = sc.textFile("data.txt")
Existing RDDs : creer un RDD (apres transformations) à partir d'un RDD
• Operations RDD : 2 types
➢ Les transformations : retourne un ou plusieurs sparkRDD à partir d’un RDD existant (sans changer RDD sources) :
RDD.map(l => l.length) RDD.flatMap(fonction) RDD.parallelize(Array((1,1),(2,2),3,4)) RDD.filter(line =>
line.contains("Komal")) RDD.distinct() RDD.union(RDD2) RDD.intersection(RDD2) RDD.join(RDD2)
➢ Les actions : retourne un resultat/valeur à partir d’un RDD existant : RDD.collect() RDD.count RDD.first()
RDD.take(n) RDD.reduce(fonction) RDD.persist() RDD.cache()
• Sauvegarder un RDD : saveAsTextFile(path) : sous forme de fichier texte
saveAsSequenceFile(path) : sous forme de SequenceFile Hadoop avec
saveAsObjectFile(path) : dans un format simple en utilisant la sérialisation Java avec
Spark : Sparksql
• Sparksql possède que des dataframe (dataframe = RDD + méta-data) (méta-data : le schéma: liste des colonnes nommées,
leurs type). pour utiliser sparksql, on doit instancier sql context : --sql_Context = SQLContext(sc)
• créer un dataframe à partir d'une source :
--dfs = sqlContext.read.json("employee.json") : source json
--myDF =sc.textFile("people.csv").map(lambda l: l.split(",")).map(lambda l: Row(ID=int(l[0]), name=l[1], age=int(l[2]) )).toDF() : csv
--myDF = sc.textFile("people.csv").toDF() : RDD to sparksql dataframe
• sparksql dataframe -> pandas datafram : --pdDF = myDF.toPandas()
• explorer le dataframe :
dfs.show() dfs..show(2) dfs.count() dfs.take(2) dfs.columns dfs.describe()
dfs.printSchema()
• faire des opérations LMD :
--result = dfs.filter(dfs.age>23).show()
--dfs.groupBy("age").count().show
--myDF.filter('age > 21').groupBy('sex').mean('height').show()
• Exécuter des requêtes Sql : --dfs.registerTempTable("Employee")
--age_filter= sqlContext.sql("select * from Employee where age>=20 AND age <= 35") : executer la requete sql
Spark : Spark streaming + MLib
• spark streaming : 0) sources : kafka, twitter, hdfs, kinesis 1) decouper les données en petits batch/lot de données
2) transferer au moteur spark 3)stocker les données dans hdfs , db , dashboards
(+) tt le traitement se fait dans un seul notebook
• Dstream : une séquence continu de RDD :
RDD1 + RDD2 + RDD3 + ... = Dstream
trs1 + trs2 + trs3 + ... : appliquer des transformations/actions sur toute les RDD
RDD1'+ RDD2'+ RDD3'+ ... : résultat des nouveaux RDD
• window operations : appliquer une transformation/action sur un ensemble de RDD
RDD1 + RDD2 + RDD3 + ... = Dstream
trs1 + trs1 + trs2 + ... : window operations
• MLib : machine Learning lib, documentation : http://spark.apache.org/docs/latest/ml-guide.html
(+) tt le traitement se fait dans un seul notebook