0% ont trouvé ce document utile (0 vote)
38 vues9 pages

Analyse de données avec PySpark TP3

Ce document contient le code Python pour résoudre plusieurs exercices sur l'analyse de données météorologiques à l'aide de Spark. Les exercices impliquent le filtrage, la transformation et l'agrégation de données météorologiques chargées sous forme de RDD afin de répondre à des questions telles que le pays avec le plus grand nombre de stations météorologiques.

Transféré par

22061
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PDF, TXT ou lisez en ligne sur Scribd
0% ont trouvé ce document utile (0 vote)
38 vues9 pages

Analyse de données avec PySpark TP3

Ce document contient le code Python pour résoudre plusieurs exercices sur l'analyse de données météorologiques à l'aide de Spark. Les exercices impliquent le filtrage, la transformation et l'agrégation de données météorologiques chargées sous forme de RDD afin de répondre à des questions telles que le pays avec le plus grand nombre de stations météorologiques.

Transféré par

22061
Copyright
© © All Rights Reserved
Nous prenons très au sérieux les droits relatifs au contenu. Si vous pensez qu’il s’agit de votre contenu, signalez une atteinte au droit d’auteur ici.
Formats disponibles
Téléchargez aux formats PDF, TXT ou lisez en ligne sur Scribd

Compte Rendu du TP3 :

22061

Exercice 4 :

Q1/ le nombre de ligne dans le fichier romeoetjuliette.txt est :

# Importer les bibliothèques nécessaires


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

# Créer une session Spark


spark = SparkSession.builder \
.appName("Exercice 4") \
.getOrCreate()

# 1. Lire le fichier CSV en tant que DataFrame


df = spark.read.option("header", "true").csv("commandes.csv")

# Afficher le schéma du DataFrame


df.printSchema()

# 2. Afficher le nombre total de lignes dans le fichier


total_rows = df.count()
print("Nombre total de lignes dans le fichier:", total_rows)

# 3. Convertir le champ "date_commande" en format Date


df = df.withColumn("date_commande",
to_date(col("date_commande")))

# 4. Adresse postale de Champagne Louise


adresse_champagne_louise = df.filter((col("nom") == "Louise") &
(col("prenom") == "Champagne")) \
.select("id_client", "nom", "prenom", "adresse") \
.distinct()

adresse_champagne_louise.show(truncate=False)

# 5. Clients avec des commandes d'un montant supérieur à 298


clients_montant_sup_298 = df.filter(col("montant") > 298) \
.select("id_client") \
.distinct()

