PySpark Selvstudium for begyndere: Lær med EKSEMPLER

Før du lærer PySpark, lad os forstå:

Hvad er Apache Spark?

Spark er en big data-løsning, der har vist sig at være nemmere og hurtigere end Hadoop MapReduce. Spark er en open source-software udviklet af UC Berkeley RAD lab i 2009. Siden den blev frigivet til offentligheden i 2010, Spark er vokset i popularitet og bruges gennem branchen med et hidtil uset omfang.

I æraen af Big data, har praktikere mere end nogensinde brug for hurtige og pålidelige værktøjer til at behandle streaming af data. Tidligere værktøjer som MapReduce var foretrukne, men var langsomme. For at overvinde dette problem, Spark tilbyder en løsning, der både er hurtig og til almen brug. Den væsentligste forskel mellem Spark og MapReduce er det Spark kører beregninger i hukommelsen under den senere på harddisken. Det tillader højhastighedsadgang og databehandling, hvilket reducerer tiden fra timer til minutter.

Hvad er PySpark?

PySpark er et værktøj skabt af Apache Spark Fællesskab til brug Python med Spark. Det gør det muligt at arbejde med RDD (Resilient Distributed Dataset) i Python. Det tilbyder også PySpark Shell til at linke Python API'er med Spark kerne at igangsætte Spark Sammenhæng. Spark er navnemotoren til at realisere cluster computing, mens PySpark is Python's bibliotek at bruge Spark.

Hvordan virker det? Spark arbejde?

Spark er baseret på beregningsmotor, hvilket betyder, at den tager sig af planlægnings-, distributions- og overvågningsapplikationen. Hver opgave udføres på tværs af forskellige arbejdsmaskiner kaldet computing cluster. En computerklynge refererer til opgavefordelingen. En maskine udfører én opgave, mens de andre bidrager til det endelige output gennem en anden opgave. I sidste ende bliver alle opgaverne samlet for at producere et output. De Spark admin giver en 360 oversigt over forskellige Spark Job.

Hvordan Spark Arbejde
Hvordan Spark Arbejde

Spark er designet til at arbejde med

  • Python
  • Java
  • Scala
  • SQL

Et væsentligt træk ved Spark er den store mængde indbygget bibliotek, inklusive MLlib til maskinlæring. Spark er også designet til at arbejde med Hadoop-klynger og kan læse den brede type filer, herunder Hive-data, CSV, JSON, Casandra-data blandt andet.

Hvorfor bruge Spark?

Som fremtidig datapraktiker bør du være bekendt med pythons berømte biblioteker: Pandaer og scikit-learn. Disse to biblioteker er fantastiske til at udforske datasæt op til mellemstørrelse. Regelmæssige maskinlæringsprojekter er bygget op omkring følgende metodologi:

  • Indlæs dataene til disken
  • Importer dataene til maskinens hukommelse
  • Bearbejde/analysere data
  • Byg maskinlæringsmodellen
  • Gem forudsigelsen tilbage på disken

Problemet opstår, hvis dataforskeren ønsker at behandle data, der er for store til én computer. I tidligere dage med datavidenskab prøvede praktikerne, da træning i enorme datasæt ikke altid var nødvendig. Dataforskeren ville finde en god statistisk prøve, udføre et ekstra robusthedstjek og komme med en fremragende model.

Der er dog nogle problemer med dette:

  • Afspejler datasættet den virkelige verden?
  • Indeholder dataene et specifikt eksempel?
  • Er modellen egnet til prøveudtagning?

Tag brugeranbefaling for eksempel. Anbefalere er afhængige af at sammenligne brugere med andre brugere for at vurdere deres præferencer. Hvis datapraktikeren kun tager en delmængde af dataene, vil der ikke være en kohorte af brugere, der ligner hinanden meget. Anbefalinger skal køre på det fulde datasæt eller slet ikke.

Hvad er løsningen?

