Bases de Données et Traitement Distribué
Bases de Données et Traitement Distribué
2
Mise en place du ‘sharding’ en deux étapes : L’évolution de son débit dans le temps :
- Allocation des Shards aux machines (Plusieurs Shards possibles par - Débit constant
machine) - Débit borné
- Affectation des tuples à leur Shard - Débit variant
Range-based partionning : - Avec motifs régulier
EX : Shard 1 : Valeur < 15 - De manière erratique
Shard 2 : 15 < Valeur < 25
Shard 3 : 25 < Valeur
Hash partitionning :
EX : shard 1 : hash( id ) % 3 -> 0
shard 2 : hash( id ) % 3 -> 1
shard 3 : hash( id ) % 3 -> 2
AVANTAGES DU SHARDING
Gain de temps de traitement :
- Traitement en parallèle (sur différentes machine CPU, RAM) des
‘shard’ ➔ Possibilités d’adapter la voilure de l’infrastructure (nombre
de CPU et RAM) en fonction de la quantité de données à traiter
Evite la centralisation des données sur un seul et unique serveur (sécurité)
INCONVENIENTS DU SHARDING
Une mauvaise stratégie de ‘sharding’ (choix de la clé de sharding) peut
rendre contreproductif le processus à cause de ‘shard’ de taille trop
hétérogène
- Rééquilibrage complexe
Sensible à la performance des réseaux REQUETES CONTINUES
Sensible à la disponibilité et l’évolution du cluster de machine
DEFINITION
GESTION DE GRANDES MASSES DE DONNEES - REVISION CM3 Une requête continue est une requête qui est émise une fois et qui
SYSTEME DE GESTION DES FLUX DE DONNEES (DATA STREAM s'exécute de façon logique sur les données jusqu'à ce qu’elle soit
MANAGEMENT SYSTEM DSMS) terminée.
Le traitement d’une requête continue nécessite l’exécution d’opérateurs
Traitement de flux de données à débit variant (avec ou sans état) avec une fréquence et une portée paramétrée.
Contraintes :
OPERATEURS
- Maîtriser la qualité des résultats retournés
- Maîtriser les ressources (CPU, RAM bande passante) nécessaires au Le traitement d’une requête continue peut nécessiter des opérateurs de
traitement différents types
Opérateur sans état (‘Stateless’) :
- Calcul indépendant de l’état/résultat calculé précédemment
- Exemple : requête appliquant un filtre sur les données
Opérateur avec état (‘Stateful’) :
- Calcul dépendant de l’état/résultat calculé précédemment
- Exemple : requête calculant l’évolution du nombre moyen de valeurs
apparaissant toutes les x secondes
REQUETES CONTINUES : PARAMETRES
Exemple de requete : Donner la moyenne sur les 15 dernières minutes du
niveau de polluants HAP observées à partir des 10 000 capteurs répartis
dans la région Auvergne-RhôneAlpes et ce toutes les 5 minutes.
Une requête continue dispose :
- D'une fréquence de résultat attendu : "toutes les 5 minutes."
- D'une portée sur laquelle la requête est calculée : "sur les 15 dernières
minutes."
FENETRAGE
FLUX EN ENTREE Rappel :
- Un flux est infini
DEFINITION
Conséquence :
Un flux S est un ensemble potentiellement infini de paires < s, τ >, où : - Il n’est pas possible d’interroger le flux dans sa globalité
- s est un tuple respectant le schéma de S Besoin :
- τ est le timestamp de cet élément. - Système de fenêtrage
Remarque : Paramètres : taille, pas
- Un timestamp τ n’appartient pas au schéma de S et il peut y avoir Différents types :
zéro, un ou plusieurs éléments de S partageant le même τ. - Fenêtres basées sur le temps
CARACTERISTIQUES - Fenêtres basées sur le nombre de tuples reçus
Un flux S est caractérisé par : - Fenêtres basées sur le nombre de valeurs de tuples reçus
- La valeur de son débit exprimé en nombre de tuples par unité de FENETRES SAUTANTES
temps - Fenêtre glissante où pas = taille
3
FENETRE BASEE SUR LE TEMPS (TIME-BASED) DISTRIBUEE
- La fenêtre est définie à partir d’une durée Grille, Cloud :
- Le calcul de la requête se fait sur les tuples reçus depuis δ secondes et - Possibilité de parallélisation des threads
ce toutes les π secondes - Mise à disposition de nouvelles ressources
- Introduction de latence réseau
SYSTEMES DE GESTION DE FLUX (DSMS)
PERFORMANCE DE TRAITEMENT
Pourvoir obtenir des résultats en quasi-temps réel
Optimiser le traitement de la requête
QUALITE DES RESULTATS
Obtenir des résultats en ayant limité les pertes de données
Eviter la congestion des opérateurs
ADAPTABILITE DES RESSOURCES
Avoir les ressources nécessaires pour traiter le flux
Ne pas bloquer des ressources inutilement
REMARQUE
La fin d’une fenêtre n’implique pas forcément la disponibilité d’un
FENETRE SAUTANTE résultat.
- La fenêtre est définie à partir d’une durée
- Le calcul de la requête se fait sur les tuples reçus depuis δ secondes et
ce, toutes les π secondes
CONGESTION
Le système est considéré comme congestionné, si :
- Des données en entrée du système sont perdues (file d’attente
saturée)
FENETRES SUR LE NB DE TUPLES REÇUS (TUPLE-BASED - Des données avec TTL dépassé
- La fenêtre est définie à partir d’un nombre de tuples reçus ELASTICITE DES DSMS
- Le calcul de la requête se fait sur les n tuples reçus DEFINITION
Adapter les ressources nécessaires au traitement des requêtes
ENJEUX (INTERSECTION BIG DATA / GREEN IT)
Performance du traitement de la requête
Qualité des résultats retournés
Consommation énergétique :
- Pouvoir adapter la quantité de ressources nécessaires en cas de
variations du débit des flux à traiter
Automaticité :
FENETRES SUR LE NB DE TUPLES REÇUS (PARTITION-BASED) - Pouvoir adapter automatiquement les ressources
- La fenêtre est définie à partir d’un nombre de valeurs de tuples reçus GESTION DE L’ELASTICITE
- Le calcul de la requête se fait sur les n tuples reçus et vérifiant une Deux niveaux de traitements de l’élasticité :
formule logique - Ajout/suppression de ressources supplémentaires pour l’exécution
des threads
- Concentration/Etalement des threads
Deux contextes :
- Ressources disponibles et suffisantes
- Ressources limitées et insuffisantes
APPROCHES WORKFLOW
DEFINITION
GESTION DU TEMPS Les requêtes sont considérées comme des graphes d’opérateurs
Problème de synchronisation présent en cas de plusieurs sources (ordre TRAITEMENT DE REQUETES
non garanti, latence réseau…). Choix de la topologie
Système en ‘battement de cœur’ possible de deux manières : - Définition d’une topologie réduisant le nombre de tuples manipulés
- Associer un timestamp à chaque tuple entrant. Choix du degré de parallélisme des opérateurs
- Chaque source envoie une ponctuation lorsqu’elle a fini d’envoyer des - Définition du nombre de threads permettant de paralléliser
tuples associées à un timestamp et la synchronisation s’effectue grâce l’exécution d’un opérateur
aux sources Choix de l’allocation d’une tache sur une unité de traitement
INFRASTRUCTURE - Définition d’une stratégie d’allocation des threads sur les unités de
traitement
CENTRALISEE Choix de l’implémentation des opérateurs
Multicœurs : - Définition d’une stratégie de sélection d’implémentation des
- Possibilité de parallélisation des threads opérateurs
4
STRATEGIE DE REPLICATION TP 2 - GGMD
Réplication incrémentale : DEFINITION DES CONNEXIONS INTER-BASES
- Ajout / suppression d’une réplique à la fois par reconfiguration
- Peut générer de nombreuses reconfigurations Création d’un index sur personne :
Réplication multiple : CREATE INDEX Q2 ON personne (nomprenom,
- Ajout / suppression de plusieurs répliques à la fois par reconfiguration datenaiss, lieunaiss);
- Peut générer de fortes variations de ressources TP 3 - SPARK
STRATEGIES D’ALLOCATION - DIFFERENTES STRATEGIES : LECTURE DE DONNEES
Orienté répartition : - [Link](path): l'appel à cette méthode crée un RDD à
- Round robin des affectations des threads sur les unités de traitements
partir des lignes du fichier. A noter que le path du fichier peut être
- Permet d’équilibrer la charge (si les temps de traitement des tuples
un chemin réseau, ce qui nous sera utile par la suite.
considérés comme identiques)
- Pas de considération du réseau TRANSFORMATIONS
Orienté échange réseau : - map(func): applique une fonction à chacune des données.
- Tient compte de la position des opérateurs dans la topologie pour - filter(func): permet d'éliminer certaines données.
choisir leur affectation - flatMap(func): tout comme map, mais chacune des données
- Permet de réduit le trafic réseau d'entrée peut être transformée en plusieurs données de sortie.
- Mais concentre la charge sur certaines unités de traitement - sample(withReplacement, fraction, seed):
Orienté ressources : récolte un échantillon aléatoire des données. Cette méthode est utile
- Tient compte des ressources (CPU, RAM) disponibles sur une unité de pour tester un algorithme sur un petit pourcentage de données.
traitement L'argument seed permet de réaliser des expériences
- Permet une occupation maximale d’un sous-ensemble minimal de reproductibles. Exemple :.sample(False, 0.01, 42).
ressources (algo sac à dos) - distinct(): supprime les doublons.
- Mais reste faible robustesse en cas de variation du flux - reduceByKey(func): applique une fonction de réduction aux
FOCUS SUR LE DEGRE DE PARALLELISME valeurs de chaque clé. Par exemple, la fonction func peut renvoyer
Changer le degré de parallélisme d’un opérateur. Quand ? : le maximum entre deux valeurs.
- La détection de la congestion de l’opérateur (approche curative) - sortByKey(ascending): utilisée pour trier le résultat par clé.
- La détection d’un risque de congestion de l’opérateur (approche - join(rdd): permet de réaliser une jointure, ce qui a le même sens
préventive) que dans les bases de données relationnelles.
Comment ? ACTIONS
- En répliquant un opérateur (scale out) / supprimant une réplique
(scale in) - reduce(func): applique une réduction à l'ensemble des
données.
TP1 - DEPLOIEMENT D'UNE BASE DE DONNEES REPARTIE - collect(): retourne toutes les données contenues dans le RDD
DEFINITION DES CONNEXIONS INTER-BASES sous la forme de liste.
- count(): retourne le nombre de données contenues dans RDD.
Définir le 'wrapper' qui permettra de vous connecter à la base :
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER small FOREIGN DATA WRAPPER EXEMPLE
postgres_fdw OPTIONS (host '[Link]',
port '5432', dbname 'insee'); cleanedTortoises = [Link](lambda t:
Définir l'association utilisateur local/utilisateur distant qui vous len(t) > 1 and int(c[t]) == 21)
donnera le droit d'accès à la base insee :
CREATE USER MAPPING FOR "postgres" SERVER small jsonTortoises = [Link](lambda t: {
OPTIONS( user 'etum2' ,password 'etum2'); "id": int(t[1]), "top": int(t[2]), "position":
Définir les 'foreign tables' dbase insees depuis une VM : int(t[3]), "nbAvant": int(t[4]), "nbTour":
CREATE FOREIGN TABLE remote_personnes(idp int(t[5]), "distanceTotal": 254 * int(t[5]) +
serial4, nom varchar(80), prenoms varchar(80)) int(t[3]) })
SERVER small OPTIONS(schema_name 'public',
min = [Link]( (a, b) => a min b)
table_name 'personnes');
Définir une 'table' à partir d’une 'foreign tables' :
CREATE TABLE personnes AS SELECT * (ou les
colonnes qui nous intéressent si fragmentation
vertical) FROM remote_personnes WHERE (si ya
besoin partition horizontale);
Créer une 'publication' :
CREATE PUBLICATION pub_update_mairie FOR TABLE
mairie;
Créer une 'suscriptions' :
CREATE SUBSCRIPTION sub_update_mairie CONNECTION
'host=[Link] port=5432 password=etum2
user=etum2 dbname=insee' PUBLICATION
pub_update_mairie;
Créer un 'Trigger' sur les updates :
CREATE TRIGGGER TT AFTER UPDATE ON commune c FOR
EACH ROW BEGIN UPDATE personne set lieunaiss =
[Link] where lieunaiss = [Link]; END