clients_montant_sup_298.show(truncate=False)
# 6. Ajouter une nouvelle commande à la collection
nouvelle_commande = spark.createDataFrame([(1001, 90,
"2023-07-15", 300, "Villeneuve", "Patrick", "90 avenue de la
République", "[email protected]")],
["id_commande",
"id_client", "date_commande", "montant", "nom", "prenom",
"adresse", "email"])

df = df.union(nouvelle_commande)

# 7. Nombre de commandes d'un montant supérieur à 298


nb_commandes_sup_298 = df.filter(col("montant") > 298).count()
print("Nombre de commandes d'un montant supérieur à 298:",
nb_commandes_sup_298)

# 8. Nombre de clients distincts avec des commandes d'un montant


supérieur à 298
nb_clients_distincts_montant_sup_298 =
clients_montant_sup_298.count()
print("Nombre de clients distincts avec des commandes d'un
montant supérieur à 298:", nb_clients_distincts_montant_sup_298)

# 9. Commande la plus récente du client avec ID 5


commande_recente_client_5 = df.filter(col("id_client") == 5) \
.orderBy(col("date_commande").desc()) \
.limit(1)

commande_recente_client_5.show(truncate=False)

# 10. Clients ayant passé une commande le 09 Septembre 2023


clients_commande_09_sept_2023 = df.filter(col("date_commande") ==
"2023-09-09") \
.select("nom", "prenom") \
.distinct()

clients_commande_09_sept_2023.show(truncate=False)

# 11. Client avec le plus grand nombre de commandes


client_plus_commandes = df.groupBy("id_client").count() \
.orderBy(col("count").desc()) \
.limit(1)

client_plus_commandes.show(truncate=False)

# 12. Nombre de commandes passées par les 10 clients ayant passé


le plus de commandes
clients_plus_commandes = df.groupBy("id_client").count() \
.orderBy(col("count").desc()) \
.limit(10)

clients_plus_commandes.show(truncate=False)

# Fermer la session Spark


spark.stop()
Q2/ :

from pyspark import SparkContext

# Créer le contexte Spark


sc = SparkContext("local", "Exercise3_Q2")

print('*** The BEGINING ***')


print('**************************')
print('**************************')
print('**************************')

# Charger le fichier dans un RDD


stations_rdd = sc.textFile("isd-history.txt")

# Filtrer les lignes contenant des valeurs non numériques pour


les années
def is_valid_year(year):
return year.isdigit()

# Filtrer les lignes contenant des années valides


filtered_rdd = stations_rdd.filter(lambda line:
is_valid_year(line.split(',')[9]) and
is_valid_year(line.split(',')[10]))

# Vérifier si l'RDD filtré est vide


if filtered_rdd.isEmpty():
print("Aucune donnée valide trouvée dans le RDD.")
else:
station_with_max_years_diff = filtered_rdd.map(lambda line:
(line.split(',')[0], (int(line.split(',')[9]),
int(line.split(',')[10])))) \
.map(lambda x:
(x[0], abs(x[1][1] - x[1][0]))) \
.max(lambda x:
x[1])

# Afficher les résultats


print("Station avec le plus grand écart d'années :")
print("Identifiant de la station :",
station_with_max_years_diff[0].strip())
print("Nom de la station :",
station_with_max_years_diff[0].strip())

results_rdd = sc.parallelize([
"Station avec le plus grand écart d'années :",
f"Identifiant de la station :
{station_with_max_years_diff[0].strip()}",
f"Nom de la station :
{station_with_max_years_diff[0].strip()}"
])

results_rdd.saveAsTextFile("output/tp3/22061/exo3/Q2")

print('**************************')
print('**************************')
print('**************************')

print('*** The END ***')

Q3/ :

from pyspark import SparkContext

# Créer le contexte Spark


sc = SparkContext("local", "Exercise3_Q3")

# Charger le fichier dans un RDD


stations_rdd = sc.textFile("isd-history.txt")

# Filtrer les lignes contenant des en-têtes


header = stations_rdd.first()
stations_rdd = stations_rdd.filter(lambda line: line != header)

# Mapper chaque ligne pour extraire le pays


stations_by_country = stations_rdd.map(lambda line:
(line.split(',')[3], 1))

# Réduire pour compter le nombre de stations par pays


stations_count_by_country =
stations_by_country.reduceByKey(lambda x, y: x + y)

# Trouver le pays avec le plus grand nombre de stations


country_with_most_stations = stations_count_by_country.max(lambda
x: x[1])

# Afficher le résultat
print("Le pays avec le plus de stations est :",
country_with_most_stations[0])
print("Nombre de stations :", country_with_most_stations[1])

# Sauvegarder les résultats


results_rdd = sc.parallelize([
f"Le pays avec le plus de stations est :
{country_with_most_stations[0]}",
f"Nombre de stations : {country_with_most_stations[1]}"
])

results_rdd.saveAsTextFile("output/tp3/22061/exo3/Q3")

print('**************************')
print('**************************')
print('**************************')

print('*** The END ***')


Q4/ :

from pyspark import SparkContext

# Créer le contexte Spark


sc = SparkContext("local", "Exercise3_Q4")

# Charger le fichier dans un RDD


stations_rdd = sc.textFile("isd-history.txt")

# Filtrer les lignes contenant des en-têtes


header = stations_rdd.first()
stations_rdd = stations_rdd.filter(lambda line: line != header)

# Mapper chaque ligne pour extraire le pays


countries_rdd = stations_rdd.map(lambda line: line.split(',')[3])

# Supprimer les doublons pour obtenir une liste unique de pays


unique_countries_rdd = countries_rdd.distinct()

# Compter le nombre de pays


num_countries_with_stations = unique_countries_rdd.count()

# Afficher le résultat
print("Le nombre de pays possédant des stations météo est :",
num_countries_with_stations)

# Sauvegarder les résultats


results_rdd = sc.parallelize([
f"Le nombre de pays possédant des stations météo est :
{num_countries_with_stations}"
])

results_rdd.saveAsTextFile("output/tp3/22061/exo3/Q4")

print('**************************')
print('**************************')
print('**************************')

print('*** The END ***')


Q5/ :

from pyspark import SparkContext

# Créer le contexte Spark


sc = SparkContext("local", "Exercise3_Q5")

print('*** The BEGINING ***')


print('**************************')
print('**************************')
print('**************************')

# Charger le fichier dans un RDD


stations_rdd = sc.textFile("isd-history.txt")

# Filtrer les lignes contenant des en-têtes


header = stations_rdd.first()
stations_rdd = stations_rdd.filter(lambda line: line != header)

# Filtrer les lignes correspondant à la Mauritanie


stations_in_mauritania_rdd = stations_rdd.filter(lambda line:
line.split(',')[3] == "Mauritania")

# Compter le nombre de stations en Mauritanie


num_stations_in_mauritania = stations_in_mauritania_rdd.count()

# Afficher le résultat
print("Le nombre de stations météo en Mauritanie est :",
num_stations_in_mauritania)

# Sauvegarder les résultats


results_rdd = sc.parallelize([
f"Le nombre de stations météo en Mauritanie est :
{num_stations_in_mauritania}"
])

results_rdd.saveAsTextFile("output/tp3/22061/exo3/Q5")

print('**************************')
print('**************************')
print('**************************')

print('*** The END ***')

Vous aimerez peut-être aussi