Løsningen har været tydelig i lang tid, del problemet op på flere computere. Parallel computing kommer også med flere problemer. Udviklere har ofte problemer med at skrive parallel kode og ender med at skulle løse en masse af de komplekse problemer omkring selve multi-processing.

Pyspark giver dataforskeren et API, der kan bruges til at løse problemerne med parallelle dataforløb. Pyspark håndterer kompleksiteten af ​​multiprocessing, såsom at distribuere data, distribuere kode og indsamle output fra arbejderne på en klynge af maskiner.

Spark kan køre selvstændigt, men kører oftest oven på en cluster computing framework såsom Hadoop. I test og udvikling kan en dataforsker dog effektivt køre Spark på deres udviklingsbokse eller bærbare computere uden en klynge

• En af de vigtigste fordele ved Spark er at bygge en arkitektur, der omfatter datastreaming management, problemfri dataforespørgsler, maskinlæringsforudsigelse og realtidsadgang til forskellige analyser.

• Spark arbejder tæt sammen med SQL-sprog, dvs. strukturerede data. Det giver mulighed for at forespørge data i realtid.

• Data scientists hovedopgave er at analysere og bygge prædiktive modeller. Kort sagt skal en dataforsker vide, hvordan man forespørger data ved hjælp af SQL, lav en statistisk rapport og gør brug af maskinlæring til at producere forudsigelser. Data scientist bruger en betydelig del af deres tid på at rense, transformere og analysere dataene. Når datasættet eller dataarbejdsgangen er klar, bruger dataforskeren forskellige teknikker til at opdage indsigt og skjulte mønstre. Datamanipulationen skal være robust og lige så nem at bruge. Spark er det rigtige værktøj takket være dets hastighed og rige API'er.

I denne PySpark tutorial, vil du lære, hvordan du bygger en klassificering med PySpark eksempler.

Sådan installeres PySpark med AWS

Jupyter team bygger et Docker-image til at køre Spark effektivt. Nedenfor er de trin, du kan følge for at installere PySpark instans i AWS.

Se vores tutorial på AWS og TensorFlow

Trin 1: Opret en instans

Først og fremmest skal du oprette en instans. Gå til din AWS-konto og start forekomsten. Du kan øge lagerpladsen op til 15g og bruge den samme sikkerhedsgruppe som i TensorFlow-tutorial.

Trin 2: Åbn forbindelsen

Åbn forbindelsen og installer docker-beholder. For flere detaljer henvises til selvstudiet med TensorFlow med Docker. Bemærk, at du skal være i den korrekte arbejdsmappe.

Kør blot disse koder for at installere Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Trin 3: Genåbn forbindelsen og installer Spark

Når du har genåbnet forbindelsen, kan du installere billedet, der indeholder PySpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Trin 4: Åben Jupyter

Tjek beholderen og dens navn

docker ps

Start docker med docker logs efterfulgt af navnet på docker. For eksempel logger docker zealous_goldwasser

Gå til din browser og start Jupyter. Adressen er http://localhost:8888/. Indsæt adgangskoden fra terminalen.

Bemærk: hvis du vil uploade/downloade en fil til din AWS-maskine, kan du bruge softwaren Cyberduck, https://cyberduck.io/.

Sådan installeres PySpark on Windows/Mac med Conda

Følgende er en detaljeret proces om, hvordan man installerer PySpark on Windows/Mac ved hjælp af Anaconda:

Sådan installerer du Spark på din lokale maskine er en anbefalet praksis at skabe et nyt conda-miljø. Dette nye miljø installeres Python 3.6, Spark og alle afhængigheder.

Mac -bruger

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Windows Bruger

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Du kan redigere .yml-filen. Vær forsigtig med indrykket. Der kræves to pladser før –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Gem det og skab miljøet. Det tager noget tid

conda env create -f hello-spark.yml

For flere detaljer om placeringen, se venligst selvstudiet Installer TensorFlow

Du kan kontrollere alt det miljø, der er installeret på din maskine

conda env list
Activate hello-spark

Mac -bruger

source activate hello-spark

Windows Bruger

activate hello-spark

Bemærk: Du har allerede oprettet et specifikt TensorFlow-miljø til at køre tutorials på TensorFlow. Det er mere bekvemt at skabe et nyt miljø anderledes end hello-tf. Det giver ingen mening at overbelaste hallo-tf med Spark eller andre maskinlæringsbiblioteker.

Forestil dig, at det meste af dit projekt involverer TensorFlow, men du skal bruge Spark til et bestemt projekt. Du kan indstille et TensorFlow-miljø for hele dit projekt og oprette et separat miljø for Spark. Du kan tilføje lige så mange biblioteker Spark miljø, som du ønsker uden at forstyrre TensorFlow-miljøet. Når du er færdig med Spark's projekt, kan du slette det uden at påvirke TensorFlow-miljøet.

Jupyter

Åbne Jupyter Notesbog og prøv om PySpark virker. Indsæt følgende Py i en ny notesbogSpark eksempelkode:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Hvis der vises en fejl, er det sandsynligt Java er ikke installeret på din maskine. I mac skal du åbne terminalen og skrive java -version, hvis der er en java version, så sørg for at den er 1.8. I Windows, gå til Application og tjek om der er en Java folder. Hvis der er en Java mappe, tjek det Java 1.8 er installeret. Mens dette skrives, PySpark er ikke kompatibel med Java9 og derover.

Hvis du har brug for at installere Java, du skal tænke link og download jdk-8u181-windows-x64.exe

Jupyter

For Mac-brugere anbefales det at bruge `brew.`

brew tap caskroom/versions
brew cask install java8

Se denne trinvise vejledning om hvordan du installerer Java

Bemærk: Brug fjern for at slette et miljø fuldstændigt.

 conda env remove -n hello-spark -y

Spark Kontekst

SparkKontekst er den interne motor, der tillader forbindelserne med klyngerne. Hvis du vil køre en operation, skal du bruge en SparkSammenhæng.

Opret en SparkKontekst

Først og fremmest skal du igangsætte en SparkSammenhæng.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Nu, hvor SparkKonteksten er klar, du kan oprette en samling af data kaldet RDD, Resilient Distributed Dataset. Beregning i en RDD paralleliseres automatisk på tværs af klyngen.

nums= sc.parallelize([1,2,3,4])

Du kan få adgang til den første række med take

nums.take(1)
[1]

Du kan anvende en transformation til dataene med en lambda-funktion. I PySpark eksempel nedenfor returnerer du kvadratet af tal. Det er en korttransformation

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLContext

En mere bekvem måde er at bruge DataFrame. SparkKonteksten er allerede indstillet, du kan bruge den til at oprette dataFrame. Du skal også erklære SQLContext

SQLContext gør det muligt at forbinde motoren med forskellige datakilder. Det bruges til at starte funktionaliteterne af Spark SQL

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Nu i dette Spark tutorial Python, lad os oprette en liste over tuple. Hver tupel vil indeholde navnet på personerne og deres alder. Der kræves fire trin:

Trin 1) Opret listen over tuple med oplysningerne

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Trin 2) Byg en RDD

rdd = sc.parallelize(list_p)

Trin 3) Konverter tupler

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Trin 4) Opret en DataFrame-kontekst

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Hvis du vil have adgang til typen af ​​hver funktion, kan du bruge printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Eksempel på maskinlæring med PySpark

Nu hvor du har en kort ide om Spark og SQLContext, er du klar til at bygge dit første maskinlæringsprogram.

Følgende er trinene til at bygge et Machine Learning-program med PySpark:

  • Trin 1) Grundlæggende betjening med PySpark
  • Trin 2) Dataforarbejdning
  • Trin 3) Byg en databehandlingspipeline
  • Trin 4) Byg klassificeringen: logistik
  • Trin 5) Træn og evaluer modellen
  • Trin 6) Indstil hyperparameteren

I denne PySpark Machine Learning tutorial, vi vil bruge voksendatasættet. Formålet med denne tutorial er at lære at bruge Pyspark. For mere information om datasættet henvises til denne vejledning.

Bemærk, at datasættet ikke er signifikant, og du tror måske, at beregningen tager lang tid. Spark er designet til at behandle en betydelig mængde data. Spark's ydeevne stiger i forhold til andre maskinlæringsbiblioteker, når det behandlede datasæt bliver større.

Trin 1) Grundlæggende betjening med PySpark

Først og fremmest skal du initialisere SQLContext er ikke allerede i initieret endnu.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

derefter kan du læse cvs-filen med sqlContext.read.csv. Du bruger inferSchema sat til True for at fortælle Spark for automatisk at gætte typen af ​​data. Som standard er den slået til Falsk.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

Lad os tage et kig på datatypen

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Du kan se dataene med show.

df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Hvis du ikke har sat inderShema til True, her er hvad der sker med typen. Der er alle i snor.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

For at konvertere den kontinuerlige variabel til det rigtige format, kan du bruge omstøbning af kolonnerne. Du kan bruge withColumn til at fortælle Spark hvilken kolonne der skal udføre transformationen.

# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Vælg kolonner

Du kan vælge og vise rækkerne med vælg og navnene på funktionerne. Nedenfor er alder og fnlwgt valgt.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Tæl for gruppe

Hvis du vil tælle antallet af forekomster efter gruppe, kan du kæde:

  • groupBy()
  • tælle()

sammen. I PySpark eksempel nedenfor, tæller du antallet af rækker efter uddannelsesniveau.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Beskriv dataene

For at få en opsummerende statistik over dataene kan du bruge describe(). Det vil beregne:

  • tælle
  • betyde
  • standardafvigelse
  • minut
  • max
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Hvis du kun vil have oversigtsstatistikken for én kolonne, skal du tilføje navnet på kolonnen inde i describe()

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Krydstabelberegning

I nogle tilfælde kan det være interessant at se den beskrivende statistik mellem to parvise kolonner. For eksempel kan du tælle antallet af personer med indkomst under eller over 50k efter uddannelsesniveau. Denne operation kaldes en krydstabel.

df.crosstab('age', 'label').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Du kan se, at ingen mennesker har en omsætning over 50, når de er unge.

Drop kolonne

Der er to intuitive API til at slippe kolonner:

  • drop(): Slip en kolonne
  • dropna(): Drop NA'er

Herunder slipper du kolonnen uddannelse_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Filtrer data

Du kan bruge filter() til at anvende beskrivende statistik i en delmængde af data. For eksempel kan du tælle antallet af personer over 40 år

df.filter(df.age > 40).count()

13443

Descriptive statistik efter gruppe

Endelig kan du gruppere data efter gruppe og beregne statistiske operationer som gennemsnittet.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Trin 2) Dataforbehandling

Databehandling er et kritisk trin i maskinlæring. Når du har fjernet skralddata, får du nogle vigtige indsigter.

For eksempel ved du, at alder ikke er en lineær funktion med indkomsten. Når folk er unge, er deres indkomst normalt lavere end midaldrende. Efter pensionering bruger en husstand deres opsparing, hvilket betyder et fald i indkomsten. For at fange dette mønster kan du tilføje en firkant til aldersfunktionen

Tilføj aldersfirkant

For at tilføje en ny funktion skal du:

  1. Vælg kolonnen
  2. Anvend transformationen og tilføj den til DataFrame
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Du kan se, at age_square er blevet tilføjet til datarammen. Du kan ændre rækkefølgen af ​​variablerne med select. Nedenfor bringer du age_square lige efter alder.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

Udelad Holand-Holland

Når en gruppe i en funktion kun har én observation, bringer den ingen information til modellen. Tværtimod kan det føre til en fejl under krydsvalideringen.

Lad os tjekke husstandens oprindelse

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
|            Honduras|                   13|
|Outlying-US(Guam-...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|               Haiti|                   44|
+--------------------+---------------------+
only showing top 20 rows

Funktionen native_country har kun én husstand, der kommer fra Holland. Du udelukker det.

df_remove = df.filter(df.native_country !=	'Holand-Netherlands')

Trin 3) Byg en databehandlingspipeline

I lighed med scikit-learn har Pyspark en pipeline API.

En pipeline er meget praktisk til at vedligeholde strukturen af ​​dataene. Du skubber dataene ind i pipelinen. Inde i rørledningen udføres forskellige operationer, outputtet bruges til at føde algoritmen.

For eksempel består én universel transformation i maskinlæring i at konvertere en streng til én hot encoder, dvs. én kolonne af en gruppe. En varm encoder er normalt en matrix fuld af nuller.

Trinene til at transformere dataene ligner meget scikit-learn. Du skal:

  • Indeks strengen til numerisk
  • Opret den ene hot encoder
  • Transformér dataene

To API'er gør jobbet: StringIndexer, OneHotEncoder

  1. Først og fremmest vælger du strengkolonnen, der skal indekseres. InputCol er navnet på kolonnen i datasættet. outputCol er det nye navn givet til den transformerede kolonne.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
  1. Tilpas dataene og transformer dem
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``
  1. Opret nyhedskolonner baseret på gruppen. For eksempel, hvis der er 10 grupper i funktionen, vil den nye matrix have 10 kolonner, en for hver gruppe.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
|age|age_square|       workclass|fnlwgt|education|education_num|           marital|     occupation| relationship| race| sex|capital_gain|capital_loss|hours_week|native_country|label|workclass_encoded|workclass_vec|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
| 39|    1521.0|       State-gov| 77516|Bachelors|           13|     Never-married|   Adm-clerical|Not-in-family|White|Male|        2174|           0|        40| United-States|<=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|Exec-managerial|      Husband|White|Male|           0|           0|        13| United-States|<=50K|              1.0|(9,[1],[1.0])|
+---+----------+----------------+------+---------+-------------+------------------+---------------+-------------+-----+----+------------+------------+----------+--------------+-----+-----------------+-------------+
only showing top 2 rows

Byg rørledningen

Du vil bygge en pipeline for at konvertere alle de præcise funktioner og tilføje dem til det endelige datasæt. Rørledningen vil have fire operationer, men du er velkommen til at tilføje så mange operationer, du vil.

  1. Indkode de kategoriske data
  2. Indekser etiketfunktionen
  3. Tilføj kontinuerlig variabel
  4. Saml trinene.

Hvert trin er gemt i en liste med navne stadier. Denne liste fortæller VectorAssembler, hvilken operation der skal udføres inde i pipelinen.

1. Indkod de kategoriske data

Dette trin er nøjagtigt det samme som ovenstående eksempel, bortset fra at du går over alle de kategoriske funktioner.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Indekser etiketfunktionen

Spark, ligesom mange andre biblioteker, accepterer ikke strengværdier for etiketten. Du konverterer etiketfunktionen med StringIndexer og tilføjer den til listestadierne

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Tilføj kontinuerlig variabel

VectorAssemblerens inputCols er en liste over kolonner. Du kan oprette en ny liste med alle de nye kolonner. Koden nedenfor udfylder listen med kodede kategoriske funktioner og de kontinuerlige funktioner.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Saml trinene.

Til sidst består du alle trinene i VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")stages += [assembler]

Nu hvor alle trin er klar, skubber du dataene til pipelinen.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

Hvis du tjekker det nye datasæt, kan du se, at det indeholder alle funktionerne, transformeret og ikke transformeret. Du er kun interesseret i det nye mærke og funktioner. Funktionerne inkluderer alle de transformerede funktioner og de kontinuerlige variable.

model.take(1)

[Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(40, {0: 1.0}), newlabel=0.0, features=SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 2174.0, 96: 13.0, 98: 40.0}))]

Trin 4) Byg klassificeringen: logistisk

For at gøre beregningen hurtigere, konverterer du model til en DataFrame.

Du skal vælge ny etiket og funktioner fra modellen ved hjælp af kortet.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

Du er klar til at oprette togdataene som en DataFrame. Du bruger sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

Tjek den anden række

df_train.show(2)
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows

Lav et tog/testsæt

Du deler datasættet 80/20 med randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Lad os tælle hvor mange personer med indkomst under/over 50k i både træning og testsæt

train_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19698|
|  1.0|        6263|
+-----+------------+
test_data.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+

Byg den logistiske regressor

Sidst, men ikke mindst, kan du bygge klassificeringen. Pyspark har et API kaldet LogisticRegression til at udføre logistisk regression.

Du initialiserer lr ved at angive etiketkolonnen og funktionskolonner. Du indstiller maksimalt 10 iterationer og tilføjer en regulariseringsparameter med en værdi på 0.3. Bemærk, at du i næste afsnit vil bruge krydsvalidering med et parametergitter til at tune modellen

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#Du kan se koefficienterne fra regressionen

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))
Coefficients: [-0.0678914665262,-0.153425526813,-0.0706009536407,-0.164057586562,-0.120655298528,0.162922330862,0.149176870438,-0.626836362611,-0.193483661541,-0.0782269980838,0.222667203836,0.399571096381,-0.0222024341804,-0.311925857859,-0.0434497788688,-0.306007744328,-0.41318209688,0.547937504247,-0.395837350854,-0.23166535958,0.618743906733,-0.344088614546,-0.385266881369,0.317324463006,-0.350518889186,-0.201335923138,-0.232878560088,-0.13349278865,-0.119760542498,0.17500602491,-0.0480968101118,0.288484253943,-0.116314616745,0.0524163478063,-0.300952624551,-0.22046421474,-0.16557996579,-0.114676231939,-0.311966431453,-0.344226119233,0.105530129507,0.152243047814,-0.292774545497,0.263628334433,-0.199951374076,-0.30329422583,-0.231087515178,0.418918551,-0.0565930184279,-0.177818073048,-0.0733236680663,-0.267972912252,0.168491215697,-0.12181255723,-0.385648075442,-0.202101794517,0.0469791640782,-0.00842850210625,-0.00373211448629,-0.259296141281,-0.309896554133,-0.168434409756,-0.11048086026,0.0280647963877,-0.204187030092,-0.414392623536,-0.252806580669,0.143366465705,-0.516359222663,-0.435627370849,-0.301949286524,0.0878249035894,-0.210951740965,-0.621417928742,-0.099445190784,-0.232671473401,-0.1077745606,-0.360429419703,-0.420362959052,-0.379729467809,-0.395186242741,0.0826401853838,-0.280251589972,0.187313505214,-0.20295228799,-0.431177064626,0.149759018379,-0.107114299614,-0.319314858424,0.0028450133235,-0.651220387649,-0.327918792207,-0.143659581445,0.00691075160413,8.38517628783e-08,2.18856717378e-05,0.0266701216268,0.000231075966823,0.00893832698698]
Intercept: -1.9884177974805692

Trin 5) Træn og evaluer modellen

For at generere forudsigelse for dit testsæt,

Du kan bruge linearModel med transform() på test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

Du kan udskrive elementerne i forudsigelser

predictions.printSchema()
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

Du er interesseret i etiketten, forudsigelsen og sandsynligheden

selected = predictions.select("label", "prediction", "probability")
selected.show(20)
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.91560704124179...|
|  0.0|       0.0|[0.92812140213994...|
|  0.0|       0.0|[0.92161406774159...|
|  0.0|       0.0|[0.96222760777142...|
|  0.0|       0.0|[0.66363283056957...|
|  0.0|       0.0|[0.65571324475477...|
|  0.0|       0.0|[0.73053376932829...|
|  0.0|       1.0|[0.31265053873570...|
|  0.0|       0.0|[0.80005907577390...|
|  0.0|       0.0|[0.76482251301640...|
|  0.0|       0.0|[0.84447301189069...|
|  0.0|       0.0|[0.75691912026619...|
|  0.0|       0.0|[0.60902504096722...|
|  0.0|       0.0|[0.80799228385509...|
|  0.0|       0.0|[0.87704364852567...|
|  0.0|       0.0|[0.83817652582377...|
|  0.0|       0.0|[0.79655423248500...|
|  0.0|       0.0|[0.82712311232246...|
|  0.0|       0.0|[0.81372823882016...|
|  0.0|       0.0|[0.59687710752201...|
+-----+----------+--------------------+
only showing top 20 rows

Vurder modellen

Du skal se på nøjagtighedsmetrikken for at se, hvor godt (eller dårligt) modellen klarer sig. I øjeblikket er der ingen API til at beregne nøjagtighedsmålet i Spark. Standardværdien er ROC, modtagerens driftskarakteristikkurve. Det er en anden metrik, der tager højde for den falske positive rate.

Før du ser på ROC'en, lad os konstruere nøjagtighedsmålet. Du er mere bekendt med denne metric. Nøjagtighedsmålet er summen af ​​den korrekte forudsigelse over det samlede antal observationer.

Du opretter en DataFrame med etiketten og `forudsigelsen.

cm = predictions.select("label", "prediction")

Du kan tjekke antallet af klasse i etiketten og forudsigelsen

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        5021|
|  1.0|        1578|
+-----+------------+
cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5982|
|       1.0|              617|
+----------+-----------------+

For eksempel er der i testsættet 1578 husstande med en indkomst over 50k og 5021 under. Klassificeringen forudsagde dog 617 husstande med indkomst over 50.

Du kan beregne nøjagtigheden ved at beregne antallet, når etiketten er korrekt klassificeret over det samlede antal rækker.

cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8237611759357478

Du kan pakke alt sammen og skrive en funktion for at beregne nøjagtigheden.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
Model accuracy: 82.376%

ROC-målinger

Modulet BinaryClassificationEvaluator inkluderer ROC-målene. Modtageren Operating Karakteristisk kurve er et andet almindeligt værktøj, der bruges med binær klassifikation. Den minder meget om præcision/genkaldelseskurven, men i stedet for at plotte præcision versus genkald, viser ROC-kurven den sande positive rate (dvs. tilbagekaldelse) mod den falske positive rate. Den falske positive rate er forholdet mellem negative tilfælde, der er forkert klassificeret som positive. Det er lig med én minus den sande negative kurs. Den sande negative rate kaldes også specificitet. Derfor plotter ROC-kurven sensitivitet (genkaldelse) versus 1 - specificitet

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192områdeUnderROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Trin 6) Indstil hyperparameteren

Sidst, men ikke mindst, kan du justere hyperparametrene. Svarende til scikit lære du opretter et parametergitter, og du tilføjer de parametre, du vil justere.

For at reducere beregningstiden justerer du kun regulariseringsparameteren med kun to værdier.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Til sidst vurderer du modellen med brug af krydsvalieringsmetoden med 5 fold. Det tager omkring 16 minutter at træne.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Tid til at træne model: 978.807 sekunder

Den bedste regulariseringshyperparameter er 0.01 med en nøjagtighed på 85.316 procent.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Du kan udtrække den anbefalede parameter ved at sammenkæde cvModel.bestModel med extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Resumé

Spark er et grundlæggende værktøj for en dataforsker. Det giver praktikeren mulighed for at forbinde en app til forskellige datakilder, udføre dataanalyse problemfrit eller tilføje en forudsigelig model.

Til at starte med Spark, skal du igangsætte en Spark Kontekst med:

'SparkSammenhæng()'

Og og SQL kontekst for at oprette forbindelse til en datakilde:

'SQLContext()'

I selvstudiet lærer du, hvordan du træner en logistisk regression:

  1. Konverter datasættet til en dataramme med:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Bemærk, at etikettens kolonnenavn er newlabel, og alle funktionerne er samlet i funktioner. Skift disse værdier, hvis de er forskellige i dit datasæt.

  1. Opret toget/testsættet
randomSplit([.8,.2],seed=1234)
  1. Træn modellen
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
lr.fit()
  1. Lav forudsigelse
linearModel.transform()

Opsummer dette indlæg med: